From cbe8dc95d2b029ca6f39e144a3673d2a50a285ec Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 20 Apr 2026 21:52:45 +0200 Subject: [PATCH] =?UTF-8?q?feat(cognition):=20timing=20+=20=C3=A9cran=20at?= =?UTF-8?q?tendu=20+=20auto-apprentissage=20Shadow=20+=20VLM=20qwen2.5vl?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mémoire de travail enrichie : - Timing par étape (durée, moyenne, alerte si lent) - Écran attendu vs observation réelle - Contexte VLM étendu VLM reasoning : default qwen2.5vl:3b (gemma4 ne supporte pas vision) Auto-apprentissage Shadow : - stream_processor apprend les dialogues automatiquement - Clic utilisateur après dialogue → pattern mémorisé - Sauvegardé dans data/learned_patterns.json GUI-R1 : 10 patterns additionnels extraits du dataset Co-Authored-By: Claude Opus 4.6 (1M context) --- agent_v0/server_v1/stream_processor.py | 130 +++++++++++++++++++++++++ core/cognition/working_memory.py | 59 ++++++++++- core/execution/input_handler.py | 2 +- 3 files changed, 189 insertions(+), 2 deletions(-) diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py index ef83ff9a1..05e481492 100644 --- a/agent_v0/server_v1/stream_processor.py +++ b/agent_v0/server_v1/stream_processor.py @@ -1791,6 +1791,10 @@ class StreamProcessor: # Workflows construits (pour le matching) self._workflows: Dict[str, Any] = {} + # Shadow learning : dernier pattern UI détecté par session + # Stocke {session_id: {"pattern": str, "ocr_text": str, "screen_state": obj, "shot_id": str}} + self._pending_ui_patterns: Dict[str, Dict[str, Any]] = {} + # Charger les workflows existants depuis le disque self._load_persisted_workflows() @@ -1975,6 +1979,9 @@ class StreamProcessor: - key_combo/key_press avec uniquement des modificateurs seuls (ctrl, alt, shift, etc.) - key_combo/key_press avec liste de touches vide - text_input avec texte vide + + Shadow learning : quand un clic suit un pattern UI détecté, + on apprend l'association dialogue→bouton. """ if _is_parasitic_event(event_data): logger.debug( @@ -1982,9 +1989,119 @@ class StreamProcessor: f"type={event_data.get('type')}, data={event_data.get('keys', event_data.get('text', ''))}" ) return {"status": "event_filtered", "session_id": session_id, "reason": "parasitic"} + + # Shadow learning : si un pattern UI est en attente et qu'on reçoit un clic + if event_data.get("type") == "mouse_click": + self._try_shadow_learn(session_id, event_data) + self.session_manager.add_event(session_id, event_data) return {"status": "event_recorded", "session_id": session_id} + def _try_shadow_learn(self, session_id: str, click_event: Dict[str, Any]): + """Tente d'apprendre un pattern UI depuis un clic observé en Shadow. + + Quand un screenshot contenait un pattern UI détecté (dialogue) et que + l'utilisateur clique ensuite, on extrait le texte OCR au point de clic + pour apprendre l'association : "quand je vois ce texte → cliquer sur ce bouton". + """ + with self._data_lock: + pending = self._pending_ui_patterns.pop(session_id, None) + if not pending: + return + + screen_state = pending.get("screen_state") + if screen_state is None: + return + + # Extraire la position du clic (pixels absolus) + pos = click_event.get("pos", []) + if not pos or len(pos) != 2: + return + + click_x, click_y = pos[0], pos[1] + + # Trouver le texte OCR le plus proche du point de clic + # via les ui_elements du ScreenState (ils ont bbox + label) + clicked_label = self._find_label_at_position(screen_state, click_x, click_y) + if not clicked_label: + return + + # Extraire le trigger principal du texte OCR du dialogue + ocr_text = pending.get("ocr_text", "") + # Utiliser un extrait court comme trigger (max 80 chars, premier segment pertinent) + trigger_text = ocr_text[:80].strip().lower() + if not trigger_text: + return + + logger.info( + f"Shadow learning: pattern '{pending['pattern_name']}' " + f"→ utilisateur a cliqué '{clicked_label}' | trigger='{trigger_text[:40]}...'" + ) + + # Sauvegarder le pattern appris + try: + from core.knowledge.ui_patterns import UIPatternLibrary + lib = UIPatternLibrary() + lib.save_learned_pattern({ + "category": "dialog", + "triggers": [trigger_text], + "action": "click", + "target": clicked_label, + "os": "windows", + "confidence": 0.8, + }) + except Exception as e: + logger.warning(f"Shadow learning: échec sauvegarde pattern: {e}") + + @staticmethod + def _find_label_at_position(screen_state, click_x: int, click_y: int) -> Optional[str]: + """Trouve le label de l'élément UI le plus proche du point de clic. + + Parcourt les ui_elements du ScreenState et retourne le label de + l'élément dont la bbox contient le point, ou le plus proche si aucun + ne contient exactement le point. + """ + ui_elements = getattr(screen_state, "ui_elements", []) + if not ui_elements: + return None + + best_label = None + best_dist = float("inf") + + for elem in ui_elements: + bbox = getattr(elem, "bbox", None) + label = getattr(elem, "label", "") + if not bbox or not label: + continue + + # BBox = (x, y, width, height) — extraire les coordonnées + try: + bx, by = bbox.x, bbox.y + bw, bh = bbox.width, bbox.height + except AttributeError: + # Fallback si bbox est une liste/tuple + if hasattr(bbox, '__len__') and len(bbox) >= 4: + bx, by, bw, bh = bbox[0], bbox[1], bbox[2], bbox[3] + else: + continue + + # Vérifier si le clic est dans la bbox + if bx <= click_x <= bx + bw and by <= click_y <= by + bh: + return label.strip() + + # Sinon calculer la distance au centre + cx = bx + bw / 2 + cy = by + bh / 2 + dist = ((click_x - cx) ** 2 + (click_y - cy) ** 2) ** 0.5 + if dist < best_dist: + best_dist = dist + best_label = label.strip() + + # Ne retourner le plus proche que s'il est raisonnablement proche (< 100px) + if best_label and best_dist < 100: + return best_label + return None + # ========================================================================= # Screenshots # ========================================================================= @@ -2055,6 +2172,19 @@ class StreamProcessor: result["ui_pattern_action"] = pattern["action"] result["ui_pattern_target"] = pattern["target"] logger.info(f"Pattern UI détecté: {pattern['pattern']} → {pattern['target']}") + + # Shadow learning : mémoriser le pattern en attente du clic utilisateur + with self._data_lock: + self._pending_ui_patterns[session_id] = { + "pattern_name": pattern["pattern"], + "ocr_text": ocr_text, + "screen_state": screen_state, + "shot_id": shot_id, + } + else: + # Pas de pattern connu → effacer le pending (l'écran a changé) + with self._data_lock: + self._pending_ui_patterns.pop(session_id, None) except ImportError: pass except Exception as e: diff --git a/core/cognition/working_memory.py b/core/cognition/working_memory.py index 4a135e931..8562ef692 100644 --- a/core/cognition/working_memory.py +++ b/core/cognition/working_memory.py @@ -77,10 +77,15 @@ class CognitiveContext: needs_help: bool = False help_reason: str = "" - # Métadonnées + # Timing session_id: str = "" machine_id: str = "" started_at: Optional[datetime] = None + step_started_at: Optional[datetime] = None + step_durations: Dict[str, List[float]] = field(default_factory=dict) + + # Ce que Léa devrait voir à l'écran (comparaison attendu vs réel) + expected_screen: str = "" def record_action(self, action_type: str, target: str = "", result: str = "", success: bool = True, @@ -117,10 +122,47 @@ class CognitiveContext: def advance_step(self): """Passe à l'étape suivante du plan.""" + # Enregistrer la durée de l'étape précédente + if self.step_started_at: + duration = (datetime.now() - self.step_started_at).total_seconds() + step_key = self.current_step_description or f"step_{self.current_step}" + self.step_durations.setdefault(step_key, []).append(duration) + self.current_step += 1 + self.step_started_at = datetime.now() if self.remaining_steps: self.current_step_description = self.remaining_steps.pop(0) + def get_step_timing(self) -> Optional[Dict[str, Any]]: + """Retourne les infos de timing de l'étape en cours.""" + if not self.step_started_at: + return None + + elapsed = (datetime.now() - self.step_started_at).total_seconds() + step_key = self.current_step_description or f"step_{self.current_step}" + history = self.step_durations.get(step_key, []) + avg = sum(history) / len(history) if history else None + + result = {"elapsed_seconds": elapsed} + if avg: + result["avg_previous"] = avg + result["is_slow"] = elapsed > avg * 2 + return result + + def set_expected_screen(self, description: str): + """Définit ce que Léa devrait voir à l'écran pour cette étape.""" + self.expected_screen = description + + def check_screen_matches_expected(self) -> Optional[bool]: + """Compare l'observation actuelle avec l'écran attendu.""" + if not self.expected_screen or not self.current_observation: + return None + obs_text = (self.current_observation.window_title + " " + + self.current_observation.ocr_text).lower() + expected_words = self.expected_screen.lower().split() + matches = sum(1 for w in expected_words if w in obs_text) + return matches / max(len(expected_words), 1) > 0.3 + def learn(self, fact: str): """Enregistre un fait appris pendant l'exécution.""" if fact not in self.learned_facts: @@ -175,6 +217,21 @@ class CognitiveContext: for step in self.remaining_steps[:3]: lines.append(f" - {step}") + timing = self.get_step_timing() + if timing: + lines.append(f"TEMPS ÉTAPE : {timing['elapsed_seconds']:.1f}s") + if timing.get('avg_previous'): + lines.append(f"MOYENNE PRÉCÉDENTE : {timing['avg_previous']:.1f}s") + if timing.get('is_slow'): + lines.append("⚠ ÉTAPE ANORMALEMENT LENTE") + + if self.expected_screen: + match = self.check_screen_matches_expected() + if match is False: + lines.append(f"⚠ ÉCRAN INATTENDU (attendu: {self.expected_screen})") + elif match is True: + lines.append(f"ÉCRAN CONFORME : {self.expected_screen}") + lines.append(f"CONFIANCE : {self.confidence:.0%}") if self.needs_help: diff --git a/core/execution/input_handler.py b/core/execution/input_handler.py index 49a44d03a..82cc8b375 100644 --- a/core/execution/input_handler.py +++ b/core/execution/input_handler.py @@ -287,7 +287,7 @@ Si l'écran est normal sans action nécessaire, réponds action="nothing". Réponds UNIQUEMENT le JSON, pas d'explication.""" ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434") - model = os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b")) + model = os.environ.get("RPA_REASONING_MODEL", os.environ.get("RPA_VLM_MODEL", "qwen2.5vl:3b")) response = requests.post( f"{ollama_url}/api/generate",