#!/usr/bin/env python3
"""
Session Cleaner -- lightweight tool for cleaning recorded sessions before replay.

Small standalone Flask server that lets an operator:
- list the most recently recorded sessions
- inspect each session together with its screenshots (crop + full)
- flag parasitic clicks for removal (auto-detection of toasts, right clicks,
  Lea/systray windows, last 3 events)
- rebuild a cleaned replay and inject it into the queue

Option A of the VWB audit report.
Port: 5006
"""

import json
import logging
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from flask import (
    Flask,
    redirect,
    render_template_string,
    request,
    send_from_directory,
    url_for,
)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

STREAMING_SERVER = os.environ.get("RPA_STREAMING_SERVER", "http://localhost:5005")
LIVE_SESSIONS_DIR = os.environ.get(
    "RPA_LIVE_SESSIONS_DIR",
    os.path.join(os.path.dirname(__file__), "..", "data", "training", "live_sessions"),
)
PORT = int(os.environ.get("SESSION_CLEANER_PORT", "5006"))

# Load the API token from the environment, falling back to ../.env.local
API_TOKEN = os.environ.get("RPA_API_TOKEN", "")
if not API_TOKEN:
    env_local = os.path.join(os.path.dirname(__file__), "..", ".env.local")
    if os.path.isfile(env_local):
        try:
            with open(env_local, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line.startswith("RPA_API_TOKEN="):
                        # Accept both quoted and unquoted values.
                        API_TOKEN = line.split("=", 1)[1].strip().strip('"').strip("'")
                        break
        except OSError:
            # Best effort: an unreadable .env.local just means "no token".
            pass

# ---------------------------------------------------------------------------
# Optional import of build_replay_from_raw_events
# ---------------------------------------------------------------------------

_build_replay_fn = None
try:
    from agent_v0.server_v1.stream_processor import build_replay_from_raw_events
    _build_replay_fn = build_replay_from_raw_events
except ImportError:
    pass

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("session_cleaner")

# ---------------------------------------------------------------------------
# Flask application
# ---------------------------------------------------------------------------

app = Flask(__name__)

# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------

# Lower-case substrings of window titles that are treated as parasitic
_PARASITIC_WINDOW_PATTERNS = [
    "program manager",
    "fenetre de depassement",
    "fenêtre de dépassement",
    "léa",
    "lea",
    "assistant",
    "activer windows",
]

# Event types that are actionable (the only ones shown to the user)
_ACTIONABLE_TYPES = frozenset({"mouse_click", "text_input", "key_combo", "key_press", "type"})

# Event types that _is_parasitic always reports as parasitic
_ALWAYS_PARASITIC_TYPES = frozenset({
    "heartbeat",
    "focus_change",
    "window_focus_change",
    "action_result",
    "screenshot",
    "status",
    "ping",
    "pong",
})

# Top-level directory names that are not machine_id folders
_SYSTEM_DIR_NAMES = ("embeddings", "streaming_sessions", "workflows", "test_gpu")


def _resolve_sessions_dir() -> Path:
    """Return the resolved root directory of the live sessions."""
    return Path(LIVE_SESSIONS_DIR).resolve()


def _discover_sessions(limit: int = 50) -> List[Dict[str, Any]]:
    """Discover the most recent sessions.

    Two directory levels are scanned under the sessions root:
    - ``<root>/<machine_id>/sess_*`` (current layout)
    - ``<root>/sess_*`` (legacy layout, reported with machine id "(racine)")

    Only session directories containing ``live_events.jsonl`` are listed.

    Args:
        limit: Maximum number of sessions to return.

    Returns:
        Session metadata dicts (see ``_build_session_info``), newest first
        according to the mtime of the JSONL file.
    """
    base = _resolve_sessions_dir()
    if not base.is_dir():
        logger.warning("Repertoire live_sessions introuvable : %s", base)
        return []

    sessions: List[Dict[str, Any]] = []
    for item in base.iterdir():
        if not item.is_dir():
            continue

        # Legacy layout: session stored directly at the root
        if item.name.startswith("sess_"):
            jsonl = item / "live_events.jsonl"
            if jsonl.is_file():
                sessions.append(_build_session_info("(racine)", item.name, item, jsonl))
            continue

        # Skip hidden and system folders
        if item.name.startswith(".") or item.name in _SYSTEM_DIR_NAMES:
            continue

        # machine_id sub-folders; one unreadable folder must not break the listing
        try:
            children = list(item.iterdir())
        except OSError as e:
            logger.warning("Lecture impossible du dossier %s : %s", item, e)
            continue
        for sub in children:
            if sub.is_dir() and sub.name.startswith("sess_"):
                jsonl = sub / "live_events.jsonl"
                if jsonl.is_file():
                    sessions.append(_build_session_info(item.name, sub.name, sub, jsonl))

    # Newest first (mtime of the JSONL file)
    sessions.sort(key=lambda s: s["mtime"], reverse=True)
    return sessions[:limit]


def _build_session_info(machine_id: str, session_id: str, session_dir: Path, jsonl_path: Path) -> Dict[str, Any]:
    """Build the metadata dict describing one session.

    Args:
        machine_id: Name of the parent folder, or "(racine)" for root sessions.
        session_id: Name of the session directory (``sess_YYYYMMDDTHHMMSS_...``).
        session_dir: Path of the session directory.
        jsonl_path: Path of the session's ``live_events.jsonl``.

    Returns:
        Dict with keys ``machine_id``, ``session_id``, ``session_dir`` (str),
        ``date_str``, ``event_count`` (non-blank JSONL lines) and ``mtime``.
    """
    mtime = jsonl_path.stat().st_mtime

    try:
        with open(jsonl_path, encoding="utf-8") as f:
            event_count = sum(1 for line in f if line.strip())
    except OSError:
        event_count = 0

    # The date comes from the session name; when the name carries no parsable
    # timestamp, fall back to the mtime of the JSONL file.
    try:
        raw = session_id.split("_")[1]  # e.g. 20260410T222352
        date_str = datetime.strptime(raw, "%Y%m%dT%H%M%S").strftime("%d/%m/%Y %H:%M:%S")
    except (ValueError, IndexError):
        date_str = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S")

    return {
        "machine_id": machine_id,
        "session_id": session_id,
        "session_dir": str(session_dir),
        "date_str": date_str,
        "event_count": event_count,
        "mtime": mtime,
    }


def _load_events(session_dir: Path) -> List[Dict[str, Any]]:
    """Load the events stored in ``live_events.jsonl``.

    Blank lines, lines that are not valid JSON and lines whose JSON value is
    not an object are skipped, so list positions are NOT file line numbers.
    Every consumer indexes events by position in the list returned here.

    Returns:
        The decoded event dicts, or an empty list when the file is missing.
        A read error is logged and whatever was read so far is returned.
    """
    jsonl = session_dir / "live_events.jsonl"
    events: List[Dict[str, Any]] = []
    if not jsonl.is_file():
        return events
    try:
        with open(jsonl, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Downstream code calls .get() on every event.
                if isinstance(record, dict):
                    events.append(record)
    except OSError as e:
        logger.error("Erreur lecture %s : %s", jsonl, e)
    return events


def _get_window_title(event: Dict[str, Any]) -> str:
    """Extract the window title of an event.

    The current format stores it in ``event.window.title``; the legacy format
    stores it in ``event.active_window_title``.
    """
    inner = event.get("event") or {}
    # Current format: inner.window.title
    window = inner.get("window") or {}
    if isinstance(window, dict) and window.get("title"):
        return window["title"]
    # Legacy format
    return inner.get("active_window_title", "")


def _get_shot_filename(click_index: int, session_dir: Path) -> Optional[str]:
    """Find the screenshot file for a given click.

    Candidates are tried in this order inside ``<session_dir>/shots``:
    1. ``shot_XXXX_crop.png``
    2. ``shot_XXXX_full.png``
    3. ``res_shot_XXXX.png``

    Args:
        click_index: 1-based click number (first click = 1).
        session_dir: Path of the session directory.

    Returns:
        The file name (not the full path) of the first existing candidate,
        or None when there is no shots directory or no matching file.
    """
    shots_dir = session_dir / "shots"
    if not shots_dir.is_dir():
        return None

    shot_id = f"shot_{click_index:04d}"
    # The crop comes first: it is the most informative thumbnail
    for candidate in (f"{shot_id}_crop.png", f"{shot_id}_full.png", f"res_{shot_id}.png"):
        if (shots_dir / candidate).is_file():
            return candidate
    return None


def _has_parasitic_window(title: str) -> bool:
    """Return True when *title* contains one of the parasitic window patterns."""
    lowered = title.lower()
    return bool(lowered) and any(p in lowered for p in _PARASITIC_WINDOW_PATTERNS)


def _is_parasitic(event: Dict[str, Any], index: int, total: int) -> bool:
    """Tell whether an event is probably parasitic.

    Criteria:
    - always-parasitic event type (heartbeat, focus_change, action_result, ...)
    - right click
    - window title containing a parasitic pattern
    - actionable event with ``index >= total - 3``

    Args:
        event: Raw event wrapper (the payload lives under the "event" key).
        index: Position of the event.
        total: Size of the sequence ``index`` refers to. The "last 3" test is
            a plain comparison of these two numbers, so both must be expressed
            in the same index space.
    """
    inner = event.get("event") or {}
    etype = inner.get("type", "")

    if etype in _ALWAYS_PARASITIC_TYPES:
        return True
    if etype == "mouse_click" and inner.get("button") == "right":
        return True
    if _has_parasitic_window(_get_window_title(event)):
        return True
    # Only actionable events are flagged by the "last 3" rule
    return etype in _ACTIONABLE_TYPES and index >= total - 3


def _parse_actions(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict[str, Any]]:
    """Convert raw events into the list of actions displayed to the user.

    Only events whose type is in ``_ACTIONABLE_TYPES`` are kept.

    Returns:
        One dict per actionable event with keys ``global_index`` (position in
        *events*), ``type``, ``position``, ``window_title``, ``text``,
        ``keys``, ``shot_file`` and ``is_parasitic``; mouse clicks also carry
        ``button`` and ``click_number`` (1-based).
    """
    actions: List[Dict[str, Any]] = []
    click_count = 0

    # The "last 3" rule is applied to actionable events only, so trailing
    # heartbeats and similar events do not shift the window.
    actionable_indices = [
        i for i, ev in enumerate(events)
        if (ev.get("event") or {}).get("type", "") in _ACTIONABLE_TYPES
    ]
    last_3_actionable = set(actionable_indices[-3:])

    for i in actionable_indices:
        event = events[i]
        inner = event.get("event") or {}
        etype = inner.get("type", "")

        action: Dict[str, Any] = {
            "global_index": i,
            "type": etype,
            "position": "",
            "window_title": _get_window_title(event),
            "text": "",
            "keys": "",
            "shot_file": None,
            "is_parasitic": False,
        }

        # Position (present on clicks)
        pos = inner.get("pos")
        if pos and isinstance(pos, (list, tuple)) and len(pos) >= 2:
            action["position"] = f"({pos[0]}, {pos[1]})"

        if etype == "mouse_click":
            action["button"] = inner.get("button", "left")
            # Screenshots are numbered per click, not per event
            click_count += 1
            action["shot_file"] = _get_shot_filename(click_count, session_dir)
            action["click_number"] = click_count
        elif etype in ("text_input", "type"):
            action["text"] = inner.get("text", "")
        else:
            # key_combo / key_press: prefer "keys" and fall back to the single
            # "key" field, so a key_press that only carries "key" is displayed.
            keys = inner.get("keys") or inner.get("key") or []
            if isinstance(keys, (list, tuple)):
                action["keys"] = " + ".join(str(k) for k in keys)
            else:
                action["keys"] = str(keys)

        action["is_parasitic"] = (
            (etype == "mouse_click" and inner.get("button") == "right")
            or _has_parasitic_window(action["window_title"])
            or i in last_3_actionable
        )
        actions.append(action)

    return actions
# ---------------------------------------------------------------------------
# HTML templates
# ---------------------------------------------------------------------------
# NOTE(review): the page templates below (and the inline "Session introuvable"
# template in view_session) contain no HTML tags in this copy, and values the
# routes pass in (css, parasitic_indices) are never referenced by them. The
# markup looks like it was stripped -- restore it from the original file
# before deploying. The visible text is kept as-is.

_BASE_CSS = """
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; color: #333; }
h1 { color: #2c3e50; border-bottom: 2px solid #3498db; padding-bottom: 10px; }
h2 { color: #34495e; }
a { color: #2980b9; text-decoration: none; }
a:hover { text-decoration: underline; }
table { border-collapse: collapse; width: 100%; background: white; border-radius: 6px; overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,0.12); }
th { background: #2c3e50; color: white; padding: 12px 15px; text-align: left; }
td { padding: 10px 15px; border-bottom: 1px solid #eee; }
tr:hover { background: #f0f7ff; }
.btn { display: inline-block; padding: 10px 20px; background: #e74c3c; color: white; border: none; border-radius: 4px; cursor: pointer; font-size: 14px; }
.btn:hover { background: #c0392b; }
.btn-secondary { background: #3498db; }
.btn-secondary:hover { background: #2980b9; }
.info-box { background: #eaf4ff; border: 1px solid #b8d4f0; border-radius: 6px; padding: 15px; margin: 15px 0; }
.warning-box { background: #fff3cd; border: 1px solid #ffc107; border-radius: 6px; padding: 15px; margin: 15px 0; }
.success-box { background: #d4edda; border: 1px solid #28a745; border-radius: 6px; padding: 15px; margin: 15px 0; }
.error-box { background: #f8d7da; border: 1px solid #dc3545; border-radius: 6px; padding: 15px; margin: 15px 0; }
.parasitic { background: #ffe0e0; }
.normal { background: #e0ffe0; }
.counter { font-size: 18px; font-weight: bold; margin: 15px 0; }
.counter .remove { color: #e74c3c; }
.counter .total { color: #2c3e50; }
img.thumb { max-height: 80px; border: 1px solid #ccc; border-radius: 4px; cursor: pointer; }
img.thumb:hover { box-shadow: 0 2px 8px rgba(0,0,0,0.3); }
.nav { margin-bottom: 20px; }
.mono { font-family: 'Fira Code', 'Consolas', monospace; font-size: 13px; }
label { cursor: pointer; }
"""

_INDEX_TEMPLATE = """ Session Cleaner -- Lea

Session Cleaner

Outil de nettoyage des sessions avant replay. Selectionnez une session pour voir ses actions.

{% if sessions %} {% for s in sessions %} {% endfor %}
Date Machine Session ID Evenements Action
{{ s.date_str }} {{ s.machine_id }} {{ s.session_id }} {{ s.event_count }} Voir
{% else %}

Aucune session trouvee dans {{ sessions_dir }}.

Lancez un enregistrement depuis l'Agent V1 pour creer des sessions.

{% endif %} """

_SESSION_TEMPLATE = """ Session {{ session_id }} -- Session Cleaner

Session : {{ session_id }}

Machine : {{ machine_id }} | Date : {{ date_str }} | Evenements bruts : {{ total_events }}
{% if actions %}
{{ parasitic_count }} actions a supprimer / {{ actions|length }} total
{% for a in actions %} {% endfor %}
Supprimer # Type Position Fenetre Texte / Touches Screenshot
{{ loop.index }} {{ a.type }} {% if a.button is defined and a.button == 'right' %} (droit) {% endif %} {{ a.position }} {{ a.window_title|truncate(40) }} {% if a.text %}{{ a.text|truncate(60) }}{% endif %} {% if a.keys %}{{ a.keys }}{% endif %} {% if a.shot_file %} Screenshot action {{ loop.index }} {% else %} -- {% endif %}
{% else %}
Aucune action exploitable dans cette session.
{% endif %}
Screenshot agrandi
"""

_RESULT_TEMPLATE = """ Replay lance -- Session Cleaner

Replay lance

{% if success %}

Replay demarre avec succes.

Replay ID : {{ replay_id }}

Session : {{ session_id }}

Machine cible : {{ machine_id }}

Actions injectees : {{ action_count }}

Actions supprimees : {{ removed_count }}

{% else %}

Erreur lors du lancement du replay.

{{ error_message }}

{% endif %} """

# Lower-case substrings identifying transient system dialogs
_KNOWN_DIALOG_PATTERNS = (
    "enregistrer sous", "save as",
    "ouvrir", "open",
    "imprimer", "print",
    "confirmer", "confirmation", "confirm",
    "voulez-vous", "do you want",
    "avertissement", "warning",
    "erreur", "error",
    "propriétés", "properties",
)

# Hosts for which the root-level lookup applies instead of a machine folder
_ROOT_MACHINE_ID = "(racine)"


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route("/")
def index():
    """Home page: list of the most recent sessions."""
    sessions = _discover_sessions(limit=50)
    return render_template_string(
        _INDEX_TEMPLATE,
        sessions=sessions,
        sessions_dir=str(_resolve_sessions_dir()),
        css=_BASE_CSS,
    )


# The URL placeholders must match the view's parameter names.
@app.route("/session/<machine_id>/<session_id>")
def view_session(machine_id: str, session_id: str):
    """Detailed view of one session and its actions (404 when not found)."""
    session_dir = _find_session_dir(machine_id, session_id)
    if session_dir is None:
        return render_template_string(
            """ Session introuvable

Session {{ sid }} introuvable pour la machine {{ mid }}.

Retour""",
            sid=session_id,
            mid=machine_id,
            css=_BASE_CSS,
        ), 404

    events = _load_events(session_dir)
    actions = _parse_actions(events, session_dir)

    # Global indices of the actions pre-flagged as parasitic
    parasitic_indices = [a["global_index"] for a in actions if a["is_parasitic"]]
    parasitic_count = len(parasitic_indices)

    # Date taken from the session name (sess_YYYYMMDDTHHMMSS_...)
    date_str = ""
    try:
        parts = session_id.split("_")
        if len(parts) >= 2:
            dt = datetime.strptime(parts[1], "%Y%m%dT%H%M%S")
            date_str = dt.strftime("%d/%m/%Y %H:%M:%S")
    except (ValueError, IndexError):
        date_str = "?"

    return render_template_string(
        _SESSION_TEMPLATE,
        session_id=session_id,
        machine_id=machine_id,
        date_str=date_str,
        total_events=len(events),
        actions=actions,
        parasitic_count=parasitic_count,
        parasitic_indices=parasitic_indices,
        css=_BASE_CSS,
    )


@app.route("/shots/<machine_id>/<session_id>/<filename>")
def serve_shot(machine_id: str, session_id: str, filename: str):
    """Serve one screenshot file of a session as image/png."""
    session_dir = _find_session_dir(machine_id, session_id)
    if session_dir is None:
        return "Session introuvable", 404

    shots_dir = session_dir / "shots"
    if not shots_dir.is_dir():
        return "Repertoire shots introuvable", 404

    # Security: refuse anything that is not a bare file name
    safe_name = Path(filename).name
    if safe_name != filename:
        return "Nom de fichier invalide", 400

    if not (shots_dir / safe_name).is_file():
        return "Fichier introuvable", 404

    return send_from_directory(str(shots_dir), safe_name, mimetype="image/png")


def _render_replay_result(
    *,
    success: bool,
    session_id: str,
    machine_id: str,
    removed_count: int,
    replay_id: str = "",
    action_count: int = 0,
    error_message: str = "",
):
    """Render _RESULT_TEMPLATE with the full set of variables it expects."""
    return render_template_string(
        _RESULT_TEMPLATE,
        success=success,
        error_message=error_message,
        replay_id=replay_id,
        session_id=session_id,
        machine_id=machine_id,
        action_count=action_count,
        removed_count=removed_count,
        css=_BASE_CSS,
    )


@app.route("/clean-and-replay", methods=["POST"])
def clean_and_replay():
    """Drop the selected events and submit the cleaned replay.

    Form fields: ``session_id``, ``machine_id`` and any number of
    ``remove_indices`` (positions in the list returned by ``_load_events``).
    The result page is rendered for both success and failure.
    """
    session_id = request.form.get("session_id", "")
    machine_id = request.form.get("machine_id", "")

    # Non-numeric indices are ignored
    remove_indices = set()
    for idx_str in request.form.getlist("remove_indices"):
        try:
            remove_indices.add(int(idx_str))
        except ValueError:
            continue

    session_dir = _find_session_dir(machine_id, session_id)
    if session_dir is None:
        return _render_replay_result(
            success=False,
            session_id=session_id,
            machine_id=machine_id,
            removed_count=0,
            error_message=f"Session {session_id} introuvable pour la machine {machine_id}.",
        )

    all_events = _load_events(session_dir)
    cleaned_events = [ev for i, ev in enumerate(all_events) if i not in remove_indices]
    removed_count = len(all_events) - len(cleaned_events)

    logger.info(
        "Nettoyage session %s : %d evenements -> %d (suppression de %d)",
        session_id, len(all_events), len(cleaned_events), removed_count,
    )

    # The session cleaner ALWAYS uses the simple builder.
    # build_replay_from_raw_events transforms the events (reorders them,
    # injects "open the app" setup, merges actions), which shifts the clicks
    # relative to the original recording. The simple builder reproduces the
    # events 1:1, which is what "clean and replay" needs.
    replay_actions: List[Dict[str, Any]] = []
    error_message = ""
    try:
        replay_actions = _simple_build_replay(cleaned_events, session_dir)
        logger.info("Fallback simple_build_replay a produit %d actions", len(replay_actions))
    except Exception as e:  # request boundary: report the failure to the user
        logger.error("Erreur fallback simple_build_replay : %s", e)
        error_message = f"Erreur lors de la construction du replay (fallback) : {e}"

    # The builder always prepends desktop-setup actions, so "nothing to
    # replay" means "no action besides the setup ones".
    if not any(not a.get("_setup_action") for a in replay_actions):
        return _render_replay_result(
            success=False,
            session_id=session_id,
            machine_id=machine_id,
            removed_count=removed_count,
            error_message=error_message or "Aucune action exploitable apres nettoyage.",
        )

    # Send to the streaming server
    replay_id = f"replay_clean_{uuid.uuid4().hex[:8]}"
    try:
        import requests as _requests

        headers = {"Content-Type": "application/json"}
        if API_TOKEN:
            headers["Authorization"] = f"Bearer {API_TOKEN}"

        payload = {
            "session_id": session_id,
            "actions": replay_actions,
            "machine_id": machine_id if machine_id != _ROOT_MACHINE_ID else "",
            "task_description": f"Replay nettoye de {session_id} ({removed_count} actions supprimees)",
        }

        resp = _requests.post(
            f"{STREAMING_SERVER}/api/v1/traces/stream/replay/raw",
            json=payload,
            headers=headers,
            timeout=30,
        )

        if resp.status_code == 200:
            # A 200 without a usable JSON body is still a success: keep the
            # locally generated replay id instead of reporting a failure.
            try:
                data = resp.json()
            except ValueError:
                data = {}
            if isinstance(data, dict):
                replay_id = data.get("replay_id", replay_id)
            logger.info("Replay lance : %s (%d actions)", replay_id, len(replay_actions))
            return _render_replay_result(
                success=True,
                replay_id=replay_id,
                session_id=session_id,
                machine_id=machine_id,
                action_count=len(replay_actions),
                removed_count=removed_count,
            )

        error_message = f"Serveur streaming a repondu {resp.status_code} : {resp.text[:300]}"
        logger.error("Erreur POST replay : %s", error_message)

    except ImportError:
        error_message = (
            "Module 'requests' non disponible. "
            "Installez-le avec : pip install requests"
        )
    except Exception as e:  # request boundary: report the failure to the user
        error_message = f"Erreur de connexion au serveur streaming ({STREAMING_SERVER}) : {e}"
        logger.error("Erreur connexion streaming : %s", e)

    return _render_replay_result(
        success=False,
        session_id=session_id,
        machine_id=machine_id,
        removed_count=removed_count,
        error_message=error_message,
    )


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _is_safe_path_component(name: str) -> bool:
    """Return True when *name* is a single, non-special path component."""
    return (
        bool(name)
        and name not in (".", "..")
        and "\\" not in name
        and Path(name).name == name
    )


def _find_session_dir(machine_id: str, session_id: str) -> Optional[Path]:
    """Find the directory of a session.

    Both identifiers come from URLs / form fields, so they are only accepted
    when they are plain directory names (no separator, no "." or "..").

    Lookup order:
    1. ``<root>/<machine_id>/<session_id>/``
    2. ``<root>/<session_id>/`` (legacy layout, session at the root)
    3. ``<root>/<any non-hidden folder>/<session_id>/`` (in case the
       machine_id changed)

    Returns:
        The session directory, or None when it does not exist or when an
        identifier is not a safe path component.
    """
    if not _is_safe_path_component(session_id):
        return None
    has_machine = bool(machine_id) and machine_id != _ROOT_MACHINE_ID
    if has_machine and not _is_safe_path_component(machine_id):
        return None

    base = _resolve_sessions_dir()
    if not base.is_dir():
        return None

    # Under machine_id
    if has_machine:
        candidate = base / machine_id / session_id
        if candidate.is_dir():
            return candidate

    # Directly at the root
    candidate = base / session_id
    if candidate.is_dir():
        return candidate

    # Exhaustive search
    for item in base.iterdir():
        if item.is_dir() and not item.name.startswith("."):
            candidate = item / session_id
            if candidate.is_dir():
                return candidate

    return None


def _load_crop_as_base64(session_dir: Path, screenshot_id: str) -> str:
    """Load ``shots/<screenshot_id>_crop.png`` and return it base64-encoded.

    The encoded crop is stored as ``anchor_image_base64`` in the target spec
    and enables the "template" resolution step.

    Returns:
        The ASCII base64 string, or "" when the id is empty, the file is
        missing or it cannot be read.
    """
    if not screenshot_id:
        return ""
    crop_path = session_dir / "shots" / f"{screenshot_id}_crop.png"
    if not crop_path.is_file():
        return ""
    import base64
    try:
        return base64.b64encode(crop_path.read_bytes()).decode("ascii")
    except OSError:
        return ""


def _build_vlm_description(
    uia_snapshot: Dict[str, Any],
    window_info: Dict[str, Any],
) -> str:
    """Build the natural-language description of the target element.

    The sentence is assembled from the UIA control type, the UIA name and the
    window title (ignored when it is "unknown_window").

    Returns:
        The description, or "" when none of the three parts is available.
    """
    name = uia_snapshot.get("name", "")
    control_type = uia_snapshot.get("control_type", "")
    window_title = window_info.get("title", "") if window_info else ""

    parts = []
    if control_type:
        parts.append(f"le {control_type}")
    if name:
        parts.append(f"'{name}'")
    if window_title and window_title != "unknown_window":
        parts.append(f"dans la fenetre '{window_title}'")

    return " ".join(parts)


def _build_full_target_spec(
    event: Dict[str, Any],
    session_dir: Path,
) -> Dict[str, Any]:
    """Build the target_spec driving the visual resolution cascade.

    Uses the data captured at recording time:
    - ``uia_snapshot``  -> "uia" step and ``uia_target``
    - crop screenshot   -> "template" step and ``anchor_image_base64``
    - UIA name + window -> ``by_text`` and ``vlm_description``

    ``resolve_order`` lists the steps in the order uia, template, server,
    vlm_local; "server" and "vlm_local" are always present, so the returned
    dict is never empty.

    Args:
        event: The inner event payload (the dict holding "uia_snapshot",
            "window" and "screenshot_id"), not the outer wrapper.
        session_dir: Session directory, used to locate the crop.
    """
    uia_snapshot = event.get("uia_snapshot") or {}
    window_info = event.get("window") or {}
    screenshot_id = event.get("screenshot_id", "")

    name = uia_snapshot.get("name", "")
    control_type = uia_snapshot.get("control_type", "")
    automation_id = uia_snapshot.get("automation_id", "")
    parent_path = uia_snapshot.get("parent_path", [])
    window_title = window_info.get("title", "")

    resolve_order = []

    # UIA: available when we have a name or an automation_id
    has_uia = bool(name or automation_id)
    if has_uia:
        resolve_order.append("uia")

    # Template matching: available when a crop exists
    anchor_b64 = _load_crop_as_base64(session_dir, screenshot_id)
    if anchor_b64:
        resolve_order.append("template")

    # Server, then local VLM: always appended as fallbacks
    resolve_order.append("server")
    resolve_order.append("vlm_local")

    target_spec: Dict[str, Any] = {
        "resolve_order": resolve_order,
        "window_title": window_title,
    }

    if has_uia:
        target_spec["uia_target"] = {
            "name": name,
            "control_type": control_type,
            "automation_id": automation_id,
            "parent_path": parent_path,
        }

    if anchor_b64:
        target_spec["anchor_image_base64"] = anchor_b64

    if name:
        target_spec["by_text"] = name

    vlm_desc = _build_vlm_description(uia_snapshot, window_info)
    if vlm_desc:
        target_spec["vlm_description"] = vlm_desc

    return target_spec


def _build_desktop_cleanup_actions(screen_w: int, screen_h: int) -> List[Dict[str, Any]]:
    """Build the desktop-cleanup actions that run BEFORE the replay.

    Produces a left click two pixels in from the bottom-right corner of the
    screen (the Windows 11 "show desktop" strip at the far right of the
    taskbar, per the original author), followed by a one second wait. Both
    actions carry ``_setup_action: True`` so they can be told apart from
    recorded actions.

    Args:
        screen_w: Screen width in pixels; must be non-zero.
        screen_h: Screen height in pixels; must be non-zero.
    """
    x_pct = round((screen_w - 2) / screen_w, 6)  # next-to-last pixel column
    y_pct = round((screen_h - 2) / screen_h, 6)  # next-to-last pixel row

    return [
        {
            "action_id": f"act_setup_desktop_{uuid.uuid4().hex[:6]}",
            "type": "click",
            "x_pct": x_pct,
            "y_pct": y_pct,
            "button": "left",
            "visual_mode": False,  # fixed position, no grounding needed
            "wait_before": 0.3,
            "_setup_action": True,
        },
        {
            "action_id": f"act_setup_wait_{uuid.uuid4().hex[:6]}",
            "type": "wait",
            "duration_ms": 1000,
            "wait_before": 0,
            "_setup_action": True,
        },
    ]


def _detect_screen_resolution(events: List[Dict[str, Any]]) -> Tuple[int, int]:
    """Return the first usable recorded screen resolution, else 1920x1080."""
    for ev in events:
        meta = (ev.get("event") or {}).get("screen_metadata") or {}
        res = meta.get("screen_resolution") if isinstance(meta, dict) else None
        if not (isinstance(res, (list, tuple)) and len(res) >= 2):
            continue
        try:
            width, height = int(res[0]), int(res[1])
        except (TypeError, ValueError):
            continue
        # Both values are used as divisors, so they must be positive.
        if width > 0 and height > 0:
            return width, height
    return 1920, 1080


def _extract_keys(inner: Dict[str, Any]) -> List[Any]:
    """Return the keys of a key event as a list ("keys", else single "key")."""
    keys = inner.get("keys") or []
    if isinstance(keys, str):
        keys = [keys]
    if not keys and inner.get("key"):
        keys = [inner["key"]]
    return list(keys)


def _event_yields_action(inner: Dict[str, Any]) -> bool:
    """Tell whether _simple_build_replay emits an action for this event."""
    etype = inner.get("type", "")
    if etype == "mouse_click":
        return True
    if etype in ("text_input", "type"):
        return bool(inner.get("text", ""))
    if etype in ("key_combo", "key_press"):
        return bool(_extract_keys(inner))
    return False


def _click_position_pct(inner: Dict[str, Any], screen_w: int, screen_h: int) -> Tuple[float, float]:
    """Return the click position as screen fractions; (0.0, 0.0) if unusable."""
    pos = inner.get("pos")
    usable = (
        isinstance(pos, (list, tuple))
        and len(pos) >= 2
        and all(isinstance(v, (int, float)) for v in pos[:2])
    )
    if not usable or not screen_w or not screen_h:
        return 0.0, 0.0
    return round(pos[0] / screen_w, 6), round(pos[1] / screen_h, 6)


def _simple_build_replay(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict[str, Any]]:
    """Build a visual replay from raw events.

    The list starts with the desktop-cleanup actions. Then, for each
    actionable event, in order:
    - mouse_click -> "click" with x_pct/y_pct, button and a target_spec
      (see ``_build_full_target_spec``); the coordinates are only a hint
    - text_input / type with non-empty text -> "type"
    - key_combo / key_press with at least one key -> "key_combo"
    Events that yield nothing (empty text, no key) are skipped. Finally the
    actions that belong to a known dialog are marked as conditional.

    Returns:
        The replay action dicts, setup actions included.
    """
    screen_w, screen_h = _detect_screen_resolution(events)

    # Step 0: clean up the desktop
    actions: List[Dict[str, Any]] = list(_build_desktop_cleanup_actions(screen_w, screen_h))

    for ev in events:
        inner = ev.get("event") or {}
        etype = inner.get("type", "")
        if etype not in _ACTIONABLE_TYPES or not _event_yields_action(inner):
            continue

        action_id = f"act_clean_{uuid.uuid4().hex[:6]}"

        if etype == "mouse_click":
            x_pct, y_pct = _click_position_pct(inner, screen_w, screen_h)
            action: Dict[str, Any] = {
                "action_id": action_id,
                "type": "click",
                "x_pct": x_pct,
                "y_pct": y_pct,
                "button": inner.get("button", "left"),
                "wait_before": 0.5,
            }

            target_spec = _build_full_target_spec(inner, session_dir)
            if target_spec:
                action["visual_mode"] = True
                action["target_spec"] = target_spec
                uia = inner.get("uia_snapshot") or {}
                logger.info(
                    "Action %s enrichie [%s] : '%s' (%s)",
                    action_id,
                    "+".join(target_spec.get("resolve_order", [])),
                    uia.get("name", "?"),
                    uia.get("control_type", "?"),
                )
            else:
                # Defensive: kept in case the target spec ever comes back empty
                action["visual_mode"] = False
                logger.warning("Action %s : aucune donnee visuelle, coords brutes", action_id)

        elif etype in ("text_input", "type"):
            action = {
                "action_id": action_id,
                "type": "type",
                "text": inner.get("text", ""),
                "wait_before": 0.3,
            }

        else:  # key_combo / key_press
            action = {
                "action_id": action_id,
                "type": "key_combo",
                "keys": _extract_keys(inner),
                "wait_before": 0.3,
            }

        actions.append(action)

    # Final step: mark the actions recorded inside transient dialogs
    # (e.g. Ctrl+S -> "Enregistrer sous") as conditional.
    return _mark_conditional_blocks(actions, events)


def _mark_conditional_blocks(
    actions: List[Dict[str, Any]],
    events: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Mark the actions that were recorded inside a known system dialog.

    An action gets ``conditional_on_window = <window title>`` when the title
    of the window it was recorded in contains one of
    ``_KNOWN_DIALOG_PATTERNS`` (Enregistrer sous, Ouvrir, Confirmer, ...).
    Only this pattern match is implemented; there is no heuristic for
    unknown transient windows.

    Non-setup actions are paired, in order, with the events that produce an
    action in ``_simple_build_replay``; events that were skipped there
    (empty text, no key) are skipped here too so the pairing stays aligned.

    Args:
        actions: Replay actions built from *events* (modified in place).
        events: The raw events the actions were built from.

    Returns:
        The same *actions* list.
    """
    event_windows: List[str] = []
    for ev in events:
        inner = ev.get("event") or {}
        if inner.get("type", "") not in _ACTIONABLE_TYPES or not _event_yields_action(inner):
            continue
        window = inner.get("window") or {}
        event_windows.append(window.get("title", "") if isinstance(window, dict) else "")

    def _is_known_dialog(title: str) -> bool:
        """True when the title matches a known system dialog."""
        title_lower = title.lower().strip()
        return any(p in title_lower for p in _KNOWN_DIALOG_PATTERNS)

    recorded_actions = [a for a in actions if not a.get("_setup_action")]
    for action, win in zip(recorded_actions, event_windows):
        if not win or win == "unknown_window":
            continue
        if _is_known_dialog(win):
            action["conditional_on_window"] = win
            logger.debug(
                "Action %s conditionnelle (dialogue connu) : '%s'",
                action.get("action_id", "?"), win,
            )

    # Summary log
    n_conditional = sum(1 for a in actions if a.get("conditional_on_window"))
    if n_conditional:
        logger.info(
            "Blocs conditionnels : %d actions sur %d marquees comme dialogues",
            n_conditional, len(recorded_actions),
        )

    return actions


# ---------------------------------------------------------------------------
#
# Entry point
# ---------------------------------------------------------------------------

def main():
    """Start the Session Cleaner server.

    Command-line options: ``--port`` (default: PORT), ``--host`` (default:
    0.0.0.0) and ``--debug`` (Flask debug mode). Logs the effective
    configuration, then blocks in ``app.run``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Session Cleaner -- Nettoyage de sessions avant replay",
    )
    parser.add_argument(
        "--port", type=int, default=PORT,
        help=f"Port du serveur (defaut: {PORT})",
    )
    parser.add_argument(
        "--host", default="0.0.0.0",
        help="Adresse d'ecoute (defaut: 0.0.0.0)",
    )
    parser.add_argument(
        "--debug", action="store_true",
        help="Mode debug Flask",
    )
    args = parser.parse_args()

    logger.info("Session Cleaner demarre sur http://%s:%d", args.host, args.port)
    logger.info("Repertoire sessions : %s", _resolve_sessions_dir())
    logger.info("Serveur streaming : %s", STREAMING_SERVER)
    logger.info("Token API : %s", "configure" if API_TOKEN else "non configure")

    # Flask's debug mode enables the interactive debugger, which lets a client
    # run arbitrary code; make that visible when listening beyond loopback.
    if args.debug and args.host not in ("127.0.0.1", "localhost", "::1"):
        logger.warning(
            "Mode debug actif sur une interface non locale (%s) : "
            "le debugger permet l'execution de code a distance",
            args.host,
        )

    app.run(host=args.host, port=args.port, debug=args.debug)


if __name__ == "__main__":
    main()