#!/usr/bin/env python3
"""Persistent queue for session processing.

Manages a queue of sessions to process, with:

- On-disk persistence (the queue survives restarts)
- A background worker thread
- Automatic retry on failure
- Per-session processing status
"""

import json
import logging
import os
import threading
import time
from pathlib import Path
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from enum import Enum

logger = logging.getLogger("processing_queue")

# Delay before the worker retries after an unexpected error in its own loop.
_ERROR_BACKOFF_SECONDS = 10

# How long stop_worker() waits for the worker thread to finish.
_JOIN_TIMEOUT_SECONDS = 30


class ProcessingStatus(str, Enum):
    """Lifecycle states of a queued session.

    Members are ``str`` subclasses, so they compare equal to the plain
    strings stored in the JSON queue file.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class ProcessingQueue:
    """Persistent queue of sessions to process.

    The queue is stored as a JSON list of dicts in ``queue_file``. Every
    operation reloads the file, mutates the list and writes it back while
    holding an in-process lock.
    """

    def __init__(
        self,
        queue_file: str = "data/training/.processing_queue.json",
        max_attempts: int = 3,
        poll_interval: float = 5.0,
    ):
        """Create the queue and make sure its parent directory exists.

        Args:
            queue_file: Path of the JSON file backing the queue.
            max_attempts: Number of failed attempts after which a session is
                marked ``failed`` instead of being retried.
            poll_interval: Seconds the worker waits before polling again when
                the queue holds no pending session.
        """
        self.queue_file = Path(queue_file)
        self.queue_file.parent.mkdir(parents=True, exist_ok=True)
        self.max_attempts = max_attempts
        self.poll_interval = poll_interval
        self._lock = threading.Lock()
        self._worker_thread: Optional[threading.Thread] = None
        self._running = False
        # Set by stop_worker() so an idle worker wakes up immediately instead
        # of finishing a fixed sleep.
        self._stop_event = threading.Event()

    def add(self, session_id: str, base_path: str = "data/training") -> None:
        """Add a session to the processing queue.

        A session already present in the queue (in any status) is not added
        a second time.
        """
        with self._lock:
            queue = self._load()

            # Avoid duplicates.
            if any(item["session_id"] == session_id for item in queue):
                logger.info(f"Session {session_id} déjà dans la queue")
                return

            queue.append({
                "session_id": session_id,
                "base_path": base_path,
                "status": ProcessingStatus.PENDING.value,
                "added_at": datetime.now().isoformat(),
                "attempts": 0,
                "last_error": None,
            })
            self._save(queue)
            logger.info(f"Session {session_id} ajoutée à la queue")

    def get_status(self, session_id: str) -> Optional[Dict]:
        """Return the queue entry of a session, or ``None`` if it is unknown."""
        with self._lock:
            for item in self._load():
                if item["session_id"] == session_id:
                    return item
        return None

    def get_all(self) -> List[Dict]:
        """Return every entry currently stored in the queue."""
        with self._lock:
            return self._load()

    def get_pending_count(self) -> int:
        """Return the number of sessions whose status is ``pending``."""
        with self._lock:
            queue = self._load()
        return sum(
            1 for item in queue
            if item.get("status") == ProcessingStatus.PENDING
        )

    def start_worker(self, process_func: Callable[[str, str], Any]) -> None:
        """Start the background processing worker.

        Entries left in ``processing`` by a previous run that was interrupted
        are put back to ``pending`` first, otherwise they would never be
        picked up again.

        Args:
            process_func: Processing function called as
                ``process_func(session_id, base_path)``. It must return a dict;
                ``{"status": "success"}`` marks the session completed, anything
                else is treated as a failure (its ``"errors"`` value is
                recorded).
        """
        previous = self._worker_thread
        # Also refuse while a stopped worker is still finishing its current
        # session: flipping _running back to True would keep it alive and
        # leave two workers running.
        if self._running or (previous is not None and previous.is_alive()):
            logger.warning("Worker déjà en cours d'exécution")
            return

        self._requeue_interrupted()
        self._stop_event.clear()
        self._running = True
        self._worker_thread = threading.Thread(
            target=self._worker_loop,
            args=(process_func,),
            name="ProcessingWorker",
            daemon=False,  # Non-daemon so the in-flight session can finish
        )
        try:
            self._worker_thread.start()
        except Exception:
            # Do not report a running worker if the thread never started.
            self._running = False
            raise
        logger.info("Worker de traitement démarré")

    def stop_worker(self, wait: bool = True) -> None:
        """Ask the worker to stop.

        Args:
            wait: When true, block up to 30 seconds for the worker thread to
                finish the session it is currently processing.
        """
        self._running = False
        self._stop_event.set()  # Wake the worker if it is idle or backing off

        worker = self._worker_thread
        # A thread cannot join itself (e.g. stop requested from process_func).
        if wait and worker is not None and worker is not threading.current_thread():
            worker.join(timeout=_JOIN_TIMEOUT_SECONDS)
            if worker.is_alive():
                logger.warning(
                    "Worker de traitement toujours actif après "
                    f"{_JOIN_TIMEOUT_SECONDS}s"
                )
                return
        logger.info("Worker de traitement arrêté")

    def _worker_loop(self, process_func: Callable[[str, str], Any]) -> None:
        """Main loop of the worker thread: process pending sessions one by one."""
        while self._running:
            try:
                session = self._get_next_pending()
                if session is None:
                    # Nothing to process: wait, but wake up early on stop.
                    self._stop_event.wait(self.poll_interval)
                    continue
                self._process_session(session, process_func)
            except Exception as e:
                logger.exception(f"Erreur dans le worker: {e}")
                self._stop_event.wait(_ERROR_BACKOFF_SECONDS)

    def _process_session(
        self, session: Dict, process_func: Callable[[str, str], Any]
    ) -> None:
        """Run ``process_func`` on one queue entry and record the outcome."""
        session_id = session["session_id"]
        base_path = session.get("base_path", "data/training")

        logger.info(f"Traitement de la session: {session_id}")
        self._update_status(session_id, ProcessingStatus.PROCESSING)

        try:
            result = process_func(session_id, base_path)

            if result.get("status") == "success":
                self._update_status(session_id, ProcessingStatus.COMPLETED)
                logger.info(f"Session {session_id} traitée avec succès")
            else:
                error = result.get("errors", ["Unknown error"])
                self._mark_failed(session_id, str(error))
                logger.error(f"Échec traitement session {session_id}: {error}")
        except Exception as e:
            # Any error raised by the processing function (or a result that
            # is not a dict) counts as a failed attempt.
            self._mark_failed(session_id, str(e))
            logger.exception(f"Erreur traitement session {session_id}: {e}")

    def _get_next_pending(self) -> Optional[Dict]:
        """Return the first ``pending`` entry of the queue, or ``None``."""
        with self._lock:
            for item in self._load():
                if item.get("status") == ProcessingStatus.PENDING:
                    return item
        return None

    def _requeue_interrupted(self) -> None:
        """Put entries stuck in ``processing`` back to ``pending``.

        Only called when no worker is running, so such entries can only come
        from a run that stopped before recording an outcome. The interruption
        is not counted as a failed attempt.
        """
        with self._lock:
            queue = self._load()
            stale = [
                item for item in queue
                if item.get("status") == ProcessingStatus.PROCESSING
            ]
            if not stale:
                return
            now = datetime.now().isoformat()
            for item in stale:
                item["status"] = ProcessingStatus.PENDING.value
                item["updated_at"] = now
            self._save(queue)
        logger.warning(
            f"{len(stale)} session(s) interrompue(s) remise(s) en attente"
        )

    def _update_status(self, session_id: str, status: ProcessingStatus) -> None:
        """Set the status of a session and stamp ``updated_at``."""
        with self._lock:
            queue = self._load()
            for item in queue:
                if item["session_id"] == session_id:
                    # Store the plain string so the JSON file never depends on
                    # how the enum is serialized.
                    item["status"] = ProcessingStatus(status).value
                    item["updated_at"] = datetime.now().isoformat()
                    break
            self._save(queue)

    def _mark_failed(self, session_id: str, error: str) -> None:
        """Record a failed attempt for a session.

        The session goes back to ``pending`` for a retry until it has failed
        ``max_attempts`` times, after which it is marked ``failed``.
        """
        with self._lock:
            queue = self._load()
            for item in queue:
                if item["session_id"] == session_id:
                    item["attempts"] = item.get("attempts", 0) + 1
                    item["last_error"] = error
                    item["updated_at"] = datetime.now().isoformat()

                    if item["attempts"] >= self.max_attempts:
                        item["status"] = ProcessingStatus.FAILED.value
                    else:
                        item["status"] = ProcessingStatus.PENDING.value  # Retry
                    break
            self._save(queue)

    def _load(self) -> List[Dict]:
        """Load the queue from disk.

        Returns an empty list when the file does not exist, cannot be read,
        or does not contain a JSON list. Entries that are not dicts carrying
        a ``session_id`` are dropped.
        """
        try:
            with open(self.queue_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            logger.error(f"Erreur chargement queue: {e}")
            return []

        if not isinstance(data, list):
            logger.error(
                f"Erreur chargement queue: contenu inattendu dans {self.queue_file}"
            )
            return []
        return [
            item for item in data
            if isinstance(item, dict) and "session_id" in item
        ]

    def _save(self, queue: List[Dict]) -> None:
        """Write the queue to disk (best effort: failures are only logged).

        The data is written to a sibling temporary file which then replaces
        the queue file, so an interrupted write cannot leave a truncated
        queue behind.
        """
        tmp_path = self.queue_file.with_name(self.queue_file.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(queue, f, indent=2, default=str)
            os.replace(tmp_path, self.queue_file)
        except Exception as e:
            logger.error(f"Erreur sauvegarde queue: {e}")


# Global instance
_queue: Optional[ProcessingQueue] = None


def get_queue() -> ProcessingQueue:
    """Return the global queue instance, creating it on first use."""
    global _queue
    if _queue is None:
        _queue = ProcessingQueue()
    return _queue


def add_to_queue(session_id: str, base_path: str = "data/training") -> None:
    """Add a session to the global processing queue."""
    get_queue().add(session_id, base_path)


def get_processing_status(session_id: str) -> Optional[Dict]:
    """Return the global queue's entry for a session, or ``None``."""
    return get_queue().get_status(session_id)


def start_processing_worker(process_func: Callable[[str, str], Any]) -> None:
    """Start the worker of the global queue."""
    get_queue().start_worker(process_func)


def stop_processing_worker() -> None:
    """Stop the worker of the global queue."""
    get_queue().stop_worker()