diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 811a30a43..8c2069d55 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -4548,21 +4548,35 @@ def _resolve_by_grounding( else: return None - # Redimensionner le screenshot (800px de large pour le VLM) + # Utiliser la capture fenêtre si disponible (plus ciblée, moins de bruit) + # Sinon fallback sur le full screen + window_capture = target_spec.get("window_capture", {}) + window_rect = window_capture.get("rect") # [x1, y1, x2, y2] écran + try: from PIL import Image as PILImage - img = PILImage.open(screenshot_path) + from pathlib import Path + + # Chercher le screenshot fenêtre (_window.png) + full_path = Path(screenshot_path) + win_path = full_path.parent / full_path.name.replace("_full.png", "_window.png") + + if win_path.is_file() and window_rect: + img = PILImage.open(str(win_path)) + using_window = True + logger.debug("Grounding : image fenêtre %s (%dx%d)", win_path.name, *img.size) + else: + img = PILImage.open(screenshot_path) + using_window = False + orig_w, orig_h = img.size - target_w = 800 - ratio = target_w / orig_w - img_small = img.resize((target_w, int(orig_h * ratio))) - small_w, small_h = img_small.size + small_w, small_h = orig_w, orig_h # pas de redimensionnement buf = io.BytesIO() - img_small.save(buf, format="JPEG", quality=75) + img.save(buf, format="JPEG", quality=80) shot_b64 = base64.b64encode(buf.getvalue()).decode() except Exception as e: - logger.warning("Grounding : erreur redimensionnement — %s", e) + logger.warning("Grounding : erreur chargement image — %s", e) return None # Prompt natif Qwen2.5-VL — format bbox_2d (le seul fiable) @@ -4723,10 +4737,25 @@ def _resolve_by_grounding( logger.info("Grounding : coordonnées hors bornes (%.3f, %.3f)", x_pct, y_pct) return None - logger.info( - "Grounding OK [%s] : '%s' → (%.4f, %.4f) en %.1fs", - _grounding_model, description[:50], x_pct, y_pct, elapsed, - ) + # Convertir coordonnées fenêtre → coordonnées écran + if using_window and window_rect: + win_x1, win_y1, win_x2, win_y2 = window_rect + win_w = win_x2 - win_x1 + win_h = win_y2 - win_y1 + # x_pct/y_pct sont relatifs à la fenêtre, convertir en relatif à l'écran + abs_x = win_x1 + x_pct * win_w + abs_y = win_y1 + y_pct * win_h + x_pct = abs_x / screen_width + y_pct = abs_y / screen_height + logger.info( + "Grounding OK [%s/window] : '%s' → (%.4f, %.4f) en %.1fs", + _grounding_model, description[:50], x_pct, y_pct, elapsed, + ) + else: + logger.info( + "Grounding OK [%s/full] : '%s' → (%.4f, %.4f) en %.1fs", + _grounding_model, description[:50], x_pct, y_pct, elapsed, + ) return { "resolved": True, diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py index 44595b6e7..714e5a223 100644 --- a/agent_v0/server_v1/stream_processor.py +++ b/agent_v0/server_v1/stream_processor.py @@ -459,11 +459,13 @@ def _vlm_identify_element(anchor_b64: str, window_title: str = "") -> str: tmp_path = tmp.name import requests as _requests + from core.detection.vlm_config import get_vlm_model + _enrich_model = get_vlm_model() context = f" from the window '{window_title}'" if window_title else "" - # Utiliser Qwen2.5-VL (meilleur pour l'identification UI que qwen3-vl) + # Modèle VLM configurable (gemma4:e4b par défaut) crop_b64 = base64.b64encode(open(tmp_path, "rb").read()).decode() resp = _requests.post("http://localhost:11434/api/chat", json={ - "model": "qwen2.5vl:7b", + "model": _enrich_model, "messages": [ {"role": "system", "content": "You name UI elements in 2-5 words. No explanation."}, {"role": "user", "content": ( @@ -771,6 +773,160 @@ def _load_crop_for_event( return None +# --------------------------------------------------------------------------- +# Enrichissement VLM partagé — utilisé par build_replay ET reprocess_session +# --------------------------------------------------------------------------- + +def enrich_click_from_screenshot( + screenshot_path: Path, + click_x: int, + click_y: int, + screen_w: int, + screen_h: int, + window_title: str = "", + vision_info: Optional[dict] = None, + session_dir: Optional[Path] = None, + screenshot_id: str = "", +) -> Dict[str, Any]: + """Enrichir un clic avec les informations visuelles extraites du screenshot. + + Fonction partagée entre build_replay_from_raw_events() et + _enrich_workflow_targets() pour éviter la duplication de logique. + + Étapes : + 1. Crop 80x80 autour du clic → anchor_image_base64 + 2. SomEngine → détection de l'élément cliqué (label, type, bbox) + 3. Description VLM positionnelle (haut/bas, gauche/droite) + 4. Texte de l'élément (vision_info OCR > SomEngine label) + + Args: + screenshot_path: Chemin vers le screenshot full (PNG). + click_x: Position X du clic en pixels. + click_y: Position Y du clic en pixels. + screen_w: Largeur de l'écran en pixels. + screen_h: Hauteur de l'écran en pixels. + window_title: Titre de la fenêtre active au moment du clic. + vision_info: Dict vision_info de l'événement original (optionnel). + session_dir: Répertoire de la session (pour le cache SomEngine). + screenshot_id: Identifiant du screenshot (pour le cache SomEngine). + + Returns: + Dict avec les clés : anchor_image_base64, by_text, by_text_source, + by_role, vlm_description, window_title, original_position, by_position, + som_element (optionnel). Retourne un dict vide si le screenshot est + introuvable ou illisible. + """ + import io + + if not screenshot_path or not Path(screenshot_path).is_file(): + return {} + + # ── 1. Crop 80x80 centré sur le clic (anchor_image_base64) ── + anchor_b64 = "" + try: + from PIL import Image + img = Image.open(screenshot_path) + crop_size = 40 + x1 = max(0, click_x - crop_size) + y1 = max(0, click_y - crop_size) + x2 = min(img.width, click_x + crop_size) + y2 = min(img.height, click_y + crop_size) + cropped = img.crop((x1, y1, x2, y2)) + buf = io.BytesIO() + cropped.save(buf, format="PNG") + anchor_b64 = base64.b64encode(buf.getvalue()).decode("utf-8") + except Exception as e: + logger.debug("enrich_click: crop échoué pour %s : %s", screenshot_path, e) + + if not anchor_b64: + return {} + + # ── 2. Position relative dans l'écran ── + y_relative = "" + x_relative = "" + if screen_h > 0: + y_relative = ( + "en bas" if click_y / screen_h > 0.8 + else "en haut" if click_y / screen_h < 0.2 + else "au milieu" + ) + if screen_w > 0: + x_relative = ( + "à gauche" if click_x / screen_w < 0.3 + else "à droite" if click_x / screen_w > 0.7 + else "au centre" + ) + + # ── 3. Description VLM positionnelle ── + vlm_parts = [] + if window_title: + vlm_parts.append(f"Dans la fenêtre '{window_title}'") + position_desc = " ".join(p for p in [y_relative, x_relative] if p) + if position_desc: + vlm_parts.append(f"l'élément cliqué se trouve {position_desc} de l'écran") + + # Ajouter le texte visible (vision_info OCR) + if isinstance(vision_info, dict): + vis_text = vision_info.get("text", "") + vis_type = vision_info.get("type", "") + if vis_text: + vlm_parts.append(f"le texte visible est '{vis_text}'") + if vis_type: + vlm_parts.append(f"c'est un élément de type '{vis_type}'") + vlm_description = ", ".join(vlm_parts) if vlm_parts else "" + + # ── 4. SomEngine : identifier l'élément cliqué ── + som_elem = None + if session_dir and screenshot_id: + # Appeler _som_identify_clicked_element via un event_data minimal + fake_event = { + "screenshot_id": screenshot_id, + "pos": [click_x, click_y], + } + som_elem = _som_identify_clicked_element( + fake_event, session_dir, screen_w, screen_h, + ) + + # ── 5. Déterminer le texte et le type de l'élément ── + element_text = "" + element_type = "" + text_source = "" + if isinstance(vision_info, dict): + element_text = vision_info.get("text", "") + element_type = vision_info.get("type", "") + if element_text: + text_source = "ocr" + if not element_text and som_elem and som_elem.get("label"): + element_text = som_elem["label"] + text_source = "ocr" + + # ── 6. Coordonnées normalisées ── + by_position = [ + round(click_x / screen_w, 6) if screen_w > 0 else 0.0, + round(click_y / screen_h, 6) if screen_h > 0 else 0.0, + ] + + # ── Assembler le résultat ── + result = { + "anchor_image_base64": anchor_b64, + "by_text": element_text, + "by_text_source": text_source, + "by_role": element_type or (som_elem.get("source", "") if som_elem else ""), + "vlm_description": vlm_description, + "window_title": window_title, + "original_position": { + "x_relative": x_relative, + "y_relative": y_relative, + }, + "by_position": by_position, + } + + if som_elem: + result["som_element"] = som_elem + + return result + + def _attach_expected_screenshots( actions: list, raw_events: list, session_dir: Path, ) -> None: @@ -941,6 +1097,8 @@ def build_replay_from_raw_events( # sont convertis en text_input pour être fusionnés avec le texte adjacent. # Seul un changement de fenêtre (window_title différent) coupe la fusion. merged_events = [] + _altgr_seq_got_char = False # Vrai si on a déjà capturé le caractère d'une séquence AltGr + _in_altgr_seq = False # Vrai si on est dans une séquence AltGr (après un modifier-only ctrl+alt_gr) for evt in actionable_events: evt_type = evt.get("type", "") evt_ts = float(evt.get("timestamp", 0)) @@ -950,17 +1108,47 @@ def build_replay_from_raw_events( # Ex: AltGr+0 capturé comme ['ctrl', '@'] → text_input '@' if evt_type in ("key_combo", "key_press"): keys = _sanitize_keys(evt.get("keys", [])) + + # Ignorer les key_combo modifier-only (ex: ['ctrl', 'alt_gr'] seul) + non_mods = [k for k in keys if k.lower() not in _MODIFIER_ONLY_KEYS] + if not non_mods: + _in_altgr_seq = True # Début de séquence AltGr + _altgr_seq_got_char = False + continue + printable = _key_combo_printable_char(keys) - if not printable: + + # Filtrer les fantômes AltGr : sur AZERTY, AltGr+touche produit + # 3 events (ctrl+alt_gr, ctrl+@, ctrl+]). Le 3ème est un fantôme + # du release de la touche physique. On garde SEULEMENT le 1er + # caractère et on ignore les suivants de la même séquence. + if _in_altgr_seq and printable: + if not _altgr_seq_got_char: + # Premier caractère de la séquence AltGr → on le garde + _altgr_seq_got_char = True + evt = dict(evt, type="text_input", text=printable) + evt_type = "text_input" + else: + # Fantôme AltGr → ignorer complètement + continue + elif printable: + _in_altgr_seq = False + _altgr_seq_got_char = False + evt = dict(evt, type="text_input", text=printable) + evt_type = "text_input" + else: + _in_altgr_seq = False + _altgr_seq_got_char = False + + if not printable and evt_type != "text_input": # AltGr seul (AZERTY) : le caractère est dans les raw_keys - # du prochain text_input. Extraire depuis les raw_keys de cet event. raw_keys = evt.get("raw_keys", []) for rk in raw_keys: ch = rk.get("char", "") if ch and len(ch) == 1 and ch.isprintable() and rk.get("action") == "release": printable = ch break - if printable: + if printable and evt_type != "text_input": # Transformer en text_input pour fusion evt = dict(evt, type="text_input", text=printable) evt_type = "text_input" @@ -1077,103 +1265,61 @@ def build_replay_from_raw_events( if window.get("title"): action["window_title"] = window["title"] - # ── Visual replay : attacher le crop de référence (anchor) ── + # ── Visual replay : enrichissement VLM du clic ── if session_dir_path: - anchor_b64 = _load_crop_for_event( + # Stratégie de crop : d'abord chercher un crop pré-existant + # (vision_info.crop, screenshot_id_crop.png, focus_XXXX.png) + # puis fallback vers le crop 80x80 du full screenshot + anchor_b64_preexist = _load_crop_for_event( evt, session_dir_path, screen_w, screen_h, ) - if anchor_b64: + + # Vérifier si un enrichissement temps réel existe déjà + # (calculé par SomEngine pendant l'enregistrement via api_stream) + enrichment = evt.get("enrichment") + if enrichment: + logger.debug( + "Enrichissement temps réel trouvé pour %s (by_text='%s')", + evt.get("screenshot_id", "?"), + enrichment.get("by_text", ""), + ) + else: + # Pas d'enrichissement pré-calculé → appel SomEngine classique + screenshot_id = evt.get("screenshot_id", "") + full_path = session_dir_path / "shots" / f"{screenshot_id}_full.png" if screenshot_id else None + enrichment = enrich_click_from_screenshot( + screenshot_path=full_path, + click_x=int(pos[0]), + click_y=int(pos[1]), + screen_w=screen_w, + screen_h=screen_h, + window_title=window.get("title", ""), + vision_info=evt.get("vision_info"), + session_dir=session_dir_path, + screenshot_id=screenshot_id, + ) + + # Préférer le crop pré-existant (plus fiable : crop agent, focus) + if anchor_b64_preexist and enrichment: + enrichment["anchor_image_base64"] = anchor_b64_preexist + # Si enrich_click n'a pas pu cropper mais qu'on a un crop pré-existant + elif anchor_b64_preexist and not enrichment: + enrichment = {"anchor_image_base64": anchor_b64_preexist} + + if enrichment and enrichment.get("anchor_image_base64"): action["visual_mode"] = True - - # Construire une description VLM riche pour le replay VLM-first - window_title = window.get("title", "") - x_pos, y_pos = pos[0], pos[1] - - # Position relative dans l'écran - if screen_h > 0: - y_relative = ( - "en bas" if y_pos / screen_h > 0.8 - else "en haut" if y_pos / screen_h < 0.2 - else "au milieu" - ) - else: - y_relative = "" - if screen_w > 0: - x_relative = ( - "à gauche" if x_pos / screen_w < 0.3 - else "à droite" if x_pos / screen_w > 0.7 - else "au centre" - ) - else: - x_relative = "" - - # Description riche pour le VLM - vlm_parts = [] - if window_title: - vlm_parts.append( - f"Dans la fenêtre '{window_title}'" - ) - position_desc = " ".join( - p for p in [y_relative, x_relative] if p - ) - if position_desc: - vlm_parts.append( - f"l'élément cliqué se trouve {position_desc} de l'écran" - ) - # Ajouter le texte visible (vision_info ou OCR) - vision_info = evt.get("vision_info", {}) - if isinstance(vision_info, dict): - vis_text = vision_info.get("text", "") - vis_type = vision_info.get("type", "") - if vis_text: - vlm_parts.append( - f"le texte visible est '{vis_text}'" - ) - if vis_type: - vlm_parts.append( - f"c'est un élément de type '{vis_type}'" - ) - vlm_description = ", ".join(vlm_parts) if vlm_parts else "" - - # ── SomEngine : identifier l'élément cliqué ── - som_elem = _som_identify_clicked_element( - evt, session_dir_path, screen_w, screen_h, - ) - - # Déterminer le texte de l'élément cliqué (by_text) - # Priorité : vision_info.text > som_element.label - # Source "ocr" = fiable (texte réel), "vlm" = bavardage non-fiable - element_text = "" - element_type = "" - text_source = "" # "ocr" ou "vlm" - if isinstance(vision_info, dict): - element_text = vision_info.get("text", "") - element_type = vision_info.get("type", "") - if element_text: - text_source = "ocr" - if not element_text and som_elem and som_elem.get("label"): - element_text = som_elem["label"] - text_source = "ocr" - - # Icônes sans texte OCR → NE PAS utiliser le VLM pour nommer - # (descriptions non-déterministes qui font échouer le grounding) - # Le template matching du crop 80x80 sera utilisé à la place - action["target_spec"] = { - "anchor_image_base64": anchor_b64, - "by_text": element_text, - "by_text_source": text_source, # "ocr" = fiable, "" = icône - "by_role": element_type or (som_elem.get("source", "") if som_elem else ""), - "vlm_description": vlm_description, - "window_title": window_title, - "original_position": { - "x_relative": x_relative, - "y_relative": y_relative, - }, + k: v for k, v in enrichment.items() + if k != "by_position" # by_position est déjà dans x_pct/y_pct } - - if som_elem: - action["target_spec"]["som_element"] = som_elem + # Ajouter les métadonnées fenêtre pour le grounding ciblé + wc = evt.get("window_capture", {}) + if wc.get("rect"): + action["target_spec"]["window_capture"] = { + "rect": wc["rect"], + "window_size": wc.get("window_size"), + "click_relative": wc.get("click_relative"), + } elif evt_type == "text_input": text = evt.get("text", "") @@ -1288,7 +1434,11 @@ class StreamProcessor: def __init__(self, data_dir: str = "data/training"): self.data_dir = Path(data_dir) persist_dir = str(self.data_dir / "streaming_sessions") - self.session_manager = LiveSessionManager(persist_dir=persist_dir) + live_sessions_dir = str(self.data_dir / "live_sessions") + self.session_manager = LiveSessionManager( + persist_dir=persist_dir, + live_sessions_dir=live_sessions_dir, + ) self._lock = threading.Lock() # Core components (chargés paresseusement pour éviter les imports lourds au démarrage) @@ -1717,6 +1867,21 @@ class StreamProcessor: with self._data_lock: precomputed_embs = list(self._embeddings.get(session_id, [])) + # Enrichir les ScreenStates avec les timestamps des événements. + # Nécessaire pour que _find_transition_events() puisse associer + # les actions utilisateur aux bonnes transitions. + session_state = self.session_manager.get_session(session_id) + shot_ts_map = getattr(session_state, '_shot_ts_map', {}) if session_state else {} + if shot_ts_map: + self._enrich_states_with_timestamps(states, shot_ts_map) + + # Mode séquentiel : pour les enregistrements single-pass, chaque + # screenshot est une étape distincte du workflow. + # Le clustering DBSCAN fusionne les screenshots similaires + # (ex: plusieurs vues de Notepad) en un seul node → perte d'actions. + # Le mode séquentiel préserve toutes les étapes. + use_sequential = len(raw_session.events) > 0 + # Injecter les ScreenStates et embeddings pré-calculés pour éviter # de re-analyser et de recalculer les embeddings (triple calcul) workflow = builder.build_from_session( @@ -1724,6 +1889,7 @@ class StreamProcessor: workflow_name=workflow_name, precomputed_states=states, precomputed_embeddings=precomputed_embs if len(precomputed_embs) == len(states) else None, + sequential=use_sequential, ) with self._data_lock: @@ -1805,6 +1971,223 @@ class StreamProcessor: return None + # ========================================================================= + # Enrichissement VLM des workflows (target_spec sur chaque edge) + # ========================================================================= + + def _enrich_workflow_targets( + self, + workflow, + session_dir: Path, + ) -> int: + """Enrichir les target_spec des edges d'un workflow avec les données VLM. + + Pour chaque edge dont l'action est un clic (mouse_click ou compound + avec un step mouse_click), cette méthode : + 1. Trouve le screenshot correspondant (via le from_node → shot_XXXX_full.png) + 2. Extrait la position du clic depuis action.parameters.position + 3. Appelle enrich_click_from_screenshot() pour obtenir : + - anchor_image_base64 (crop 80x80) + - by_text (texte OCR de l'élément) + - by_role (type de l'élément) + - vlm_description (description positionnelle) + - som_element (détection SomEngine) + 4. Met à jour le TargetSpec de l'edge et stocke les données + supplémentaires dans context_hints et edge.metadata + + Args: + workflow: Objet Workflow avec ses edges à enrichir. + session_dir: Répertoire de la session (contient shots/). + + Returns: + Nombre d'edges enrichis. + """ + shots_dir = session_dir / "shots" + if not shots_dir.is_dir(): + logger.warning( + "enrich_workflow_targets: dossier shots/ introuvable dans %s", + session_dir, + ) + return 0 + + # ── Résolution d'écran depuis les événements ou fallback ── + screen_w, screen_h = self._get_screen_resolution_from_session(session_dir) + + # ── Mapping node_id → screenshot_id ── + # En mode séquentiel, node_000 → premier screenshot, node_001 → deuxième, etc. + # On reconstruit ce mapping depuis les fichiers shot_XXXX_full.png triés. + all_shots = sorted(shots_dir.glob("shot_*_full.png")) + node_to_shot: Dict[str, Path] = {} + for i, shot_path in enumerate(all_shots): + node_id = f"node_{i:03d}" + node_to_shot[node_id] = shot_path + + # ── Enrichir chaque edge ── + enriched_count = 0 + + for edge in workflow.edges: + # Trouver le screenshot du node source (là où le clic a lieu) + shot_path = node_to_shot.get(edge.from_node) + if not shot_path or not shot_path.is_file(): + logger.debug( + "enrich: pas de screenshot pour node %s (edge %s)", + edge.from_node, edge.edge_id, + ) + continue + + # Extraire le screenshot_id depuis le nom de fichier + # shot_0001_full.png → shot_0001 + screenshot_id = shot_path.stem.replace("_full", "") + + # ── Extraire la position du clic ── + click_positions = self._extract_click_positions(edge) + if not click_positions: + continue + + # Enrichir le premier clic (action principale) + click_x, click_y = click_positions[0] + + # Extraire le titre de fenêtre depuis le node source si disponible + window_title = "" + source_node = workflow.get_node(edge.from_node) + if source_node and hasattr(source_node, 'template'): + tpl = source_node.template + # WindowConstraint dans le template + if hasattr(tpl, 'window') and tpl.window: + if tpl.window.title_pattern: + window_title = tpl.window.title_pattern + elif tpl.window.title_contains: + window_title = tpl.window.title_contains + elif tpl.window.process_name: + window_title = tpl.window.process_name + # Fallback : contraintes de l'edge + if not window_title and edge.constraints: + window_title = ( + edge.constraints.required_window_title + or edge.constraints.required_app_name + or "" + ) + + # ── Appel à la fonction partagée d'enrichissement ── + enrichment = enrich_click_from_screenshot( + screenshot_path=shot_path, + click_x=int(click_x), + click_y=int(click_y), + screen_w=screen_w, + screen_h=screen_h, + window_title=window_title, + session_dir=session_dir, + screenshot_id=screenshot_id, + ) + + if not enrichment: + continue + + # ── Mettre à jour le TargetSpec de l'edge ── + target = edge.action.target + + # Champs first-class du TargetSpec + if enrichment.get("by_text"): + target.by_text = enrichment["by_text"] + if enrichment.get("by_role"): + target.by_role = enrichment["by_role"] + if enrichment.get("by_position"): + target.by_position = tuple(enrichment["by_position"]) + + # Champs supplémentaires dans context_hints + if not target.context_hints: + target.context_hints = {} + if enrichment.get("vlm_description"): + target.context_hints["vlm_description"] = enrichment["vlm_description"] + if enrichment.get("window_title"): + target.context_hints["window_title"] = enrichment["window_title"] + if enrichment.get("original_position"): + target.context_hints["original_position"] = enrichment["original_position"] + if enrichment.get("anchor_image_base64"): + target.context_hints["anchor_image_base64"] = enrichment["anchor_image_base64"] + if enrichment.get("by_text_source"): + target.context_hints["by_text_source"] = enrichment["by_text_source"] + if enrichment.get("som_element"): + target.context_hints["som_element"] = enrichment["som_element"] + + # Marquer l'edge comme enrichi dans les métadonnées + edge.metadata["vlm_enriched"] = True + edge.metadata["enrichment_source"] = "reprocess_session" + + enriched_count += 1 + logger.debug( + "Edge %s enrichi : by_text='%s', by_role='%s', anchor=%s", + edge.edge_id, + enrichment.get("by_text", ""), + enrichment.get("by_role", ""), + "oui" if enrichment.get("anchor_image_base64") else "non", + ) + + logger.info( + "Enrichissement VLM terminé : %d/%d edges enrichis pour workflow '%s'", + enriched_count, len(workflow.edges), workflow.name, + ) + return enriched_count + + def _extract_click_positions(self, edge) -> List[tuple]: + """Extraire les positions de clic depuis un edge du workflow. + + Supporte les actions simples (mouse_click) et compound (steps). + + Returns: + Liste de tuples (x, y) en pixels. Peut être vide si pas de clic. + """ + action = edge.action + positions = [] + + if action.type == "mouse_click": + pos = action.parameters.get("position", []) + if pos and len(pos) == 2: + positions.append((pos[0], pos[1])) + + elif action.type == "compound": + # Chercher les steps de type mouse_click + for step in action.parameters.get("steps", []): + if step.get("type") == "mouse_click": + pos = step.get("position", []) + if pos and len(pos) == 2: + positions.append((pos[0], pos[1])) + + return positions + + def _get_screen_resolution_from_session( + self, session_dir: Path, + ) -> tuple: + """Extraire la résolution d'écran depuis les événements d'une session. + + Lit live_events.jsonl et cherche screen_metadata.screen_resolution + ou infère depuis les positions des clics. + + Returns: + Tuple (width, height). Fallback: (1920, 1080). + """ + import json as _json + + events_file = session_dir / "live_events.jsonl" + if not events_file.exists(): + return (1920, 1080) + + events = [] + try: + for line in events_file.read_text().splitlines(): + if not line.strip(): + continue + try: + events.append(_json.loads(line)) + except _json.JSONDecodeError: + continue + except Exception: + return (1920, 1080) + + if events: + return _extract_screen_resolution(events) + return (1920, 1080) + # ========================================================================= # Retraitement (appelé par le SessionWorker) # ========================================================================= @@ -1867,6 +2250,11 @@ class StreamProcessor: # pour que ScreenAnalyzer crée des ScreenStates avec les bons titres de fenêtre self._restore_window_events(session_id, session_dir) + # Restaurer les événements utilisateur (mouse_click, text_input, key_press) + # depuis live_events.jsonl → session.events, pour que to_raw_session() + # puisse les passer au GraphBuilder (construction des edges/actions) + self._restore_user_events(session_id, session_dir) + # Nettoyer les données en mémoire (au cas où un traitement précédent a échoué) with self._data_lock: self._screen_states.pop(session_id, None) @@ -1948,18 +2336,48 @@ class StreamProcessor: # a été marquée comme finalisée. On n'a pas besoin de le refaire. # finalize_session() utilise les screen_states accumulés. result = self.finalize_session(session_id) + + # ── Enrichissement VLM des target_spec du workflow ── + # Après la construction du workflow, enrichir chaque edge avec les + # informations visuelles (by_text, by_role, vlm_description, anchor_image) + # extraites des screenshots via SomEngine. + if result.get("status") == "workflow_built" and result.get("workflow_id"): + workflow_id = result["workflow_id"] + with self._data_lock: + workflow = self._workflows.get(workflow_id) + if workflow and session_dir: + try: + enriched_count = self._enrich_workflow_targets(workflow, session_dir) + result["enriched_edges"] = enriched_count + result["total_edges"] = len(workflow.edges) + + # Re-sauvegarder le workflow enrichi sur disque + if enriched_count > 0 and result.get("saved_path"): + saved_path = Path(result["saved_path"]) + workflow.save_to_file(saved_path) + logger.info( + "Workflow enrichi re-sauvegardé : %s " + "(%d/%d edges enrichis)", + saved_path, enriched_count, len(workflow.edges), + ) + except Exception as e: + logger.error( + "Erreur enrichissement VLM du workflow %s : %s", + workflow_id, e, + ) + result["enrichment_error"] = str(e) + return result def _select_key_screenshots(self, session_id: str, shot_paths: List[Path]) -> List[Path]: """Sélectionner uniquement les screenshots significatifs pour éviter les analyses redondantes. Critères : - 1. Garder le premier et le dernier screenshot (toujours) - 2. Comparer chaque screenshot au précédent via hash perceptuel (32x32 grayscale) - 3. Si l'image est identique au précédent → skip (même écran, pas de changement) - 4. Privilégier les screenshots d'action (shot_*_full) vs heartbeat - - Réduit typiquement 12 screenshots à 3-4 screenshots utiles. + 1. Les screenshots d'action (shot_*_full) sont TOUJOURS conservés + car chacun correspond à une action utilisateur et est nécessaire + pour le mode séquentiel du GraphBuilder. + 2. Pour les heartbeats ou autres, comparer au précédent via hash perceptuel. + 3. Garder le premier et le dernier screenshot (toujours). """ if len(shot_paths) <= 2: return list(shot_paths) @@ -1972,28 +2390,32 @@ class StreamProcessor: for path in shot_paths: basename = os.path.basename(str(path)) - # Les screenshots d'action sont prioritaires + # Les screenshots d'action (shot_*_full) sont systématiquement gardés. + # Chacun correspond à un clic ou une saisie clavier et constitue + # une étape distincte du workflow — ne pas les dédupliquer. is_action = 'shot_' in basename and '_full' in basename + if is_action: + selected.append(path) + # Mettre à jour le hash pour la comparaison des heartbeats suivants + try: + img = Image.open(str(path)).resize((32, 32)).convert('L') + last_hash = hashlib.md5(img.tobytes()).hexdigest() + except Exception: + pass + continue - # Hash perceptuel : redimensionner à 32x32 en niveaux de gris - # Assez discriminant pour détecter les changements d'état de l'UI + # Hash perceptuel pour les non-actions (heartbeats, etc.) try: img = Image.open(str(path)).resize((32, 32)).convert('L') current_hash = hashlib.md5(img.tobytes()).hexdigest() except Exception as e: logger.debug(f"Impossible de hasher {basename}: {e}") - # En cas d'erreur, inclure le screenshot par sécurité selected.append(path) continue - # Inclure si : premier screenshot, hash différent, ou screenshot d'action if last_hash is None or current_hash != last_hash: selected.append(path) last_hash = current_hash - elif is_action: - # Action mais visuellement identique — skip quand même - # car l'état de l'écran n'a pas changé - logger.debug(f"Screenshot d'action {basename} identique au précédent, skip") # Garantir que le premier et le dernier sont toujours inclus if shot_paths[0] not in selected: @@ -2087,19 +2509,124 @@ class StreamProcessor: except Exception as e: logger.warning(f"Erreur restauration window events pour {session_id}: {e}") + def _restore_user_events(self, session_id: str, session_dir: Path): + """Restaurer les événements utilisateur depuis live_events.jsonl. + + Charge les événements d'action (mouse_click, text_input, key_press) + dans session.events via session_manager.add_event(). + Sans cela, to_raw_session() retourne une liste d'events vide, + et le GraphBuilder ne peut pas construire les actions des edges. + + Construit aussi un mapping shot_id → timestamp pour enrichir les + ScreenStates avec le bon event_time (nécessaire pour + _find_transition_events). + + Note : vide d'abord les events existants de la session pour + éviter les doublons (la session peut avoir été restaurée depuis + un fichier de persistance au démarrage). + """ + import json as _json + + events_file = session_dir / "live_events.jsonl" + if not events_file.exists(): + logger.debug(f"Pas de live_events.jsonl pour {session_id}") + return + + try: + # Vider les events existants pour éviter les doublons + # (la session peut avoir été pré-chargée depuis la persistance) + session = self.session_manager.get_session(session_id) + if session and session.events: + logger.debug( + f"Nettoyage de {len(session.events)} events " + f"pré-existants pour {session_id}" + ) + session.events.clear() + + action_count = 0 + shot_ts_map = {} # shot_id → timestamp de l'événement d'action + + for line in events_file.read_text().splitlines(): + if not line.strip(): + continue + try: + raw = _json.loads(line) + except _json.JSONDecodeError: + continue + + event_data = raw.get("event", raw) + evt_type = event_data.get("type", "") + ts = float(event_data.get("timestamp", raw.get("timestamp", 0))) + + if evt_type not in ("mouse_click", "text_input", "key_press"): + continue + + # Construire le dict d'événement pour add_event() + evt_dict = { + "type": evt_type, + "timestamp": ts, + } + + # Copier les données spécifiques au type + if evt_type == "mouse_click": + evt_dict["pos"] = event_data.get("pos", [0, 0]) + evt_dict["button"] = event_data.get("button", "left") + elif evt_type == "text_input": + evt_dict["text"] = event_data.get("text", "") + elif evt_type == "key_press": + evt_dict["keys"] = event_data.get("keys", []) + + # Copier window info si disponible + window = event_data.get("window") + if window: + evt_dict["window"] = window + + # Copier screenshot_id si disponible + shot_id = event_data.get("screenshot_id") + if shot_id: + evt_dict["screenshot_id"] = shot_id + # Mapper shot_id → timestamp pour enrichir les ScreenStates + shot_ts_map[shot_id] = ts + + # Copier screen_metadata si disponible + screen_meta = event_data.get("screen_metadata") + if screen_meta: + # Propager la résolution pour to_raw_session() + if "screen_resolution" in screen_meta: + evt_dict["screen_resolution"] = screen_meta["screen_resolution"] + + self.session_manager.add_event(session_id, evt_dict) + action_count += 1 + + # Stocker le mapping shot → timestamp dans la session + # pour enrichir les ScreenStates dans finalize_session() + session = self.session_manager.get_session(session_id) + if session: + session._shot_ts_map = shot_ts_map + + logger.info( + f"User events restaurés pour {session_id}: " + f"{action_count} actions chargées, " + f"{len(shot_ts_map)} shots avec timestamp" + ) + + except Exception as e: + logger.warning(f"Erreur restauration user events pour {session_id}: {e}") + def _find_session_dir(self, session_id: str) -> Optional[Path]: """Trouver le dossier d'une session sur disque. - Cherche dans : - 1. data/training/live_sessions/{session_id}/ - 2. data/training/live_sessions/{machine_id}/{session_id}/ (multi-machine) + Cherche dans (par ordre de priorité) : + 1. data/training/{session_id}/ + 2. data/training/{subdir}/{session_id}/ (ex: live_sessions) + 3. data/training/{subdir}/{machine_id}/{session_id}/ (multi-machine) """ # Chemin direct direct = self.data_dir / session_id if direct.is_dir() and (direct / "shots").exists(): return direct - # Chercher dans les sous-dossiers (machine_id) + # Chercher dans les sous-dossiers (1 niveau : live_sessions/{session_id}) parent = self.data_dir if parent.exists(): for subdir in parent.iterdir(): @@ -2107,6 +2634,13 @@ class StreamProcessor: candidate = subdir / session_id if candidate.is_dir() and (candidate / "shots").exists(): return candidate + # Chercher 1 niveau plus profond (live_sessions/{machine_id}/{session_id}) + if subdir.is_dir(): + for machine_dir in subdir.iterdir(): + if machine_dir.is_dir(): + candidate = machine_dir / session_id + if candidate.is_dir() and (candidate / "shots").exists(): + return candidate # Chercher aussi dans le parent du data_dir (cas où data_dir = streaming_sessions) parent_parent = self.data_dir.parent @@ -2205,6 +2739,81 @@ class StreamProcessor: return session_id in self._processed_sessions_cache + def _enrich_states_with_timestamps( + self, + states: List, + shot_ts_map: Dict[str, float], + ): + """Enrichir les ScreenStates avec les timestamps des événements d'action. + + Le GraphBuilder utilise metadata['shot_timestamp'] (ou 'event_time') + pour associer les événements utilisateur aux transitions entre states. + Sans cette info, _find_transition_events() ne sait pas quels événements + appartiennent à quelle transition. + + On fait un mapping par nom de shot : + - shot_XXXX → ScreenState dont le screen_state_id contient "XXXX" + ou par index séquentiel (shot_0001 → state[0], etc.) + + Args: + states: Liste de ScreenStates à enrichir (modifiés in-place) + shot_ts_map: Mapping {shot_id: timestamp_epoch} + """ + if not shot_ts_map: + return + + import re + + # Trier les shot_ids par numéro pour correspondre à l'ordre des states + sorted_shots = sorted( + shot_ts_map.items(), + key=lambda x: x[0], + ) + + # Stratégie 1 : mapping par index séquentiel + # shot_0001 → state[0], shot_0002 → state[1], etc. + enriched = 0 + for i, (shot_id, ts) in enumerate(sorted_shots): + if i < len(states): + state = states[i] + if state.metadata is None: + state.metadata = {} + state.metadata["shot_timestamp"] = ts + state.metadata["shot_id"] = shot_id + enriched += 1 + + # Pour les states restants (sans shot correspondant), + # interpoler entre les timestamps voisins + for i, state in enumerate(states): + if state.metadata and state.metadata.get("shot_timestamp"): + continue + if state.metadata is None: + state.metadata = {} + # Chercher les timestamps voisins + prev_ts = 0 + next_ts = float('inf') + for j in range(i - 1, -1, -1): + t = states[j].metadata.get("shot_timestamp") if states[j].metadata else None + if t: + prev_ts = t + break + for j in range(i + 1, len(states)): + t = states[j].metadata.get("shot_timestamp") if states[j].metadata else None + if t: + next_ts = t + break + if prev_ts > 0 and next_ts < float('inf'): + state.metadata["shot_timestamp"] = (prev_ts + next_ts) / 2 + elif prev_ts > 0: + state.metadata["shot_timestamp"] = prev_ts + 1.0 + elif next_ts < float('inf'): + state.metadata["shot_timestamp"] = next_ts - 1.0 + + logger.debug( + f"Timestamps enrichis: {enriched}/{len(states)} states " + f"depuis {len(shot_ts_map)} shots" + ) + def _cleanup_session_data(self, session_id: str): """Libérer la mémoire des ScreenStates et embeddings après finalization.""" with self._data_lock: