Files
rpa_vision_v3/agent_v0/server_v1/session_worker.py
Dom ae65be2555 chore: ajouter agent_v0/ au tracking git (était un repo embarqué)
Suppression du .git embarqué dans agent_v0/ — le code est maintenant
tracké normalement dans le repo principal.
Inclut : agent_v1 (client), server_v1 (streaming), lea_ui (chat client)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 11:12:23 +01:00

254 lines
10 KiB
Python

# agent_v0/server_v1/session_worker.py
"""
SessionWorker — asynchronous background processing of finalized sessions.

Solves the problem of finalize returning "insufficient_data": the VLM
analysis takes several minutes and the client no longer waits.  The worker
processes sessions at its own pace and notifies when the workflow is ready.

Runs in a daemon thread and processes one session at a time.
For each session:
1. Analyze the screenshots via ScreenAnalyzer + VLM
2. Build the workflow via GraphBuilder
3. Save the workflow
4. Notify that it is ready (callback)
"""
import logging
import threading
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

if TYPE_CHECKING:
    # Import for type checkers only: the annotation in __init__ is a string
    # (PEP 526 annotations in function bodies are never evaluated), so no
    # runtime import — and no circular-import risk — is needed.
    from .stream_processor import StreamProcessor

logger = logging.getLogger("session_worker")


class SessionWorker:
    """Worker that processes finalized sessions in the background.

    Runs in a daemon thread and processes one session at a time.
    For each session:
    1. Analyze the screenshots via ScreenAnalyzer + VLM
    2. Build the workflow via GraphBuilder
    3. Save the workflow
    4. Notify that it is ready (on_complete callback)
    """

    def __init__(self, processor, poll_interval: int = 10):
        """
        Args:
            processor: StreamProcessor instance shared with the API.
            poll_interval: Polling interval in seconds while the queue is empty.
        """
        self._processor: "StreamProcessor" = processor
        self._queue: List[str] = []  # session_ids waiting to be processed
        self._lock = threading.Lock()  # guards queue, progress and history
        self._running = False
        # Lets stop()/enqueue() wake an idle worker immediately instead of
        # letting it sleep out a full poll_interval.
        self._wake = threading.Event()
        self._current_session: Optional[str] = None
        self._current_progress: Optional[Dict[str, Any]] = None
        self._on_complete: Optional[Callable[[str, Dict[str, Any]], None]] = None
        self._poll_interval = poll_interval
        # Processing history (successes and failures)
        self._completed: List[Dict[str, Any]] = []
        self._failed: List[Dict[str, Any]] = []
        self._thread: Optional[threading.Thread] = None

    def set_on_complete(self, callback: Optional[Callable[[str, Dict[str, Any]], None]]) -> None:
        """Register the callback invoked as callback(session_id, result) on success."""
        self._on_complete = callback

    def start(self):
        """Start the worker in a daemon thread (no-op if already running)."""
        if self._running:
            logger.warning("[WORKER] Déjà en cours d'exécution")
            return
        self._running = True
        self._wake.clear()
        self._thread = threading.Thread(
            target=self._process_loop,
            name="SessionWorker",
            daemon=True,
        )
        self._thread.start()
        logger.info("[WORKER] Démarré — traitement asynchrone des sessions finalisées")

    def stop(self):
        """Stop the worker cleanly, waking it up if it is idle."""
        self._running = False
        self._wake.set()  # interrupt the idle wait so shutdown is prompt
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=10)
        logger.info("[WORKER] Arrêté")

    def enqueue(self, session_id: str):
        """Add a session to the processing queue.

        Avoids duplicates: a session that is already queued, currently being
        processed, or already processed successfully is not re-added.
        """
        with self._lock:
            if session_id in self._queue:
                logger.info(f"[WORKER] Session {session_id} déjà dans la queue, skip")
                return
            if self._current_session == session_id:
                logger.info(f"[WORKER] Session {session_id} en cours de traitement, skip")
                return
            # Skip sessions that already produced a workflow
            if any(item.get("session_id") == session_id for item in self._completed):
                logger.info(f"[WORKER] Session {session_id} déjà traitée avec succès, skip")
                return
            self._queue.append(session_id)
            logger.info(
                f"[WORKER] Session {session_id} ajoutée à la queue "
                f"(position {len(self._queue)})"
            )
        self._wake.set()  # wake the worker if it was idling

    def get_status(self) -> Dict[str, Any]:
        """Return a thread-safe snapshot of the worker state."""
        with self._lock:
            return {
                "running": self._running,
                "queue_length": len(self._queue),
                "queue": list(self._queue),
                "current_session": self._current_session,
                "current_progress": dict(self._current_progress) if self._current_progress else None,
                "completed_count": len(self._completed),
                "completed": list(self._completed[-10:]),  # last 10
                "failed_count": len(self._failed),
                "failed": list(self._failed[-10:]),  # last 10
            }

    def _dequeue(self) -> Optional[str]:
        """Pop and return the next session_id, or None if the queue is empty."""
        with self._lock:
            if self._queue:
                return self._queue.pop(0)
            return None

    def _process_loop(self):
        """Main loop — take the next session and process it."""
        logger.info("[WORKER] Boucle de traitement démarrée")
        while self._running:
            session_id = self._dequeue()
            if session_id:
                self._process_session(session_id)
            else:
                # Idle: wait for new work or stop(), up to poll_interval.
                # A wakeup cleared just after being set is harmless — the
                # loop re-checks the queue on the next iteration anyway.
                self._wake.wait(self._poll_interval)
                self._wake.clear()

    def _record_failure(self, entry: Dict[str, Any]) -> None:
        """Append a failure entry to the history (thread-safe)."""
        with self._lock:
            self._failed.append(entry)

    def _process_session(self, session_id: str):
        """Process one full session (screenshot analysis + workflow build).

        Uses StreamProcessor.reprocess_session(), which:
        1. Lists the shot_*_full.png screenshots on disk
        2. Calls process_screenshot() for each one (VLM + CLIP)
        3. Calls finalize_session() to build the workflow
        """
        with self._lock:
            self._current_session = session_id
            self._current_progress = {
                "session_id": session_id,
                "status": "starting",
                "started_at": datetime.now().isoformat(),
                "screenshots_total": 0,
                "screenshots_processed": 0,
            }
        logger.info(f"[WORKER] === Début traitement session {session_id} ===")
        start_time = time.time()
        try:
            result = self._processor.reprocess_session(
                session_id,
                progress_callback=self._update_progress,
            )
            elapsed = time.time() - start_time
            if result.get("error"):
                # Error during processing
                logger.error(
                    f"[WORKER] Échec traitement session {session_id} "
                    f"après {elapsed:.1f}s : {result['error']}"
                )
                self._record_failure({
                    "session_id": session_id,
                    "error": result["error"],
                    "elapsed_seconds": round(elapsed, 1),
                    "timestamp": datetime.now().isoformat(),
                })
            elif result.get("status") == "insufficient_data":
                # Not enough valid screenshots
                logger.warning(
                    f"[WORKER] Session {session_id} : données insuffisantes "
                    f"({result.get('states_count', 0)} states) après {elapsed:.1f}s"
                )
                self._record_failure({
                    "session_id": session_id,
                    "error": "insufficient_data",
                    "states_count": result.get("states_count", 0),
                    "elapsed_seconds": round(elapsed, 1),
                    "timestamp": datetime.now().isoformat(),
                })
            else:
                # Success
                logger.info(
                    f"[WORKER] Session {session_id} traitée avec succès en {elapsed:.1f}s | "
                    f"workflow={result.get('workflow_id', '?')} | "
                    f"{result.get('nodes', 0)} nodes, {result.get('edges', 0)} edges"
                )
                with self._lock:
                    self._completed.append({
                        "session_id": session_id,
                        "workflow_id": result.get("workflow_id"),
                        "workflow_name": result.get("workflow_name"),
                        "nodes": result.get("nodes", 0),
                        "edges": result.get("edges", 0),
                        "states_analyzed": result.get("states_analyzed", 0),
                        "elapsed_seconds": round(elapsed, 1),
                        "timestamp": datetime.now().isoformat(),
                    })
                # Notify on success only, per the class contract ("notify
                # when the workflow is ready").  Callback errors are logged
                # and never crash the worker.
                if self._on_complete:
                    try:
                        self._on_complete(session_id, result)
                    except Exception as e:
                        logger.error(f"[WORKER] Erreur callback on_complete: {e}")
        except Exception as e:
            elapsed = time.time() - start_time
            logger.error(
                f"[WORKER] Exception inattendue pour session {session_id} "
                f"après {elapsed:.1f}s : {e}",
                exc_info=True,
            )
            self._record_failure({
                "session_id": session_id,
                "error": f"exception: {e}",
                "elapsed_seconds": round(elapsed, 1),
                "timestamp": datetime.now().isoformat(),
            })
        finally:
            with self._lock:
                self._current_session = None
                self._current_progress = None
            logger.info(f"[WORKER] === Fin traitement session {session_id} ===")

    def _update_progress(self, session_id: str, current: int, total: int, shot_id: str = ""):
        """Progress callback invoked by reprocess_session."""
        with self._lock:
            if self._current_progress:
                self._current_progress["screenshots_total"] = total
                self._current_progress["screenshots_processed"] = current
                self._current_progress["status"] = "processing"
                self._current_progress["current_shot"] = shot_id
        # Log outside the lock: no need to hold it while doing I/O.
        logger.info(
            f"[WORKER] Session {session_id} : screenshot {current}/{total}"
            + (f" ({shot_id})" if shot_id else "")
        )