From c1ce6a3964e6a0639e7d7ecfa8d148c0c8639fb2 Mon Sep 17 00:00:00 2001 From: Dom Date: Sat, 4 Apr 2026 18:48:00 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20s=C3=A9parer=20grounding=20(qwen2.5vl)?= =?UTF-8?q?=20et=20compr=C3=A9hension=20(gemma4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Grounding : qwen2.5vl:7b hardcodé (seul modèle avec bbox_2d précis) - Compréhension/VLM : gemma4:e4b via RPA_VLM_MODEL (description, identification) - Ajout think=False + num_predict=200 pour éviter le mode thinking gemma4 - Variable RPA_GROUNDING_MODEL pour override si besoin Co-Authored-By: Claude Opus 4.6 (1M context) --- agent_v0/server_v1/api_stream.py | 1232 +++++++++++++++++++++++++++++- 1 file changed, 1191 insertions(+), 41 deletions(-) diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index cec5c278e..359a75ea7 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -8,10 +8,12 @@ Tous les calculs GPU (ScreenAnalyzer, CLIP, FAISS) tournent ici sur le serveur. Inclut les endpoints de replay pour renvoyer des ordres d'exécution à l'Agent V1. """ +import atexit import json import logging import os import secrets +import signal import threading import time import uuid @@ -24,8 +26,9 @@ from fastapi import BackgroundTasks, Depends, FastAPI, File, HTTPException, Requ from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel +from .replay_failure_logger import log_replay_failure from .replay_verifier import ReplayVerifier, VerificationResult -from .stream_processor import StreamProcessor, build_replay_from_raw_events +from .stream_processor import StreamProcessor, build_replay_from_raw_events, enrich_click_from_screenshot from .worker_stream import StreamWorker # Instance globale du vérificateur de replay (comparaison screenshots avant/après) @@ -148,6 +151,556 @@ _KNOWN_KEY_NAMES = { } +# ========================================================================= +# Setup environnement — Préparation automatique avant le replay +# ========================================================================= +# Mapping des noms d'exécutables Windows courants vers la commande de lancement. +# Utilisé comme fallback pour le texte de recherche dans le menu Démarrer. +# Le format est : "processname.exe" (minuscule) -> commande shell +_APP_LAUNCH_COMMANDS: Dict[str, str] = { + "notepad.exe": "notepad", + "explorer.exe": "explorer", + "calc.exe": "calc", + "mspaint.exe": "mspaint", + "cmd.exe": "cmd", + "powershell.exe": "powershell", + "wordpad.exe": "wordpad", + "charmap.exe": "charmap", + "snippingtool.exe": "snippingtool", + "taskmgr.exe": "taskmgr", + "regedit.exe": "regedit", + "mstsc.exe": "mstsc", + "winword.exe": "winword", + "excel.exe": "excel", + "powerpnt.exe": "powerpnt", + "outlook.exe": "outlook", + "msedge.exe": "msedge", + "chrome.exe": "chrome", + "firefox.exe": "firefox", + "code.exe": "code", +} + +# Mapping des exécutables vers le nom visuel à chercher dans le menu Démarrer. +# Contient le texte de recherche (souvent le nom français) et une description +# pour le VLM afin d'identifier l'icône dans les résultats de recherche. +# Format : "processname.exe" -> {"search_text": ..., "display_name": ..., "vlm_description": ...} +_APP_VISUAL_SEARCH: Dict[str, Dict[str, str]] = { + "notepad.exe": { + "search_text": "Bloc-notes", + "display_name": "Bloc-notes", + "vlm_description": "L'application Bloc-notes (Notepad) dans les résultats de recherche", + }, + "calc.exe": { + "search_text": "Calculatrice", + "display_name": "Calculatrice", + "vlm_description": "L'application Calculatrice dans les résultats de recherche", + }, + "mspaint.exe": { + "search_text": "Paint", + "display_name": "Paint", + "vlm_description": "L'application Paint dans les résultats de recherche", + }, + "cmd.exe": { + "search_text": "Invite de commandes", + "display_name": "Invite de commandes", + "vlm_description": "L'Invite de commandes (Command Prompt) dans les résultats", + }, + "powershell.exe": { + "search_text": "PowerShell", + "display_name": "PowerShell", + "vlm_description": "Windows PowerShell dans les résultats de recherche", + }, + "wordpad.exe": { + "search_text": "WordPad", + "display_name": "WordPad", + "vlm_description": "L'application WordPad dans les résultats de recherche", + }, + "winword.exe": { + "search_text": "Word", + "display_name": "Microsoft Word", + "vlm_description": "Microsoft Word dans les résultats de recherche", + }, + "excel.exe": { + "search_text": "Excel", + "display_name": "Microsoft Excel", + "vlm_description": "Microsoft Excel dans les résultats de recherche", + }, + "powerpnt.exe": { + "search_text": "PowerPoint", + "display_name": "Microsoft PowerPoint", + "vlm_description": "Microsoft PowerPoint dans les résultats de recherche", + }, + "outlook.exe": { + "search_text": "Outlook", + "display_name": "Microsoft Outlook", + "vlm_description": "Microsoft Outlook dans les résultats de recherche", + }, + "msedge.exe": { + "search_text": "Edge", + "display_name": "Microsoft Edge", + "vlm_description": "Microsoft Edge dans les résultats de recherche", + }, + "chrome.exe": { + "search_text": "Chrome", + "display_name": "Google Chrome", + "vlm_description": "Google Chrome dans les résultats de recherche", + }, + "firefox.exe": { + "search_text": "Firefox", + "display_name": "Mozilla Firefox", + "vlm_description": "Mozilla Firefox dans les résultats de recherche", + }, + "code.exe": { + "search_text": "Visual Studio Code", + "display_name": "Visual Studio Code", + "vlm_description": "Visual Studio Code dans les résultats de recherche", + }, + "taskmgr.exe": { + "search_text": "Gestionnaire des tâches", + "display_name": "Gestionnaire des tâches", + "vlm_description": "Le Gestionnaire des tâches dans les résultats de recherche", + }, + "snippingtool.exe": { + "search_text": "Outil Capture", + "display_name": "Outil Capture d'écran", + "vlm_description": "L'Outil Capture d'écran dans les résultats de recherche", + }, + "mstsc.exe": { + "search_text": "Connexion Bureau à distance", + "display_name": "Bureau à distance", + "vlm_description": "La Connexion Bureau à distance dans les résultats", + }, +} + +# Applications Windows à ignorer pour le setup (processus système, agents, etc.) +_SETUP_IGNORE_APPS = { + "searchhost.exe", # Barre de recherche Windows + "explorer.exe", # Explorer est toujours lancé (shell Windows) + "pythonw.exe", # Agent Python (notre propre agent) + "python.exe", # Idem + "shellexperiencehost.exe", + "startmenuexperiencehost.exe", + "applicationframehost.exe", + "systemsettings.exe", + "textinputhost.exe", + "runtimebroker.exe", +} + + +def _extract_required_apps_from_events(raw_events: list) -> Dict[str, Any]: + """Extraire les applications requises depuis les événements bruts d'une session. + + Analyse les window_focus_change pour identifier : + - L'application principale (la plus utilisée hors apps système) + - La première fenêtre ciblée (pour le setup initial) + + Args: + raw_events: Événements bruts depuis live_events.jsonl. + + Returns: + Dict avec les clés : + - primary_app: str (nom de l'exécutable principal, ex: "Notepad.exe") + - primary_launch_cmd: str (commande Win+R, ex: "notepad") + - first_window_title: str (titre de la première fenêtre applicative) + - apps: dict[str, int] (app_name -> nombre d'occurrences) + """ + app_counts: Dict[str, int] = defaultdict(int) + first_app = None + first_window_title = None + + for raw_evt in raw_events: + event_data = raw_evt.get("event", raw_evt) + evt_type = event_data.get("type", "") + + if evt_type == "window_focus_change": + to_info = event_data.get("to", {}) + if not to_info: + continue + app_name = to_info.get("app_name", "") + title = to_info.get("title", "") + if app_name: + app_counts[app_name] += 1 + if first_app is None and app_name.lower() not in _SETUP_IGNORE_APPS: + first_app = app_name + first_window_title = title + + # Aussi extraire depuis les mouse_click qui ont un champ window + elif evt_type == "mouse_click": + window = event_data.get("window", {}) + if isinstance(window, dict): + app_name = window.get("app_name", "") + if app_name: + app_counts[app_name] += 1 + + if not app_counts: + return {} + + # Déterminer l'application principale (la plus fréquente hors apps ignorées) + filtered_apps = { + k: v for k, v in app_counts.items() + if k.lower() not in _SETUP_IGNORE_APPS + } + if not filtered_apps: + return {} + + primary_app = max(filtered_apps, key=filtered_apps.get) + + # Résoudre la commande de lancement + primary_launch_cmd = _resolve_launch_command(primary_app) + + return { + "primary_app": primary_app, + "primary_launch_cmd": primary_launch_cmd, + "first_window_title": first_window_title or "", + "apps": dict(app_counts), + } + + +def _extract_required_apps_from_workflow(workflow) -> Dict[str, Any]: + """Extraire les applications requises depuis un workflow structuré. + + Analyse les nodes du workflow pour identifier les titres de fenêtres + requis, puis infère l'application principale. + + Args: + workflow: Objet Workflow ou dict brut. + + Returns: + Même format que _extract_required_apps_from_events. + """ + # Accéder aux données (objet ou dict) + if hasattr(workflow, 'nodes'): + nodes = workflow.nodes + metadata = workflow.metadata if hasattr(workflow, 'metadata') else {} + elif isinstance(workflow, dict): + nodes = workflow.get('nodes', []) + metadata = workflow.get('metadata', {}) + else: + return {} + + if not nodes: + return {} + + # Collecter les titres de fenêtres depuis les nodes + window_titles = [] + for node in nodes: + template = node.template if hasattr(node, 'template') else node.get('template', {}) + if isinstance(template, dict): + window = template.get('window', {}) + elif hasattr(template, 'window'): + window = template.window if hasattr(template.window, '__dict__') else {} + else: + window = {} + + if isinstance(window, dict): + title = window.get('title_pattern', '') or window.get('title_contains', '') + elif hasattr(window, 'title_pattern'): + title = getattr(window, 'title_pattern', '') or '' + else: + title = '' + + if title: + window_titles.append(title) + + # Inférer l'app principale depuis les titres de fenêtres + primary_app, primary_launch_cmd, matched_title = _infer_app_from_window_titles(window_titles) + # Utiliser le titre qui a matché l'app (pas le premier node qui peut être "Rechercher") + first_title = matched_title or (window_titles[0] if window_titles else "") + + if not primary_app: + return {} + + source_session_id = metadata.get("source_session_id", "") if isinstance(metadata, dict) else "" + machine_id = metadata.get("machine_id", "") if isinstance(metadata, dict) else "" + + return { + "primary_app": primary_app, + "primary_launch_cmd": primary_launch_cmd, + "first_window_title": first_title, + "apps": {}, + "source_session_id": source_session_id, + "machine_id": machine_id, + } + + +def _resolve_launch_command(app_name: str) -> str: + """Résoudre la commande Win+R pour lancer une application. + + Si l'app n'est pas dans le mapping, utilise le nom de l'exécutable + directement sans l'extension .exe (fonctionne pour la plupart des apps). + """ + app_lower = app_name.lower() + if app_lower in _APP_LAUNCH_COMMANDS: + return _APP_LAUNCH_COMMANDS[app_lower] + # Fallback : utiliser le nom sans l'extension .exe + if app_lower.endswith(".exe"): + return app_name[:-4] + return app_name + + +def _infer_app_from_window_titles(titles: list) -> tuple: + """Inférer le nom de l'application et la commande de lancement depuis des titres de fenêtres. + + Utilise des heuristiques basées sur les patterns de titres Windows courants. + + Returns: + Tuple (app_name, launch_command, matched_title). + ("", "", "") si non identifié. + """ + _TITLE_APP_PATTERNS = [ + ("bloc-notes", "Notepad.exe", "notepad"), + ("notepad", "Notepad.exe", "notepad"), + ("word", "winword.exe", "winword"), + ("excel", "excel.exe", "excel"), + ("powerpoint", "powerpnt.exe", "powerpnt"), + ("outlook", "outlook.exe", "outlook"), + ("paint", "mspaint.exe", "mspaint"), + ("calculatrice", "calc.exe", "calc"), + ("calculator", "calc.exe", "calc"), + ("explorateur de fichiers", "explorer.exe", "explorer"), + ("file explorer", "explorer.exe", "explorer"), + ("invite de commandes", "cmd.exe", "cmd"), + ("command prompt", "cmd.exe", "cmd"), + ("powershell", "powershell.exe", "powershell"), + ("visual studio code", "code.exe", "code"), + ("edge", "msedge.exe", "msedge"), + ("chrome", "chrome.exe", "chrome"), + ("firefox", "firefox.exe", "firefox"), + ] + + for title in titles: + title_lower = title.lower() + for pattern, app_name, launch_cmd in _TITLE_APP_PATTERNS: + if pattern in title_lower: + # Ignorer les apps système (explorer, etc.) + if app_name.lower() in _SETUP_IGNORE_APPS: + continue + return (app_name, launch_cmd, title) + + return ("", "", "") + + +def _get_visual_search_info(app_name: str) -> Dict[str, str]: + """Obtenir les informations de recherche visuelle pour une application. + + Consulte _APP_VISUAL_SEARCH, sinon construit un fallback à partir du nom + de l'exécutable (ex: "MonApp.exe" → search_text="MonApp"). + + Args: + app_name: Nom de l'exécutable (ex: "Notepad.exe"). + + Returns: + Dict avec search_text, display_name, vlm_description. + """ + app_lower = app_name.lower() + if app_lower in _APP_VISUAL_SEARCH: + return dict(_APP_VISUAL_SEARCH[app_lower]) + + # Fallback : utiliser le nom sans .exe + base_name = app_name[:-4] if app_lower.endswith(".exe") else app_name + return { + "search_text": base_name, + "display_name": base_name, + "vlm_description": f"L'application {base_name} dans les résultats de recherche", + } + + +def _generate_setup_actions( + app_info: Dict[str, Any], + setup_id_prefix: str = "setup", +) -> List[Dict[str, Any]]: + """Générer les actions 100% visuelles pour ouvrir l'application avant le replay. + + Approche entièrement visuelle — JAMAIS de raccourcis clavier (Win, Win+R, + Ctrl+X, etc.) qui n'ont pas été enregistrés par l'utilisateur. Tout passe + par des clics visuels résolus par le VLM (Qwen2.5-VL). + + La séquence est : + 1. Clic visuel sur le bouton Démarrer (coin bas-gauche de l'écran) + 2. Attendre que le menu Démarrer s'ouvre (1s) + 3. Clic visuel sur la barre de recherche du menu Démarrer + 4. Attendre que la barre de recherche soit active (500ms) + 5. Taper le nom de l'application (texte français, ex: "Bloc-notes") + 6. Attendre les résultats de recherche (1.2s) + 7. Clic visuel sur le résultat de l'application trouvée + 8. Attendre que l'application s'ouvre (2-3s selon le poids) + 9. verify_screen : vérifier que la fenêtre attendue est apparue + + Args: + app_info: Dict retourné par _extract_required_apps_from_events ou + _extract_required_apps_from_workflow. + setup_id_prefix: Préfixe pour les action_id générés. + + Returns: + Liste d'actions normalisées, prêtes à injecter dans la queue. + Liste vide si aucune préparation n'est nécessaire. + """ + if not app_info: + return [] + + launch_cmd = app_info.get("primary_launch_cmd", "") + primary_app = app_info.get("primary_app", "") + first_title = app_info.get("first_window_title", "") + + if not launch_cmd: + logger.debug( + "setup_actions : pas de commande de lancement pour '%s', skip", + primary_app, + ) + return [] + + # Ne pas lancer les apps système (toujours présentes) + if primary_app.lower() in _SETUP_IGNORE_APPS: + logger.debug("setup_actions : app '%s' ignorée (système)", primary_app) + return [] + + # Obtenir les informations de recherche visuelle pour cette app + visual_info = _get_visual_search_info(primary_app) + search_text = visual_info["search_text"] + display_name = visual_info["display_name"] + vlm_description = visual_info["vlm_description"] + + actions = [] + + logger.info( + "Génération setup env 100%% visuel : lancement de '%s' via clic " + "Démarrer → recherche visuelle '%s' (fenêtre attendue : '%s')", + primary_app, search_text, first_title, + ) + + # 1. Clic visuel sur le bouton Démarrer (toujours visible, bas-gauche) + # Le VLM résout la position exacte ; x_pct/y_pct sont des fallbacks. + actions.append({ + "action_id": f"act_{setup_id_prefix}_click_start", + "type": "click", + "x_pct": 0.02, + "y_pct": 0.98, + "button": "left", + "visual_mode": True, + "target_spec": { + "by_text": "Démarrer", + "by_role": "start_button", + "vlm_description": ( + "Le bouton Démarrer de Windows (icône Windows), " + "en bas à gauche de la barre des tâches" + ), + }, + "_setup_phase": True, + "_setup_step": "click_start_menu", + }) + + # 2. Attendre que le menu Démarrer s'ouvre + actions.append({ + "action_id": f"act_{setup_id_prefix}_wait_start", + "type": "wait", + "duration_ms": 1000, + "_setup_phase": True, + "_setup_step": "wait_start_menu", + }) + + # 3. Clic visuel sur la barre de recherche du menu Démarrer + # Sur Windows 10/11, la barre de recherche est intégrée au menu Démarrer + # ou visible dans la barre des tâches. Le VLM la trouve visuellement. + actions.append({ + "action_id": f"act_{setup_id_prefix}_click_search", + "type": "click", + "x_pct": 0.20, + "y_pct": 0.92, + "button": "left", + "visual_mode": True, + "target_spec": { + "by_text": "Rechercher", + "by_role": "search_box", + "vlm_description": ( + "La barre ou le champ de recherche dans le menu Démarrer " + "de Windows, souvent intitulé 'Tapez ici pour rechercher' " + "ou 'Rechercher'" + ), + }, + "_setup_phase": True, + "_setup_step": "click_search_box", + }) + + # 4. Attendre que la barre de recherche soit active et prête + actions.append({ + "action_id": f"act_{setup_id_prefix}_wait_search_ready", + "type": "wait", + "duration_ms": 500, + "_setup_phase": True, + "_setup_step": "wait_search_ready", + }) + + # 5. Taper le nom visuel de l'application (texte français) + # Le champ de recherche a été cliqué visuellement à l'étape 3, + # donc le type s'exécute dans le champ actif. + actions.append({ + "action_id": f"act_{setup_id_prefix}_type_search", + "type": "type", + "text": search_text, + "_setup_phase": True, + "_setup_step": "type_app_name", + }) + + # 6. Attendre que la recherche Windows trouve l'application + actions.append({ + "action_id": f"act_{setup_id_prefix}_wait_results", + "type": "wait", + "duration_ms": 1200, + "_setup_phase": True, + "_setup_step": "wait_search_results", + }) + + # 7. Clic visuel sur le résultat de l'application dans la liste + # Le VLM identifie l'icône/texte de l'app dans les résultats. + actions.append({ + "action_id": f"act_{setup_id_prefix}_click_result", + "type": "click", + "x_pct": 0.20, + "y_pct": 0.50, + "button": "left", + "visual_mode": True, + "target_spec": { + "by_text": display_name, + "by_role": "app_icon", + "vlm_description": vlm_description, + }, + "_setup_phase": True, + "_setup_step": "click_app_result", + }) + + # 8. Attendre que l'application s'ouvre + # Durée variable : 3s pour les apps lourdes (Office, VS Code), 2s sinon + heavy_apps = {"winword.exe", "excel.exe", "powerpnt.exe", "outlook.exe", "code.exe"} + wait_ms = 3000 if primary_app.lower() in heavy_apps else 2000 + actions.append({ + "action_id": f"act_{setup_id_prefix}_wait_launch", + "type": "wait", + "duration_ms": wait_ms, + "_setup_phase": True, + "_setup_step": "wait_app_launch", + }) + + # 9. Vérification visuelle que la fenêtre attendue est apparue + if first_title: + actions.append({ + "action_id": f"act_{setup_id_prefix}_verify", + "type": "verify_screen", + "expected_node": "setup_initial", + "timeout_ms": 5000, + "_setup_phase": True, + "_setup_step": "verify_app_ready", + "_expected_title": first_title, + }) + + logger.info( + "Setup env visuel généré : %d actions pour lancer '%s' " + "(recherche visuelle : '%s')", + len(actions), primary_app, search_text, + ) + + return actions + + def _validate_replay_action(action: dict) -> Optional[str]: """Valide une action de replay. Retourne un message d'erreur ou None si valide.""" action_type = action.get("type", "") @@ -273,6 +826,41 @@ processor = StreamProcessor(data_dir=str(LIVE_SESSIONS_DIR)) worker = StreamWorker(live_dir=str(LIVE_SESSIONS_DIR), processor=processor) +# ========================================================================= +# Flush garanti à l'arrêt — signal handler + atexit (ceinture et bretelles) +# ========================================================================= +# Le shutdown handler FastAPI (@app.on_event("shutdown")) fait déjà un flush, +# mais si le serveur est tué par SIGTERM (systemd) ou SIGINT (Ctrl+C) avant +# que uvicorn ait le temps de déclencher le shutdown propre, le flush n'a pas +# lieu. On ajoute donc un signal handler ET un atexit comme filets de sécurité. + +def _emergency_flush(signum=None, frame=None): + """Flush les sessions dirty sur disque avant exit. + + Appelé par SIGTERM/SIGINT ou atexit. Idempotent (flush() est thread-safe). + """ + sig_name = signal.Signals(signum).name if signum else "atexit" + logger.info(f"Flush d'urgence des sessions en cours ({sig_name})...") + try: + processor.session_manager.flush() + logger.info("Flush d'urgence terminé — données persistées.") + except Exception as e: + logger.error(f"Erreur pendant le flush d'urgence : {e}") + # Si c'est un signal, on laisse le handler par défaut terminer le process + if signum is not None: + # Remettre le handler par défaut et re-raise le signal + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + +# Enregistrer les handlers uniquement quand le module est exécuté comme serveur +# (pas lors d'un simple import depuis un autre process comme le retraitement batch) +def _register_shutdown_handlers(): + signal.signal(signal.SIGTERM, _emergency_flush) + signal.signal(signal.SIGINT, _emergency_flush) + atexit.register(processor.session_manager.flush) + logger.info("Handlers de shutdown enregistrés (SIGTERM, SIGINT, atexit)") + + def _enqueue_to_worker(session_id: str): """Ajoute un session_id à la queue du worker VLM (fichier sur disque). @@ -404,7 +992,7 @@ class ReplayResultReport(BaseModel): action_id: str success: bool error: Optional[str] = None - warning: Optional[str] = None # "no_screen_change", "popup_handled", etc. + warning: Optional[str] = None # "no_screen_change", "popup_handled", "visual_resolve_failed" screenshot: Optional[str] = None # Chemin ou base64 du screenshot post-action screenshot_after: Optional[str] = None # Chemin ou base64 du screenshot APRES l'action actual_position: Optional[Dict[str, float]] = None # {"x": px, "y": py} position réelle du clic @@ -412,6 +1000,9 @@ class ReplayResultReport(BaseModel): resolution_method: Optional[str] = None # som_text_match, som_vlm, vlm_quick_find, etc. resolution_score: Optional[float] = None resolution_elapsed_ms: Optional[float] = None + # Champs enrichis pour target_not_found (pause supervisée) + target_description: Optional[str] = None # Description humaine de la cible + target_spec: Optional[Dict[str, Any]] = None # Spec complete de la cible class ErrorCallbackConfig(BaseModel): @@ -497,7 +1088,7 @@ async def health_check(): def _check_gpu_ready(): """Vérifier que le GPU a assez de VRAM pour le pipeline. - Minimum 6 GB requis pour qwen3-vl:8b et les modèles CLIP/FAISS. + Minimum 6 GB requis pour le VLM (gemma4:e4b ~10 GB) et les modèles CLIP/FAISS. Loggue un avertissement si insuffisante, info sinon. """ try: @@ -512,7 +1103,7 @@ def _check_gpu_ready(): # nvidia-smi peut retourner plusieurs lignes (multi-GPU) — prendre la première free_mb_str = result.stdout.strip().split("\n")[0].strip() free_mb = int(free_mb_str) - if free_mb < 6000: # 6 GB minimum pour qwen3-vl:8b + if free_mb < 6000: # 6 GB minimum pour le VLM + CLIP logger.warning( f"VRAM insuffisante : {free_mb} MB libres (minimum 6000 MB). " f"Vérifier les process GPU avec nvidia-smi." @@ -542,6 +1133,15 @@ async def startup(): # Vérifier la VRAM GPU disponible au démarrage _check_gpu_ready() + # Résoudre et afficher le modèle VLM utilisé + # Enregistrer les handlers de shutdown (SIGTERM, SIGINT, atexit) + _register_shutdown_handlers() + + from core.detection.vlm_config import get_vlm_model + _vlm_model_name = get_vlm_model() + logger.info("VLM model: %s", _vlm_model_name) + print(f"\n VLM model: {_vlm_model_name}") + # Afficher le token API au démarrage pour que l'utilisateur puisse configurer l'agent _token_source = "env RPA_API_TOKEN" if os.environ.get("RPA_API_TOKEN") else "auto-généré" logger.info(f"API Token ({_token_source}): {API_TOKEN}") @@ -694,6 +1294,21 @@ async def stream_event(data: StreamEvent): # Traitement direct via StreamProcessor result = worker.process_event_direct(session_id, data.event) + + # ── Enrichissement SomEngine temps réel pour les mouse_click ── + # Après l'enregistrement de l'event, tenter l'enrichissement si le + # screenshot est déjà arrivé. Sinon, l'event est mis en attente et + # sera enrichi quand le screenshot arrivera (voir stream_image). + event = data.event + if event.get("type") == "mouse_click" and event.get("screenshot_id"): + session = processor.session_manager.get_session(session_id) + if session: + event_index = len(session.events) - 1 + submitted = _try_enrich_click_event( + session_id, event, event_index, machine_id, + ) + result["som_enrichment"] = "submitted" if submitted else "pending_screenshot" + return {"status": "event_synced", "session_id": session_id, **result} @@ -718,6 +1333,298 @@ _PRECHECK_SIMILARITY_THRESHOLD = 0.85 # ThreadPool pour l'analyse GPU (évite de bloquer le event loop async) _gpu_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="gpu_analysis") +# ========================================================================= +# Enrichissement SomEngine en temps réel +# Quand un mouse_click arrive avec un screenshot_id, on lance SomEngine +# pour identifier l'élément UI cliqué. Le résultat est stocké dans l'event +# de la session, prêt pour le replay sans retraitement VLM. +# ========================================================================= + +# ThreadPool dédié SomEngine (1 seul worker pour ne pas saturer le GPU) +_som_enrichment_executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix="som_enrich", +) + +# Clics en attente d'enrichissement (le screenshot n'est pas encore arrivé) +# Clé : (session_id, screenshot_id) → dict avec les infos nécessaires +_pending_click_enrichments: Dict[tuple, Dict[str, Any]] = {} +_enrichment_lock = threading.Lock() + +# Screenshots d'action arrivés (pour matcher avec les events en attente) +# Clé : (session_id, screenshot_id) → chemin du fichier +_arrived_action_screenshots: Dict[tuple, str] = {} + + +def _get_session_dir(session_id: str, machine_id: str = "default") -> Path: + """Retrouver le répertoire d'une session live.""" + if machine_id and machine_id != "default": + return LIVE_SESSIONS_DIR / machine_id / session_id + return LIVE_SESSIONS_DIR / session_id + + +def _get_screen_resolution_for_session(session_id: str) -> tuple: + """Récupérer la résolution d'écran depuis la session en mémoire.""" + session = processor.session_manager.get_session(session_id) + if session and session.last_window_info: + res = session.last_window_info.get("screen_resolution", [1920, 1080]) + if isinstance(res, list) and len(res) == 2: + return (int(res[0]), int(res[1])) + return (1920, 1080) + + +def _submit_click_enrichment( + session_id: str, + event_data: dict, + screenshot_path: str, + event_index: int, + machine_id: str = "default", +) -> None: + """Soumettre l'enrichissement SomEngine d'un clic au thread pool dédié. + + Ne bloque pas le handler HTTP — le résultat sera stocké dans l'event + de la session quand SomEngine aura terminé (~1-2 secondes). + + Args: + session_id: Identifiant de la session. + event_data: Données de l'événement mouse_click (pos, window, etc.). + screenshot_path: Chemin vers le screenshot full (PNG). + event_index: Index de l'event dans la liste session.events. + machine_id: Identifiant machine. + """ + _som_enrichment_executor.submit( + _enrich_click_background, + session_id, event_data, screenshot_path, event_index, machine_id, + ) + + +def _enrich_click_background( + session_id: str, + event_data: dict, + screenshot_path: str, + event_index: int, + machine_id: str = "default", +) -> None: + """Enrichir un clic avec SomEngine en arrière-plan (thread séparé). + + Appelle enrich_click_from_screenshot() et stocke le résultat + directement dans l'event de la session (enrichment dict). + """ + try: + pos = event_data.get("pos", [0, 0]) + if not pos or len(pos) < 2: + return + + click_x, click_y = int(pos[0]), int(pos[1]) + screen_w, screen_h = _get_screen_resolution_for_session(session_id) + + # Extraire le titre de fenêtre + window = event_data.get("window", {}) + if isinstance(window, dict): + window_title = window.get("title", "") + else: + window_title = event_data.get("window_title", "") + + # Extraire vision_info si disponible (OCR côté agent) + vision_info = event_data.get("vision_info") + + # Déduire session_dir et screenshot_id pour le cache SomEngine + session_dir = _get_session_dir(session_id, machine_id) + screenshot_id = event_data.get("screenshot_id", "") + + logger.info( + "[SoM-RT] Enrichissement clic (%d,%d) pour %s/%s", + click_x, click_y, session_id, screenshot_id, + ) + + enrichment = enrich_click_from_screenshot( + screenshot_path=Path(screenshot_path), + click_x=click_x, + click_y=click_y, + screen_w=screen_w, + screen_h=screen_h, + window_title=window_title, + vision_info=vision_info, + session_dir=session_dir, + screenshot_id=screenshot_id, + ) + + if not enrichment: + logger.debug( + "[SoM-RT] Enrichissement vide pour %s/%s (screenshot illisible ?)", + session_id, screenshot_id, + ) + return + + # Stocker le résultat dans l'event de la session + session = processor.session_manager.get_session(session_id) + if session and 0 <= event_index < len(session.events): + session.events[event_index]["enrichment"] = enrichment + # Forcer la persistance pour sauvegarder l'enrichissement + processor.session_manager._maybe_persist(session_id) + logger.info( + "[SoM-RT] Clic enrichi : %s/%s → by_text='%s', by_role='%s', som=%s", + session_id, screenshot_id, + enrichment.get("by_text", ""), + enrichment.get("by_role", ""), + bool(enrichment.get("som_element")), + ) + else: + logger.warning( + "[SoM-RT] Session %s introuvable ou event_index %d invalide", + session_id, event_index, + ) + + except Exception as e: + logger.error( + "[SoM-RT] Erreur enrichissement clic %s : %s", + session_id, e, exc_info=True, + ) + + +def _try_enrich_click_event( + session_id: str, + event_data: dict, + event_index: int, + machine_id: str = "default", +) -> bool: + """Tenter l'enrichissement SomEngine d'un event mouse_click. + + Vérifie si le screenshot est déjà arrivé. Si oui, soumet l'enrichissement. + Si non, enregistre l'event dans la file d'attente. + + Returns: + True si l'enrichissement a été soumis, False si en attente du screenshot. + """ + screenshot_id = event_data.get("screenshot_id", "") + if not screenshot_id: + return False + + key = (session_id, screenshot_id) + + with _enrichment_lock: + # Le screenshot est-il déjà arrivé ? + screenshot_path = _arrived_action_screenshots.get(key) + if screenshot_path: + # Screenshot disponible → soumettre immédiatement + _submit_click_enrichment( + session_id, event_data, screenshot_path, event_index, machine_id, + ) + # Nettoyer : plus besoin de garder le screenshot en mémoire + _arrived_action_screenshots.pop(key, None) + return True + else: + # Screenshot pas encore arrivé → mettre en attente + _pending_click_enrichments[key] = { + "event_data": event_data, + "event_index": event_index, + "machine_id": machine_id, + } + logger.debug( + "[SoM-RT] Clic en attente du screenshot %s/%s", + session_id, screenshot_id, + ) + return False + + +def _on_action_screenshot_arrived( + session_id: str, + shot_id: str, + file_path: str, + machine_id: str = "default", +) -> bool: + """Appelé quand un screenshot d'action (shot_XXXX_full) arrive. + + Vérifie s'il y a un clic en attente d'enrichissement pour ce screenshot. + Si oui, soumet l'enrichissement au thread pool. + + Args: + session_id: Identifiant de la session. + shot_id: Identifiant du screenshot (ex: "shot_0003_full"). + file_path: Chemin complet vers le fichier PNG. + machine_id: Identifiant machine. + + Returns: + True si un enrichissement a été soumis, False sinon. + """ + # Extraire le screenshot_id depuis le shot_id : "shot_0003_full" → "shot_0003" + screenshot_id = shot_id.replace("_full", "") + key = (session_id, screenshot_id) + + with _enrichment_lock: + # Y a-t-il un clic en attente pour ce screenshot ? + pending = _pending_click_enrichments.pop(key, None) + if pending: + # Clic trouvé → soumettre l'enrichissement + _submit_click_enrichment( + session_id, + pending["event_data"], + file_path, + pending["event_index"], + pending.get("machine_id", machine_id), + ) + return True + else: + # Pas de clic en attente → enregistrer le screenshot pour plus tard + _arrived_action_screenshots[key] = file_path + # Nettoyage : limiter la taille du cache (les vieux screenshots + # dont l'event n'arrivera jamais) + if len(_arrived_action_screenshots) > 200: + # Supprimer les plus anciennes entrées (FIFO via insertion order) + oldest = next(iter(_arrived_action_screenshots)) + _arrived_action_screenshots.pop(oldest, None) + return False + + +def _merge_enrichments_into_raw_events( + raw_events: List[Dict[str, Any]], + session_events: List[Dict[str, Any]], +) -> int: + """Fusionner les enrichissements SomEngine temps réel dans les events JSONL. + + Les events JSONL (raw_events) sont écrits AVANT l'enrichissement SomEngine. + Les events en mémoire (session_events) contiennent l'enrichissement dans + le champ "enrichment". On les fusionne par correspondance screenshot_id. + + Args: + raw_events: Events chargés depuis live_events.jsonl (structure + {"session_id": ..., "event": {...}} ou directement {...}). + session_events: Events en mémoire depuis LiveSessionState.events + (contiennent potentiellement le champ "enrichment"). + + Returns: + Nombre d'enrichissements fusionnés. + """ + # Construire un index screenshot_id → enrichment depuis les events mémoire + enrichment_by_shot: Dict[str, dict] = {} + for evt in session_events: + enr = evt.get("enrichment") + shot_id = evt.get("screenshot_id", "") + if enr and shot_id: + enrichment_by_shot[shot_id] = enr + + if not enrichment_by_shot: + return 0 + + merged = 0 + for raw_evt in raw_events: + inner = raw_evt.get("event", raw_evt) + if inner.get("type") != "mouse_click": + continue + shot_id = inner.get("screenshot_id", "") + if not shot_id: + continue + enr = enrichment_by_shot.get(shot_id) + if enr and "enrichment" not in inner: + inner["enrichment"] = enr + merged += 1 + + if merged: + logger.info( + "[SoM-RT] %d enrichissement(s) temps réel fusionné(s) dans les events JSONL", + merged, + ) + return merged + def _image_hash(file_path: str) -> str: """Hash rapide d'une image pour détecter les doublons (~identiques). @@ -790,6 +1697,12 @@ async def stream_image( logger.debug(f"Screenshot {shot_id} stocké sans analyse GPU") return {"status": "stored_no_analysis", "shot_id": shot_id} + # Enrichissement SomEngine temps réel (léger, ~1-2s en background) + # Lancé AVANT la déduplication VLM car c'est un traitement indépendant. + # Si un event mouse_click attend ce screenshot, on lance SomEngine en background. + # Sinon, on enregistre le screenshot pour le matcher quand l'event arrivera. + _on_action_screenshot_arrived(session_id, shot_id, file_path_str, machine_id) + # Déduplication par ID : ne pas réanalyser un screenshot déjà traité with _pending_lock: if shot_id in _analyzed_shots[session_id]: @@ -809,15 +1722,15 @@ async def stream_image( with _pending_lock: _analyzed_shots[session_id].add(shot_id) - # Screenshots full : STOCKAGE UNIQUEMENT (pas d'analyse VLM en temps réel) - # L'analyse VLM est faite par le worker séparé (run_worker.py) après - # finalisation de la session. Cela évite de bloquer le serveur HTTP - # (le GIL Python bloque tout quand le VLM tourne dans un thread). - # Le screenshot est déjà sauvegardé sur disque par le session_manager. + # Screenshots full : STOCKAGE UNIQUEMENT (pas d'analyse VLM lourde en temps réel) + # L'analyse VLM complète (ScreenAnalyzer + CLIP + FAISS) est faite par le + # worker séparé (run_worker.py) après finalisation de la session. logger.debug(f"Screenshot {shot_id} stocké (analyse VLM différée au worker)") + return {"status": "image_stored", "shot_id": shot_id} + def _process_screenshot_thread(session_id: str, shot_id: str, path: str): """Analyse GPU d'un screenshot dans un thread séparé (ne bloque pas FastAPI).""" try: @@ -867,6 +1780,15 @@ async def finalize(session_id: str, machine_id: str = "default"): processor.session_manager.finalize(session_id) logger.info(f"Session {session_id} finalisée, ajout à la queue du worker VLM") + # Nettoyer les structures d'enrichissement temps réel pour cette session + with _enrichment_lock: + keys_to_remove = [k for k in _pending_click_enrichments if k[0] == session_id] + for k in keys_to_remove: + del _pending_click_enrichments[k] + keys_to_remove = [k for k in _arrived_action_screenshots if k[0] == session_id] + for k in keys_to_remove: + del _arrived_action_screenshots[k] + # Écrire dans le fichier queue pour le worker VLM (process séparé) _enqueue_to_worker(session_id) @@ -1408,6 +2330,20 @@ async def start_replay(request: ReplayRequest): "Découpez le workflow en parties plus petites." ) + # ── Setup environnement — ouvrir les applications nécessaires ── + setup_actions = [] + app_info = _extract_required_apps_from_workflow(workflow) + if app_info: + setup_actions = _generate_setup_actions(app_info, setup_id_prefix="setup_wf") + if setup_actions: + actions = setup_actions + actions + logger.info( + "replay workflow %s : %d actions de setup injectées " + "(app=%s, cmd=%s)", + workflow_id, len(setup_actions), + app_info.get("primary_app"), app_info.get("primary_launch_cmd"), + ) + # Créer l'identifiant de replay replay_id = f"replay_{uuid.uuid4().hex[:8]}" @@ -1436,7 +2372,8 @@ async def start_replay(request: ReplayRequest): logger.info( f"Replay démarré : {replay_id} | workflow={workflow_id} | " f"session={session_id} | machine={resolved_machine_id} | " - f"{len(actions)} actions à exécuter (worker suspendu)" + f"{len(actions)} actions ({len(setup_actions)} setup + " + f"{len(actions) - len(setup_actions)} replay) (worker suspendu)" ) return { @@ -1446,6 +2383,8 @@ async def start_replay(request: ReplayRequest): "session_id": session_id, "machine_id": resolved_machine_id, "total_actions": len(actions), + "setup_actions": len(setup_actions), + "setup_app": app_info.get("primary_app", "") if app_info else "", } @@ -1622,6 +2561,14 @@ async def replay_from_session( detail=f"Session '{session_id}' : aucun événement trouvé dans live_events.jsonl" ) + # ── 2b. Fusionner les enrichissements temps réel depuis la session en mémoire ── + # Le JSONL ne contient pas les enrichissements SomEngine calculés pendant + # l'enregistrement (ils sont ajoutés en mémoire après écriture JSONL). + # On les injecte ici pour que build_replay_from_raw_events puisse les réutiliser. + session_mem = processor.session_manager.get_session(session_id) + if session_mem and session_mem.events: + _merge_enrichments_into_raw_events(raw_events, session_mem.events) + # ── 3. Construire le replay propre depuis les events bruts ── # Passer le répertoire de session pour activer le visual replay (crops de référence) session_dir = str(events_file.parent) @@ -1665,6 +2612,22 @@ async def replay_from_session( if _gesture_catalog and actions: actions = _gesture_catalog.optimize_replay_actions(actions) + # ── 3b. Setup environnement — ouvrir les applications nécessaires ── + # Analyser les événements bruts pour détecter quelles applications sont requises + # et injecter des actions de setup en tête de la queue de replay. + setup_actions = [] + app_info = _extract_required_apps_from_events(raw_events) + if app_info: + setup_actions = _generate_setup_actions(app_info, setup_id_prefix="setup_sess") + if setup_actions: + actions = setup_actions + actions + logger.info( + "replay-session %s : %d actions de setup injectées avant le replay " + "(app=%s, cmd=%s)", + session_id, len(setup_actions), + app_info.get("primary_app"), app_info.get("primary_launch_cmd"), + ) + # ── 4. Trouver la session de replay cible (Agent V1 actif) ── # L'agent actif peut avoir une session différente de la session source target_session_id = _find_active_agent_session(machine_id=machine_id) @@ -1700,8 +2663,10 @@ async def replay_from_session( _set_replay_lock(replay_id) logger.info( - "Replay session démarré : %s | source=%s | target=%s | machine=%s | %d actions (worker suspendu)", - replay_id, session_id, target_session_id, machine_id, len(actions), + "Replay session démarré : %s | source=%s | target=%s | machine=%s | " + "%d actions (%d setup + %d replay) (worker suspendu)", + replay_id, session_id, target_session_id, machine_id, + len(actions), len(setup_actions), len(actions) - len(setup_actions), ) return { @@ -1711,7 +2676,10 @@ async def replay_from_session( "target_session_id": target_session_id, "machine_id": machine_id, "total_actions": len(actions), + "setup_actions": len(setup_actions), + "replay_actions": len(actions) - len(setup_actions), "total_raw_events": len(raw_events), + "setup_app": app_info.get("primary_app", "") if app_info else "", "actions_preview": [ { k: ( @@ -1723,7 +2691,7 @@ async def replay_from_session( for k, v in a.items() if k != "action_id" } - for a in actions[:5] + for a in actions[:8] # Montrer plus d'actions pour inclure le setup ], } @@ -1999,6 +2967,24 @@ async def get_next_action(session_id: str, machine_id: str = "default"): autres queues de la MÊME machine (pas cross-machine). """ with _replay_lock: + # Verifier si le replay est en pause supervisee (target_not_found). + # Dans ce cas, NE PAS envoyer d'action — attendre l'intervention utilisateur. + for state in _replay_states.values(): + if (state["session_id"] == session_id + and state["status"] == "paused_need_help"): + logger.debug( + f"Replay {state['replay_id']} en pause supervisee " + f"pour session {session_id} — pas d'action envoyee" + ) + return { + "action": None, + "session_id": session_id, + "machine_id": machine_id, + "replay_paused": True, + "pause_message": state.get("pause_message", "Replay en pause"), + "replay_id": state["replay_id"], + } + queue = _replay_queues.get(session_id, []) # Log seulement quand il y a des actions à distribuer if queue: @@ -2345,11 +3331,62 @@ async def report_action_result(report: ReplayResultReport): f"action sans effet visible, on continue" ) + elif not report.success and (report.error or "") == "target_not_found": + # Cible non trouvée visuellement — PAUSE supervisée, PAS d'erreur fatale. + # L'utilisateur doit intervenir (naviguer vers le bon ecran, fermer une popup, etc.) + # On NE vide PAS la queue : les actions restantes seront reprises apres intervention. + target_desc = report.target_description or "élément inconnu" + replay_state["status"] = "paused_need_help" + replay_state["failed_action"] = { + "action_id": action_id, + "type": (original_action or {}).get("type", "unknown"), + "target_description": target_desc, + "screenshot_b64": screenshot_after or report.screenshot, + "target_spec": report.target_spec, + } + replay_state["pause_message"] = f"Je ne vois pas '{target_desc}' à l'écran" + error_entry = { + "action_id": action_id, + "error": f"target_not_found: {target_desc}", + "retry_count": 0, + "timestamp": time.time(), + } + replay_state["error_log"].append(error_entry) + logger.warning( + f"Replay PAUSE supervisée : cible '{target_desc}' non trouvée " + f"pour {action_id} — en attente d'intervention utilisateur" + ) + # Logger l'echec pour l'apprentissage futur + log_replay_failure( + replay_id=replay_state["replay_id"], + action_id=action_id, + target_spec=report.target_spec, + screenshot_b64=screenshot_after or report.screenshot, + resolution_attempts=[ + r for r in replay_state["results"] + if r.get("action_id") == action_id and r.get("resolution_method") + ], + error="target_not_found", + extra={ + "target_description": target_desc, + "actions_completed": replay_state["completed_actions"], + "actions_remaining": len(_replay_queues.get(session_id, [])), + }, + ) + elif not report.success and "visual resolve" in (report.error or "").lower(): - # Visual resolve échoué — STOP immédiat, pas de retry. - # L'élément n'est pas à l'écran, retrier ne changera rien. - replay_state["failed_actions"] += 1 - replay_state["status"] = "error" + # Visual resolve échoué (ancien format d'erreur) — PAUSE supervisée aussi. + # Compatibilité avec les agents qui n'envoient pas encore "target_not_found". + target_desc = report.target_description or (report.error or "Visual resolve échoué") + replay_state["status"] = "paused_need_help" + replay_state["failed_action"] = { + "action_id": action_id, + "type": (original_action or {}).get("type", "unknown"), + "target_description": target_desc, + "screenshot_b64": screenshot_after or report.screenshot, + "target_spec": report.target_spec, + } + replay_state["pause_message"] = f"Je ne vois pas '{target_desc}' à l'écran" error_entry = { "action_id": action_id, "error": report.error or "Visual resolve échoué", @@ -2357,12 +3394,18 @@ async def report_action_result(report: ReplayResultReport): "timestamp": time.time(), } replay_state["error_log"].append(error_entry) - with _replay_lock: - _replay_queues[session_id] = [] - logger.error( - f"Replay STOPPÉ : visual resolve échoué pour {action_id} — " + logger.warning( + f"Replay PAUSE supervisée (compat) : visual resolve échoué pour {action_id} — " f"{report.error}" ) + # Logger l'echec pour l'apprentissage futur + log_replay_failure( + replay_id=replay_state["replay_id"], + action_id=action_id, + target_spec=report.target_spec, + screenshot_b64=screenshot_after or report.screenshot, + error="visual_resolve_failed", + ) elif not report.success and retry_count < MAX_RETRIES_PER_ACTION: # Échec réel (pas juste screen inchangé ou visual) — retry @@ -2475,6 +3518,9 @@ def _create_replay_state( "error_log": [], # Liste des erreurs rencontrées "last_screenshot": None, # Path du dernier screenshot reçu "_last_screenshot_before": None, # Interne: screenshot avant la dernière action + # Champs pour pause supervisée (target_not_found) + "failed_action": None, # Contexte de l'action en echec (quand paused_need_help) + "pause_message": None, # Message a afficher a l'utilisateur } @@ -2616,7 +3662,12 @@ async def register_error_callback(config: ErrorCallbackConfig): @app.get("/api/v1/traces/stream/replay/{replay_id}") async def get_replay_status(replay_id: str): - """Consulter l'état d'un replay en cours ou terminé.""" + """Consulter l'etat d'un replay en cours ou termine. + + Quand le replay est en pause supervisee (paused_need_help), la reponse + inclut le contexte complet de l'echec : action echouee, screenshot, + target_spec, et message utilisateur. + """ with _replay_lock: state = _replay_states.get(replay_id) @@ -2625,8 +3676,19 @@ async def get_replay_status(replay_id: str): status_code=404, detail=f"Replay '{replay_id}' non trouvé" ) - # Filtrer les champs internes (préfixés par _) - return {k: v for k, v in state.items() if not k.startswith("_")} + # Filtrer les champs internes (prefixes par _) + result = {k: v for k, v in state.items() if not k.startswith("_")} + + # Enrichir avec le contexte de pause si applicable + if state["status"] == "paused_need_help": + session_id = state["session_id"] + remaining = len(_replay_queues.get(session_id, [])) + result["actions_completed"] = state["completed_actions"] + result["actions_remaining"] = remaining + result["message"] = state.get("pause_message", "Replay en pause") + # Le failed_action contient deja screenshot_b64 et target_spec + + return result @app.get("/api/v1/traces/stream/replays") @@ -2642,6 +3704,84 @@ async def list_replays(): } +@app.post("/api/v1/traces/stream/replay/{replay_id}/resume") +async def resume_replay(replay_id: str): + """Reprendre un replay en pause supervisee (paused_need_help). + + L'utilisateur a intervenu manuellement (naviguer vers le bon ecran, + fermer une popup, etc.) et veut relancer le replay. L'action echouee + est reinjectee en tete de queue pour etre re-tentee. + + Si le replay n'est pas en pause, retourne une erreur 409 (conflit). + """ + with _replay_lock: + state = _replay_states.get(replay_id) + + if not state: + raise HTTPException( + status_code=404, detail=f"Replay '{replay_id}' non trouvé" + ) + + if state["status"] != "paused_need_help": + raise HTTPException( + status_code=409, + detail=( + f"Replay '{replay_id}' n'est pas en pause " + f"(status actuel: {state['status']})" + ), + ) + + # Recuperer l'action echouee pour la reinjecter + failed_action = state.get("failed_action") + session_id = state["session_id"] + + # Remettre le replay en mode running + state["status"] = "running" + state["failed_action"] = None + state["pause_message"] = None + + # Reinjecter l'action echouee en tete de queue (sera re-tentee) + if failed_action and failed_action.get("action_id"): + # Reconstruire l'action a partir du retry_pending ou de l'original + original_action_id = failed_action["action_id"] + # Chercher l'action originale dans les retry_pending + original = _retry_pending.pop(original_action_id, {}).get("action") + if not original: + # Reconstruire un minimum depuis le failed_action context + original = { + "action_id": original_action_id, + "type": failed_action.get("type", "click"), + "target_spec": failed_action.get("target_spec"), + "visual_mode": True, + } + # Creer un nouvel action_id pour le tracking + resume_id = f"{original_action_id}_resume" + resume_action = dict(original) + resume_action["action_id"] = resume_id + # Stocker dans retry_pending pour le suivi + _retry_pending[resume_id] = { + "action": original, + "retry_count": 0, + "replay_id": replay_id, + "reason": "resume_after_pause", + } + queue = _replay_queues.get(session_id, []) + _replay_queues[session_id] = [resume_action] + queue + + remaining = len(_replay_queues.get(session_id, [])) + logger.info( + f"Replay {replay_id} repris apres pause supervisee — " + f"{remaining} actions en attente" + ) + + return { + "status": "resumed", + "replay_id": replay_id, + "session_id": session_id, + "remaining_actions": remaining, + } + + # ========================================================================= # Visual Replay — Résolution visuelle des cibles # ========================================================================= @@ -3158,6 +4298,7 @@ def _get_vlm_client(): Initialisation paresseuse : le client n'est créé qu'au premier appel, pas au démarrage du serveur (évite de bloquer si Ollama est down). + Le modèle est résolu automatiquement via vlm_config (RPA_VLM_MODEL). """ global _vlm_client if _vlm_client is not None: @@ -3167,12 +4308,14 @@ def _get_vlm_client(): return _vlm_client try: from core.detection.ollama_client import OllamaClient + from core.detection.vlm_config import get_vlm_model + _model = get_vlm_model() _vlm_client = OllamaClient( endpoint="http://localhost:11434", - model="qwen3-vl:8b", + model=_model, timeout=_VLM_QUICK_FIND_TIMEOUT, ) - logger.info("VLM Quick Find : client Ollama initialisé (qwen3-vl:8b)") + logger.info("VLM Quick Find : client Ollama initialisé (%s)", _model) except Exception as e: logger.warning(f"VLM Quick Find : impossible d'initialiser le client Ollama : {e}") return None @@ -3367,7 +4510,7 @@ def _vlm_quick_find( # --------------------------------------------------------------------------- -# Résolution par VLM Grounding Direct (Qwen2.5-VL) +# Résolution par VLM Grounding Direct (configurable via RPA_VLM_MODEL) # --------------------------------------------------------------------------- @@ -3377,11 +4520,12 @@ def _resolve_by_grounding( screen_width: int, screen_height: int, ) -> Optional[Dict[str, Any]]: - """Résoudre une cible via grounding VLM direct (Qwen2.5-VL). + """Résoudre une cible via grounding VLM direct. - Le VLM reçoit le screenshot + une description textuelle et retourne - directement les coordonnées (bbox_2d) de l'élément. Pas de SomEngine, - pas de numérotation — le VLM est entraîné pour le grounding UI. + Le modèle VLM (gemma4:e4b par défaut, configurable via RPA_VLM_MODEL) + reçoit le screenshot + une description textuelle et retourne + directement les coordonnées de l'élément. Pas de SomEngine, + pas de numérotation — le VLM fait du grounding UI natif. Approche plus fiable que SomEngine+VLM pour les icônes et éléments visuels sans texte (logo Windows, disquette, bouton fermer). @@ -3421,14 +4565,19 @@ def _resolve_by_grounding( logger.warning("Grounding : erreur redimensionnement — %s", e) return None - # Construire le prompt — Qwen2.5-VL retourne naturellement des bbox_2d + # Construire le prompt — format JSON universel (fonctionne avec gemma4, qwen2.5vl, qwen3) prompt = ( f"Look at this screenshot. Find: {description}\n" "Where is it? Give the center position as percentage of the image.\n" 'Answer ONLY with JSON: {"x": 0.XX, "y": 0.YY}' ) - # Appel VLM — vLLM (GPU, rapide) en priorité, Ollama (CPU) en fallback + # Le grounding nécessite un modèle entraîné pour les coordonnées (bbox_2d). + # Qwen2.5-VL est le seul qui retourne des positions précises. + # gemma4 comprend les images mais ne sait pas localiser en coordonnées. + _grounding_model = os.environ.get("RPA_GROUNDING_MODEL", "qwen2.5vl:7b") + + # Appel VLM — vLLM (GPU, rapide) en priorité, Ollama en fallback import requests as _requests content = "" @@ -3461,17 +4610,18 @@ def _resolve_by_grounding( except Exception as e: logger.debug("vLLM non disponible (%s), fallback Ollama", e) - # Essai 2 : Ollama (CPU, plus lent) + # Essai 2 : Ollama (qwen2.5vl:7b pour le grounding) if not content: try: resp = _requests.post("http://localhost:11434/api/chat", json={ - "model": "qwen2.5vl:7b", + "model": _grounding_model, "messages": [ - {"role": "system", "content": "You locate UI elements on screenshots. Return coordinates."}, + {"role": "system", "content": "You locate UI elements on screenshots. Return coordinates as JSON."}, {"role": "user", "content": prompt, "images": [shot_b64]}, ], "stream": False, - "options": {"temperature": 0.1, "num_predict": 80}, + "think": False, + "options": {"temperature": 0.1, "num_predict": 200}, }, timeout=60) content = resp.json().get("message", {}).get("content", "") except Exception as e: @@ -3480,7 +4630,7 @@ def _resolve_by_grounding( elapsed = time.time() - t0 - # Parser la réponse — Qwen2.5-VL retourne soit bbox_2d en pixels, soit JSON % + # Parser la réponse — supporte bbox_2d en pixels, JSON %, arrays bruts x_pct, y_pct = None, None # Format 1 : bbox_2d en pixels [x, y] ou [x1, y1, x2, y2] @@ -3537,7 +4687,7 @@ def _resolve_by_grounding( 'Return position: {"x": NNN, "y": NNN} in pixels of Image 1.' ) resp2 = _requests.post("http://localhost:11434/api/chat", json={ - "model": "qwen2.5vl:7b", + "model": _grounding_model, "messages": [ {"role": "user", "content": prompt_mi, "images": [shot_b64, anchor_b64]}, ], @@ -3580,8 +4730,8 @@ def _resolve_by_grounding( return None logger.info( - "Grounding OK [qwen2.5vl] : '%s' → (%.4f, %.4f) en %.1fs", - description[:50], x_pct, y_pct, elapsed, + "Grounding OK [%s] : '%s' → (%.4f, %.4f) en %.1fs", + _grounding_model, description[:50], x_pct, y_pct, elapsed, ) return {