diff --git a/core/execution/execution_loop.py b/core/execution/execution_loop.py index 50b1c04f5..f2d1bf3a8 100644 --- a/core/execution/execution_loop.py +++ b/core/execution/execution_loop.py @@ -151,6 +151,13 @@ class StepResult: duration_ms: float message: str screenshot_path: Optional[str] = None + # C1 — Instrumentation vision-aware + ocr_ms: float = 0.0 # Temps OCR du ScreenState de ce step + ui_ms: float = 0.0 # Temps détection UI de ce step + total_ms: float = 0.0 # Temps total (alias de duration_ms pour cohérence) + analyze_ms: float = 0.0 # Temps total analyse ScreenState (OCR + UI + reste) + cache_hit: bool = False # True si ScreenState vient du cache + degraded: bool = False # True si mode dégradé activé (timeout analyse) class ExecutionLoop: @@ -175,7 +182,13 @@ class ExecutionLoop: capture_interval_ms: int = 500, max_no_match_retries: int = 5, confirmation_callback: Optional[Callable[[str, Dict], bool]] = None, - coaching_callback: Optional[Callable[[str, Dict], "CoachingResponse"]] = None + coaching_callback: Optional[Callable[[str, Dict], "CoachingResponse"]] = None, + screen_analyzer: Optional[Any] = None, + screen_state_cache: Optional[Any] = None, + enable_ui_detection: bool = True, + enable_ocr: bool = True, + analyze_timeout_ms: int = 8000, + window_info_provider: Optional[Callable[[], Optional[Dict[str, Any]]]] = None, ): """ Initialiser la boucle d'exécution. 
@@ -188,6 +201,15 @@ class ExecutionLoop: max_no_match_retries: Nombre max de tentatives si pas de match confirmation_callback: Callback pour demander confirmation (SUPERVISED) coaching_callback: Callback pour décisions coaching (COACHING) + screen_analyzer: ScreenAnalyzer pour construire un ScreenState enrichi + (lazy init via singleton si None) + screen_state_cache: Cache perceptuel (lazy init via singleton si None) + enable_ui_detection: Active la détection UI (True par défaut, flag d'urgence) + enable_ocr: Active l'OCR (True par défaut) + analyze_timeout_ms: Timeout soft pour l'analyse d'un ScreenState. + Au-delà, on active le mode dégradé pour les steps suivants. + window_info_provider: Callable renvoyant un dict window_info. Si None, + on tente `screen_capturer.get_active_window()`. """ self.pipeline = pipeline self.action_executor = action_executor or ActionExecutor() @@ -204,6 +226,27 @@ class ExecutionLoop: self.confirmation_callback = confirmation_callback self.coaching_callback = coaching_callback + # C1 — Vision-aware execution + self._screen_analyzer = screen_analyzer # lazy init si None + self._screen_state_cache = screen_state_cache # lazy init si None + self.enable_ui_detection = enable_ui_detection + self.enable_ocr = enable_ocr + self.analyze_timeout_ms = analyze_timeout_ms + self._window_info_provider = window_info_provider + # Mode dégradé déclenché par un timeout analyse — persiste tant qu'un + # probe n'a pas démontré la récupération (voir ci-dessous). + self._degraded_mode = False + # Auto-rétablissement : compteur de steps rapides consécutifs. + # Si l'analyse tourne vite (< analyze_timeout_ms / 2) pendant + # _fast_steps_recovery_threshold steps → on quitte le mode dégradé. + self._successive_fast_steps = 0 + self._fast_steps_recovery_threshold = 3 + # En mode dégradé, on retente l'analyse tous les _probe_interval steps + # pour détecter la récupération (les autres steps restent en stub pour + # éviter de re-saturer le GPU). 
10 par défaut = ~5s à 500ms/step. + self._probe_interval = 10 + self._degraded_step_counter = 0 + # État interne self.state = ExecutionState.IDLE self.context: Optional[ExecutionContext] = None @@ -464,15 +507,15 @@ class ExecutionLoop: }) # Notify Analytics about step completion + # C1 — transmet tous les champs vision-aware (ocr_ms, ui_ms, + # analyze_ms, cache_hit, degraded) au système analytics via + # on_step_result qui accepte un StepResult complet. if self._analytics_integration and step_result: try: - self._analytics_integration.on_step_complete( - workflow_id=self.context.workflow_id, + self._analytics_integration.on_step_result( execution_id=self.context.execution_id, - step_id=step_result.node_id, - success=step_result.success, - duration_ms=step_result.duration_ms, - confidence=step_result.match_confidence + workflow_id=self.context.workflow_id, + step_result=step_result, ) except Exception as e: logger.warning(f"Analytics step notification failed: {e}") @@ -505,25 +548,47 @@ class ExecutionLoop: self._notify_state_change(ExecutionState.STOPPED) # Notify Analytics about execution completion + # Contrat normalisé (Lot A) : duration_ms + status explicite + # au lieu du booléen success + duration ambigu. 
if self._analytics_integration and self.context: try: - success = self.state == ExecutionState.COMPLETED - duration_ms = (datetime.now() - self.context.started_at).total_seconds() * 1000 - + duration_ms = ( + datetime.now() - self.context.started_at + ).total_seconds() * 1000 + + # Mapping ExecutionState → status analytics + if self.state == ExecutionState.COMPLETED: + status = "completed" + elif self.state == ExecutionState.FAILED: + status = "failed" + elif self.state == ExecutionState.STOPPED: + status = "stopped" + elif self.state == ExecutionState.PAUSED: + # Pause non résolue à la sortie = blocage non récupéré + status = "blocked" + else: + status = self.state.value + + error_message = ( + None + if status == "completed" + else f"Execution ended in state: {self.state.value}" + ) + # Stop resource monitoring self._analytics_integration.stop_resource_monitoring( execution_id=self.context.execution_id ) - + self._analytics_integration.on_execution_complete( workflow_id=self.context.workflow_id, execution_id=self.context.execution_id, - success=success, duration_ms=duration_ms, - steps_executed=self.context.steps_executed, - steps_succeeded=self.context.steps_succeeded, + status=status, + steps_total=self.context.steps_executed, + steps_completed=self.context.steps_succeeded, steps_failed=self.context.steps_failed, - error_message=None if success else f"Execution ended in state: {self.state.value}" + error_message=error_message, ) except Exception as e: logger.warning(f"Analytics completion notification failed: {e}") @@ -533,56 +598,142 @@ class ExecutionLoop: def _execute_step(self) -> Optional[StepResult]: """ Exécuter une étape du workflow. - + Returns: StepResult ou None si pas de match """ start_time = time.time() - + # 1. Capturer l'écran screenshot_path = self._capture_screen() if not screenshot_path: logger.warning("Failed to capture screen") return None - + self.context.last_screenshot_path = screenshot_path - - # 2. 
Identifier l'état actuel (matching) - match = self.pipeline.match_current_state( - screenshot_path, - workflow_id=self.context.workflow_id + + # 1bis. Construire un ScreenState enrichi (C1) — avec cache perceptuel + screen_state, timings = self._build_screen_state(screenshot_path) + logger.debug( + f"[Step] ScreenState analyze={timings['analyze_ms']:.0f}ms " + f"ocr={timings['ocr_ms']:.0f}ms ui={timings['ui_ms']:.0f}ms " + f"cache_hit={timings['cache_hit']} degraded={timings['degraded']}" ) - + + # 2. Identifier l'état actuel (matching) + # + # Lot E — on consomme le ScreenState enrichi déjà construit en 1bis + # (avec ui_elements, detected_text, window_title réels) au lieu de + # laisser le pipeline reconstruire un stub avec window_title="Unknown". + # Premier vrai matching context-aware. + match = self.pipeline.match_current_state_from_state( + screen_state, + workflow_id=self.context.workflow_id, + ) + if not match: logger.debug("No match found for current screen") return None - + current_node_id = match["node_id"] confidence = match["confidence"] self.context.current_node_id = current_node_id self.context.last_match_confidence = confidence - + logger.info(f"Matched node: {current_node_id} (confidence: {confidence:.3f})") - - # 3. Obtenir la prochaine action + + # 3. Obtenir la prochaine action (C3 : sélection d'edge robuste) + # + # Lot A — contrat dict avec status explicite : + # "terminal" → fin légitime du workflow (success=True) + # "blocked" → pause supervisée (plus JAMAIS traité comme un succès + # pour ne pas déclencher un faux _is_workflow_complete) + # "selected" → action à exécuter + # + # Lot B — on propage la confidence du match courant (source_similarity) + # pour que l'EdgeScorer puisse vérifier la précondition + # `min_source_similarity` de chaque edge. Sans cette propagation, la + # contrainte était silencieusement désactivée (hardcodé à 1.0). 
next_action = self.pipeline.get_next_action( self.context.workflow_id, - current_node_id + current_node_id, + screen_state=screen_state, + source_similarity=confidence, ) - - if not next_action: - # Pas d'action suivante = fin du workflow ou node terminal + + # Rétrocompat défensive : si un pipeline legacy renvoie None ou un dict + # sans status, on considère ça comme un blocage (safe default). + if not isinstance(next_action, dict) or "status" not in next_action: + logger.error( + "get_next_action a renvoyé un résultat sans status " + f"(legacy?). Valeur reçue: {next_action!r}" + ) + next_action = {"status": "blocked", "reason": "legacy_none_return"} + + action_status = next_action.get("status") + + if action_status == "terminal": + # Fin légitime : aucun outgoing_edge sur le node courant + total_ms = (time.time() - start_time) * 1000 return StepResult( success=True, node_id=current_node_id, edge_id=None, action_result=None, match_confidence=confidence, - duration_ms=(time.time() - start_time) * 1000, - message="No next action (terminal node)", - screenshot_path=screenshot_path + duration_ms=total_ms, + message="Workflow terminated (terminal node)", + screenshot_path=screenshot_path, + ocr_ms=timings["ocr_ms"], + ui_ms=timings["ui_ms"], + analyze_ms=timings["analyze_ms"], + total_ms=total_ms, + cache_hit=timings["cache_hit"], + degraded=timings["degraded"], ) - + + if action_status == "blocked": + # Blocage : des edges existent mais aucun n'est valide. + # On déclenche une pause supervisée (paused_need_help) et on + # remonte l'erreur. On ne retourne PAS success=True. + reason = next_action.get("reason", "unknown") + logger.warning( + f"ExecutionLoop bloqué sur {current_node_id}: {reason} " + f"→ pause supervisée demandée" + ) + # On bascule en PAUSED et on arme _pause_requested pour que la + # boucle principale attende un resume() humain. 
+ self.state = ExecutionState.PAUSED + self._pause_requested = True + self._notify_state_change(ExecutionState.PAUSED) + if self._on_error: + try: + self._on_error( + "blocked", + Exception(f"No valid edge from {current_node_id}: {reason}"), + ) + except Exception as cb_err: + logger.debug(f"on_error callback failed: {cb_err}") + + total_ms = (time.time() - start_time) * 1000 + return StepResult( + success=False, + node_id=current_node_id, + edge_id=None, + action_result=None, + match_confidence=confidence, + duration_ms=total_ms, + message=f"Blocked: {reason}", + screenshot_path=screenshot_path, + ocr_ms=timings["ocr_ms"], + ui_ms=timings["ui_ms"], + analyze_ms=timings["analyze_ms"], + total_ms=total_ms, + cache_hit=timings["cache_hit"], + degraded=timings["degraded"], + ) + + # À partir d'ici, on est forcément en status="selected" edge_id = next_action["edge_id"] self.context.current_edge_id = edge_id @@ -604,7 +755,7 @@ class ExecutionLoop: if coaching_response.decision == CoachingDecision.ACCEPT: # Utilisateur accepte : exécuter l'action suggérée self._coaching_stats['accepted'] += 1 - action_result = self._execute_action(next_action) + action_result = self._execute_action(next_action, screen_state=screen_state) self._record_coaching_feedback( next_action, coaching_response, action_result, success=True ) @@ -615,15 +766,22 @@ class ExecutionLoop: self._record_coaching_feedback( next_action, coaching_response, None, success=False ) + total_ms = (time.time() - start_time) * 1000 return StepResult( success=False, node_id=current_node_id, edge_id=edge_id, action_result=None, match_confidence=confidence, - duration_ms=(time.time() - start_time) * 1000, + duration_ms=total_ms, message="Action rejected by user in COACHING mode", - screenshot_path=screenshot_path + screenshot_path=screenshot_path, + ocr_ms=timings["ocr_ms"], + ui_ms=timings["ui_ms"], + analyze_ms=timings["analyze_ms"], + total_ms=total_ms, + cache_hit=timings["cache_hit"], + 
degraded=timings["degraded"], ) elif coaching_response.decision == CoachingDecision.CORRECT: @@ -632,7 +790,7 @@ class ExecutionLoop: corrected_action = self._apply_coaching_correction( next_action, coaching_response.correction ) - action_result = self._execute_action(corrected_action) + action_result = self._execute_action(corrected_action, screen_state=screen_state) self._record_coaching_feedback( next_action, coaching_response, action_result, success=action_result.status == ExecutionStatus.SUCCESS if action_result else False @@ -658,33 +816,40 @@ class ExecutionLoop: # Mode supervisé : demander confirmation if not self._request_confirmation(next_action): logger.info("Action rejected by user") + total_ms = (time.time() - start_time) * 1000 return StepResult( success=False, node_id=current_node_id, edge_id=edge_id, action_result=None, match_confidence=confidence, - duration_ms=(time.time() - start_time) * 1000, + duration_ms=total_ms, message="Action rejected by user", - screenshot_path=screenshot_path + screenshot_path=screenshot_path, + ocr_ms=timings["ocr_ms"], + ui_ms=timings["ui_ms"], + analyze_ms=timings["analyze_ms"], + total_ms=total_ms, + cache_hit=timings["cache_hit"], + degraded=timings["degraded"], ) - + # Exécuter l'action - action_result = self._execute_action(next_action) - + action_result = self._execute_action(next_action, screen_state=screen_state) + elif self.context.mode == ExecutionMode.AUTOMATIC: # Mode automatique : exécuter directement - action_result = self._execute_action(next_action) - + action_result = self._execute_action(next_action, screen_state=screen_state) + # 5. 
Mettre à jour les compteurs self.context.steps_executed += 1 if action_result and action_result.status == ExecutionStatus.SUCCESS: self.context.steps_succeeded += 1 elif action_result: self.context.steps_failed += 1 - + duration_ms = (time.time() - start_time) * 1000 - + return StepResult( success=action_result.status == ExecutionStatus.SUCCESS if action_result else True, node_id=current_node_id, @@ -693,7 +858,13 @@ class ExecutionLoop: match_confidence=confidence, duration_ms=duration_ms, message=action_result.message if action_result else "Observed", - screenshot_path=screenshot_path + screenshot_path=screenshot_path, + ocr_ms=timings["ocr_ms"], + ui_ms=timings["ui_ms"], + analyze_ms=timings["analyze_ms"], + total_ms=duration_ms, + cache_hit=timings["cache_hit"], + degraded=timings["degraded"], ) # ========================================================================= @@ -718,61 +889,45 @@ class ExecutionLoop: logger.error(f"Screen capture failed: {e}") return None - def _execute_action(self, action_info: Dict[str, Any]) -> ExecutionResult: - """Exécuter une action via l'ActionExecutor.""" + def _execute_action( + self, + action_info: Dict[str, Any], + screen_state: Optional[Any] = None, + ) -> ExecutionResult: + """ + Exécuter une action via l'ActionExecutor. 
+ + Args: + action_info: dict action {edge_id, action, target_node, ...} + screen_state: ScreenState enrichi (si None, fallback stub minimal) + """ try: # Charger le workflow et l'edge workflow = self.pipeline.load_workflow(self.context.workflow_id) edge = workflow.get_edge(action_info["edge_id"]) - + if not edge: return ExecutionResult( status=ExecutionStatus.FAILED, message=f"Edge not found: {action_info['edge_id']}", duration_ms=0 ) - - # Créer un ScreenState minimal pour l'exécution - from core.models.screen_state import ( - ScreenState, WindowContext, RawLevel, PerceptionLevel, - ContextLevel, EmbeddingRef - ) - - screen_state = ScreenState( - screen_state_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - timestamp=datetime.now(), - session_id=self.context.execution_id, - window=WindowContext( - app_name="unknown", - window_title="Unknown", - screen_resolution=[1920, 1080], - workspace="main" - ), - raw=RawLevel( - screenshot_path=self.context.last_screenshot_path or "", - capture_method="execution", - file_size_bytes=0 - ), - perception=PerceptionLevel( - embedding=EmbeddingRef(provider="", vector_id="", dimensions=512), - detected_text=[], - text_detection_method="none", - confidence_avg=0.0 - ), - context=ContextLevel(), - ui_elements=[] - ) - + + # Utiliser le ScreenState enrichi fourni par le loop ; fallback minimal + # uniquement si on n'en a pas (legacy, tests). 
+ if screen_state is None: + screen_state = self._build_stub_screen_state() + # Exécuter l'action result = self.action_executor.execute_edge( edge, screen_state, context=self.context.variables ) - + logger.info(f"Action executed: {result.status.value} - {result.message}") return result - + except Exception as e: logger.exception(f"Action execution failed: {e}") return ExecutionResult( @@ -781,6 +936,286 @@ class ExecutionLoop: duration_ms=0, error=e ) + + # ========================================================================= + # C1 — Construction du ScreenState (vision-aware) + # ========================================================================= + + def _get_screen_analyzer(self): + """ + Récupérer le ScreenAnalyzer (singleton partagé, lazy). + + Retourne None si indisponible (import error, etc.) — le loop + bascule alors en fallback stub. + + Note Lot C : on ne passe plus `session_id` au singleton. Le session_id + est désormais un paramètre d'appel de `analyze()`, pour éviter que deux + ExecutionLoop partageant le même analyzer se marchent dessus. + """ + if self._screen_analyzer is not None: + return self._screen_analyzer + try: + from core.pipeline import get_screen_analyzer + self._screen_analyzer = get_screen_analyzer() + return self._screen_analyzer + except Exception as e: + logger.warning(f"ScreenAnalyzer indisponible: {e}") + return None + + def _get_screen_state_cache(self): + """Récupérer le cache de ScreenState (singleton partagé, lazy).""" + if self._screen_state_cache is not None: + return self._screen_state_cache + try: + from core.pipeline import get_screen_state_cache + self._screen_state_cache = get_screen_state_cache() + return self._screen_state_cache + except Exception as e: + logger.warning(f"ScreenStateCache indisponible: {e}") + return None + + def _resolve_window_info(self) -> Optional[Dict[str, Any]]: + """ + Récupérer les infos de la fenêtre active. + + Ordre de préférence : + 1. 
`window_info_provider` fourni au constructeur + 2. `screen_capturer.get_active_window()` + 3. None → ScreenAnalyzer utilisera les valeurs par défaut + """ + if self._window_info_provider is not None: + try: + return self._window_info_provider() + except Exception as e: + logger.debug(f"window_info_provider failed: {e}") + + try: + raw = self.screen_capturer.get_active_window() + if raw: + # Normaliser vers le format attendu par ScreenAnalyzer + return { + "title": raw.get("title", "Unknown"), + "app_name": raw.get("app", "unknown"), + "window_bounds": [ + raw.get("x", 0), + raw.get("y", 0), + raw.get("width", 0), + raw.get("height", 0), + ], + } + except Exception as e: + logger.debug(f"get_active_window failed: {e}") + return None + + def _build_screen_state( + self, + screenshot_path: str, + ) -> tuple: + """ + Construire un ScreenState enrichi depuis un screenshot. + + Logique : + - Si enable_ui_detection=False ET enable_ocr=False → stub + - Si analyseur indisponible → stub + - Sinon : cache.get_or_compute(analyzer.analyze) + - Timeout soft : si l'analyse dépasse `analyze_timeout_ms`, on log + un warning et on active le mode dégradé pour les prochains steps. + + Returns: + (screen_state, timings_dict) + timings_dict: { + "analyze_ms", "ocr_ms", "ui_ms", "cache_hit", "degraded" + } + """ + timings = { + "analyze_ms": 0.0, + "ocr_ms": 0.0, + "ui_ms": 0.0, + "cache_hit": False, + "degraded": False, + } + + # Mode "tout désactivé" (flag d'urgence) → stub + if not self.enable_ui_detection and not self.enable_ocr: + timings["degraded"] = True + return self._build_stub_screen_state(screenshot_path), timings + + analyzer = self._get_screen_analyzer() + if analyzer is None: + timings["degraded"] = True + return self._build_stub_screen_state(screenshot_path), timings + + # Mode dégradé : on reste sur stub, sauf "probe" périodique qui teste + # si le GPU est redevenu performant. 
Si oui, on accumule les steps + # rapides ; après _fast_steps_recovery_threshold probes rapides + # consécutifs on retourne en mode complet. + if self._degraded_mode: + self._degraded_step_counter += 1 + if self._degraded_step_counter < self._probe_interval: + timings["degraded"] = True + return self._build_stub_screen_state(screenshot_path), timings + # Sinon on tente un probe réel ci-dessous + self._degraded_step_counter = 0 + + cache = self._get_screen_state_cache() + + # Invalidation proactive : si l'écran a massivement changé depuis + # la dernière entrée du cache, on purge. Le TTL seul (2s) laisserait + # passer des entrées obsolètes sur des changements rapides (popup, nav). + if cache is not None: + try: + cache.invalidate_if_changed(screenshot_path, threshold=0.3) + except Exception as e: + logger.debug(f"invalidate_if_changed a échoué: {e}") + + window_info = self._resolve_window_info() + + # Fonction de calcul (cache miss) + # Les flags runtime (enable_ocr, enable_ui_detection) et le session_id + # sont passés en kwargs-only à analyze() : AUCUNE mutation de l'analyseur + # singleton (Lot C — thread-safety, deux ExecutionLoop peuvent partager + # le même analyzer sans se contaminer). + execution_id = self.context.execution_id if self.context else "" + + def compute(path: str): + t_start = time.time() + state = analyzer.analyze( + path, + window_info=window_info, + enable_ocr=self.enable_ocr, + enable_ui_detection=self.enable_ui_detection, + session_id=execution_id, + ) + elapsed = (time.time() - t_start) * 1000 + # Annoter le temps dans les métadonnées + if hasattr(state, "metadata"): + state.metadata["analyze_ms"] = elapsed + return state + + t0 = time.time() + try: + if cache is not None: + # Lot D — clé composite context-aware : deux contextes + # différents partageant le même screenshot n'entrent plus + # en collision. 
Le workflow_id isole les replays par workflow, + # les flags différencient les modes d'analyse (OCR on/off, + # UI on/off), et le (window_title, app_name) distingue deux + # applications qui présenteraient un rendu visuel similaire. + ctx_window_title = (window_info or {}).get("title", "") or "" + ctx_app_name = (window_info or {}).get("app_name", "") or "" + ctx_workflow_id = ( + self.context.workflow_id if self.context else "" + ) + state, cache_hit, _ = cache.get_or_compute( + screenshot_path, + compute, + window_title=ctx_window_title, + app_name=ctx_app_name, + enable_ocr=self.enable_ocr, + enable_ui_detection=self.enable_ui_detection, + workflow_id=ctx_workflow_id, + ) + else: + state = compute(screenshot_path) + cache_hit = False + except Exception as e: + logger.warning(f"ScreenState build failed: {e} — fallback stub") + timings["degraded"] = True + return self._build_stub_screen_state(screenshot_path), timings + + analyze_ms = (time.time() - t0) * 1000 + timings["analyze_ms"] = analyze_ms + timings["cache_hit"] = cache_hit + + # Décomposer OCR vs UI si possible (métadonnées) + meta = getattr(state, "metadata", {}) or {} + timings["ocr_ms"] = float(meta.get("ocr_ms", 0.0)) + timings["ui_ms"] = float(meta.get("ui_ms", 0.0)) + + # Timeout soft : activer le mode dégradé si > seuil + # (cache_hit ignoré : un hit ne prouve rien sur la santé du GPU) + if analyze_ms > self.analyze_timeout_ms and not cache_hit: + logger.warning( + f"ScreenState analysis slow: {analyze_ms:.0f}ms > " + f"{self.analyze_timeout_ms}ms → activation mode dégradé" + ) + self._degraded_mode = True + self._successive_fast_steps = 0 + timings["degraded"] = True + else: + # Step "rapide" : incrémenter le compteur si < timeout / 2. + # On ignore les cache hits (pas représentatifs de la perf GPU). 
+ fast_threshold_ms = self.analyze_timeout_ms / 2 + if not cache_hit and analyze_ms < fast_threshold_ms: + self._successive_fast_steps += 1 + + # Auto-rétablissement : si on était en dégradé et qu'on a + # enchaîné assez de steps rapides → retour en mode complet. + if ( + self._degraded_mode + and self._successive_fast_steps + >= self._fast_steps_recovery_threshold + ): + logger.info( + "Mode complet restauré après %d steps rapides " + "(dernier analyze_ms=%.0fms < seuil=%.0fms)", + self._successive_fast_steps, + analyze_ms, + fast_threshold_ms, + ) + self._degraded_mode = False + self._successive_fast_steps = 0 + elif not cache_hit: + # Step ni lent ni rapide (entre timeout/2 et timeout) : reset + self._successive_fast_steps = 0 + + # On propage l'état dégradé courant dans les timings (utile pour le + # StepResult : tant qu'on n'a pas récupéré assez de steps rapides, + # on continue à signaler "degraded=True"). + timings["degraded"] = self._degraded_mode + + return state, timings + + def _build_stub_screen_state(self, screenshot_path: Optional[str] = None): + """ + Construire un ScreenState minimal (fallback legacy). + + Utilisé quand l'analyseur est indisponible ou que tous les flags + de détection sont désactivés (flag d'urgence). 
+ """ + from core.models.screen_state import ( + ScreenState, WindowContext, RawLevel, PerceptionLevel, + ContextLevel, EmbeddingRef + ) + + path = screenshot_path or ( + self.context.last_screenshot_path if self.context else "" + ) or "" + + return ScreenState( + screen_state_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}", + timestamp=datetime.now(), + session_id=self.context.execution_id if self.context else "stub", + window=WindowContext( + app_name="unknown", + window_title="Unknown", + screen_resolution=[1920, 1080], + workspace="main", + ), + raw=RawLevel( + screenshot_path=path, + capture_method="execution", + file_size_bytes=0, + ), + perception=PerceptionLevel( + embedding=EmbeddingRef(provider="", vector_id="", dimensions=512), + detected_text=[], + text_detection_method="none", + confidence_avg=0.0, + ), + context=ContextLevel(), + ui_elements=[], + ) def _request_confirmation(self, action_info: Dict[str, Any]) -> bool: """Demander confirmation à l'utilisateur.""" diff --git a/tests/unit/test_workflow_pipeline_match_from_state.py b/tests/unit/test_workflow_pipeline_match_from_state.py new file mode 100644 index 000000000..c67803b0f --- /dev/null +++ b/tests/unit/test_workflow_pipeline_match_from_state.py @@ -0,0 +1,400 @@ +""" +Tests unitaires du matching context-aware — Lot E. + +Vérifient que ``WorkflowPipeline.match_current_state_from_state`` : + - Consomme réellement le ``ScreenState`` fourni (window_title, + detected_text, ui_elements) au lieu de le reconstruire en stub. + - Ne réinvoque PAS ``ScreenAnalyzer.analyze`` (le state est déjà prêt). + - Préfère le matching hiérarchique si un workflow est chargeable. + - Retombe sur FAISS quand le hiérarchique n'est pas applicable. + +Vérifient aussi que l'ancienne API ``match_current_state(screenshot_path, ...)`` +continue à fonctionner comme un **wrapper** qui invoque bien le +``ScreenAnalyzer`` puis délègue à ``match_current_state_from_state``. 
+""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from PIL import Image + +from core.models.screen_state import ( + ContextLevel, + EmbeddingRef, + PerceptionLevel, + RawLevel, + ScreenState, + WindowContext, +) +from core.pipeline.workflow_pipeline import WorkflowPipeline + + +# ----------------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------------- + + +def _make_enriched_state( + *, + window_title: str = "Bloc-Notes - Sans titre", + app_name: str = "notepad", + detected_text=None, + ui_elements=None, + screenshot_path: str = "", +) -> ScreenState: + """ScreenState enrichi utilisé pour simuler ce que ExecutionLoop fournit.""" + return ScreenState( + screen_state_id="state_lot_e", + timestamp=datetime.now(), + session_id="sess_lot_e", + window=WindowContext( + app_name=app_name, + window_title=window_title, + screen_resolution=[1920, 1080], + ), + raw=RawLevel( + screenshot_path=screenshot_path, + capture_method="test", + file_size_bytes=0, + ), + perception=PerceptionLevel( + embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512), + detected_text=detected_text if detected_text is not None else ["Fichier", "Édition"], + text_detection_method="test", + confidence_avg=0.9, + ), + context=ContextLevel(), + ui_elements=ui_elements if ui_elements is not None else [], + ) + + +def _make_pipeline_with_mocks(tmp_path) -> WorkflowPipeline: + """Construit une WorkflowPipeline minimale avec composants mockés. + + On évite d'instancier réellement CLIPEmbedder / UIDetector / VLM : + on bypass ``__init__`` et on injecte directement les collaborateurs + mockés. Plus rapide et plus déterministe. 
+ """ + pipe = WorkflowPipeline.__new__(WorkflowPipeline) + + # Répertoires + pipe.data_dir = Path(tmp_path) + pipe.workflows_dir = pipe.data_dir / "workflows" + pipe.workflows_dir.mkdir(parents=True, exist_ok=True) + pipe.embeddings_dir = pipe.data_dir / "embeddings" + pipe.embeddings_dir.mkdir(parents=True, exist_ok=True) + pipe.screenshots_dir = pipe.data_dir / "screenshots" + pipe.screenshots_dir.mkdir(parents=True, exist_ok=True) + + # Collaborateurs mockés + fake_embedding = MagicMock() + fake_embedding.get_vector.return_value = [0.0] * 8 + fake_embedding.embedding_id = "emb_test" + + pipe.embedding_builder = MagicMock() + pipe.embedding_builder.build.return_value = fake_embedding + + pipe.faiss_manager = MagicMock() + pipe.faiss_manager.search.return_value = [] + + pipe.hierarchical_matcher = MagicMock() + + pipe.clip_embedder = MagicMock() + pipe.fusion_engine = MagicMock() + pipe.ui_detector = None + pipe.vlm_client = None + pipe.graph_builder = MagicMock() + pipe.node_matcher = MagicMock() + pipe.learning_manager = MagicMock() + pipe.target_resolver = MagicMock() + pipe.error_handler = MagicMock() + pipe.action_executor = MagicMock() + + pipe._workflows = {} + pipe._temporal_context = {} + + return pipe + + +def _fake_hierarchical_result( + node_id: str = "node_ok", + confidence: float = 0.82, +): + """Construit un MatchResult factice (compat avec l'API du HierarchicalMatcher).""" + result = MagicMock() + result.node_id = node_id + result.confidence = confidence + result.window_confidence = 0.9 + result.region_confidence = 0.8 + result.element_confidence = 0.85 + result.temporal_boost = 0.0 + result.matched_variant = None + result.alternatives = [] + result.match_time_ms = 1.0 + return result + + +# ----------------------------------------------------------------------------- +# 1. 
# match_current_state_from_state — hierarchical path
# -----------------------------------------------------------------------------


class TestMatchFromStateHierarchical:
    """
    Exercise the hierarchical path of ``match_current_state_from_state``.

    When a workflow can be loaded, matching goes through the hierarchical
    matcher, which consumes window_title + ui_elements — the core of Lot E.
    """

    @staticmethod
    def _pipeline_with_workflow(tmp_path, **result_kwargs):
        """Build a mocked pipeline that takes the hierarchical path.

        ``load_workflow`` is replaced by a mock returning a workflow with a
        single node, and the hierarchical matcher is primed to return
        ``_fake_hierarchical_result(**result_kwargs)``.
        """
        pipe = _make_pipeline_with_mocks(tmp_path)
        pipe.load_workflow = MagicMock(return_value=MagicMock(nodes=[MagicMock()]))
        pipe.hierarchical_matcher.match.return_value = _fake_hierarchical_result(
            **result_kwargs
        )
        return pipe

    def test_match_from_state_uses_provided_window_title(self, tmp_path):
        """The supplied window_title (Bloc-Notes) is forwarded to the
        matcher, not an "Unknown" stub."""
        pipe = self._pipeline_with_workflow(tmp_path)

        state = _make_enriched_state(window_title="Bloc-Notes - Sans titre")
        result = pipe.match_current_state_from_state(state, workflow_id="wf1")

        assert result is not None
        # The hierarchical matcher was called with the real window title.
        call_kwargs = pipe.hierarchical_matcher.match.call_args.kwargs
        window_info = call_kwargs["window_info"]
        assert window_info["title"] == "Bloc-Notes - Sans titre"
        assert window_info["window_title"] == "Bloc-Notes - Sans titre"
        # No "Unknown" placeholder.
        assert "Unknown" not in window_info["title"]

    def test_match_from_state_uses_ui_elements(self, tmp_path):
        """The ScreenState's ui_elements are forwarded to the matcher as
        detected_elements, not replaced by []."""
        pipe = self._pipeline_with_workflow(tmp_path)

        # Three dummy elements.
        ui_elements = [MagicMock(), MagicMock(), MagicMock()]
        state = _make_enriched_state(ui_elements=ui_elements)
        pipe.match_current_state_from_state(state, workflow_id="wf1")

        call_kwargs = pipe.hierarchical_matcher.match.call_args.kwargs
        passed_elements = call_kwargs["detected_elements"]
        assert len(passed_elements) == 3
        assert passed_elements == ui_elements

    def test_match_from_state_uses_detected_text(self, tmp_path):
        """A ScreenState with non-empty detected_text must survive the call
        intact (not be rewritten into an empty stub)."""
        pipe = self._pipeline_with_workflow(tmp_path)

        expected_text = ["Fichier", "Édition", "Affichage", "Aide"]
        # Hand the state its own copy: if the state kept a reference to the
        # list we compare against, an in-place mutation by the pipeline would
        # alter both sides and the equality below could never fail.
        state = _make_enriched_state(detected_text=list(expected_text))
        pipe.match_current_state_from_state(state, workflow_id="wf1")

        # The state itself is not handed to the matcher directly, so this is
        # an indirect check: the original state must still be enriched after
        # the call.
        assert state.perception.detected_text == expected_text
        assert state.perception.detected_text != []

    def test_match_from_state_no_reconstruction(self, tmp_path):
        """``ScreenAnalyzer.analyze`` must NOT be called by
        ``match_current_state_from_state`` — the state is already built."""
        pipe = self._pipeline_with_workflow(tmp_path)

        state = _make_enriched_state()

        # Patch get_screen_analyzer globally: if the new method invokes the
        # analyzer, the mock records it. Expectation: ZERO analyze() calls.
        with patch(
            "core.pipeline.get_screen_analyzer"
        ) as mock_get_analyzer:
            fake_analyzer = MagicMock()
            mock_get_analyzer.return_value = fake_analyzer

            pipe.match_current_state_from_state(state, workflow_id="wf1")

            # get_screen_analyzer itself may or may not be called (no strong
            # guarantee), but analyze() must never be.
            fake_analyzer.analyze.assert_not_called()

    def test_match_from_state_below_threshold_returns_none(self, tmp_path):
        """If the hierarchical confidence is < min_similarity we fall back to
        FAISS; if FAISS finds nothing either, the result is None."""
        pipe = self._pipeline_with_workflow(tmp_path, confidence=0.1)
        pipe.faiss_manager.search.return_value = []

        state = _make_enriched_state()
        result = pipe.match_current_state_from_state(
            state, workflow_id="wf1", min_similarity=0.5
        )
        assert result is None

    def test_match_from_state_returns_hierarchical_metadata(self, tmp_path):
        """The result must carry the per-level confidences (window, region,
        element) when the hierarchical path is taken."""
        pipe = self._pipeline_with_workflow(
            tmp_path, node_id="node_42", confidence=0.77
        )

        state = _make_enriched_state()
        result = pipe.match_current_state_from_state(
            state, workflow_id="wf1", min_similarity=0.5
        )
        assert result is not None
        assert result["node_id"] == "node_42"
        assert result["confidence"] == 0.77
        assert result["workflow_id"] == "wf1"
        assert result["match_type"] == "hierarchical"
        assert "window_confidence" in result
        assert "region_confidence" in result
        assert "element_confidence" in result


# -----------------------------------------------------------------------------
# 2.
# match_current_state_from_state — FAISS fallback
# -----------------------------------------------------------------------------


class TestMatchFromStateFAISSFallback:
    """When no workflow can be loaded, matching falls back to FAISS using the
    state that was passed in."""

    @staticmethod
    def _pipeline_with_single_hit(tmp_path, similarity, node_id):
        """Build a mocked pipeline whose FAISS search returns exactly one hit."""
        pipe = _make_pipeline_with_mocks(tmp_path)
        hit = {
            "similarity": similarity,
            "metadata": {"node_id": node_id, "workflow_id": None},
        }
        pipe.faiss_manager.search.return_value = [hit]
        return pipe

    def test_fallback_faiss_when_no_workflow_id(self, tmp_path):
        pipe = self._pipeline_with_single_hit(tmp_path, 0.91, "n_faiss")
        enriched = _make_enriched_state()

        match = pipe.match_current_state_from_state(enriched, workflow_id=None)

        # No workflow_id, so the hierarchical matcher must stay untouched.
        pipe.hierarchical_matcher.match.assert_not_called()
        # The embedding was built from the enriched state we supplied.
        pipe.embedding_builder.build.assert_called_once_with(enriched)
        assert match is not None
        assert match["node_id"] == "n_faiss"
        assert match["match_type"] == "faiss"

    def test_faiss_returns_none_below_threshold(self, tmp_path):
        # 0.6 < 0.85 (presumably the pipeline's default min_similarity —
        # TODO confirm against the pipeline signature).
        pipe = self._pipeline_with_single_hit(tmp_path, 0.6, "n_low")
        enriched = _make_enriched_state()

        match = pipe.match_current_state_from_state(enriched, workflow_id=None)

        assert match is None


# -----------------------------------------------------------------------------
# 3. Legacy wrapper match_current_state(screenshot_path, ...)
# -----------------------------------------------------------------------------


class TestLegacyWrapper:
    """
    The old API ``match_current_state(screenshot_path, ...)`` must:
    1. Call ScreenAnalyzer.analyze to enrich the state.
    2. Delegate to match_current_state_from_state.
    """

    @staticmethod
    def _write_png(tmp_path, side, **image_kwargs):
        """Save a square RGB image as ``shot.png`` in tmp_path; return its path string."""
        target = tmp_path / "shot.png"
        picture = Image.new("RGB", (side, side), **image_kwargs)
        picture.save(str(target))
        return str(target)

    @staticmethod
    def _forwarded_state(mocked_method):
        """Return the state given to the mocked method, whether it was
        passed positionally or as the ``screen_state`` keyword."""
        recorded = mocked_method.call_args
        if recorded.args:
            return recorded.args[0]
        return recorded.kwargs["screen_state"]

    def test_match_current_state_wrapper_calls_analyzer(self, tmp_path):
        """The legacy wrapper MUST call ScreenAnalyzer.analyze."""
        pipe = _make_pipeline_with_mocks(tmp_path)
        workflow = MagicMock(nodes=[MagicMock()])
        pipe.load_workflow = MagicMock(return_value=workflow)
        pipe.hierarchical_matcher.match.return_value = _fake_hierarchical_result()

        # The wrapper needs a real image file on disk.
        shot_path = self._write_png(tmp_path, 64, color=(100, 100, 100))

        # Stand-in for the shared analyzer so the call can be observed.
        analyzer = MagicMock()
        analyzer.analyze.return_value = _make_enriched_state(
            window_title="Calc", screenshot_path=shot_path
        )

        with patch("core.pipeline.get_screen_analyzer", return_value=analyzer):
            outcome = pipe.match_current_state(shot_path, workflow_id="wf1")

        # The analyzer was invoked...
        analyzer.analyze.assert_called_once()
        # ...and a result came back, with the hierarchical matcher called
        # behind the scenes.
        assert outcome is not None
        assert pipe.hierarchical_matcher.match.called

    def test_match_current_state_wrapper_delegates_to_from_state(self, tmp_path):
        """The wrapper really delegates to match_current_state_from_state."""
        pipe = _make_pipeline_with_mocks(tmp_path)
        shot_path = self._write_png(tmp_path, 32, color=(50, 50, 50))

        analyzer = MagicMock()
        analyzer.analyze.return_value = _make_enriched_state(
            screenshot_path=shot_path
        )
        canned = {"node_id": "x", "workflow_id": "wf1", "confidence": 0.9}

        # Spy on the new method by patching it on the instance.
        with patch("core.pipeline.get_screen_analyzer", return_value=analyzer):
            with patch.object(
                pipe, "match_current_state_from_state", return_value=canned
            ) as mock_from_state:
                outcome = pipe.match_current_state(shot_path, workflow_id="wf1")

        mock_from_state.assert_called_once()
        # The forwarded state is exactly the one the analyzer returned.
        forwarded = self._forwarded_state(mock_from_state)
        assert forwarded is analyzer.analyze.return_value
        assert outcome == {"node_id": "x", "workflow_id": "wf1", "confidence": 0.9}

    def test_wrapper_fallback_to_stub_when_analyzer_fails(self, tmp_path):
        """If the analyzer is unavailable or crashes, the wrapper falls back
        to a minimal stub to stay backward compatible."""
        pipe = _make_pipeline_with_mocks(tmp_path)
        shot_path = self._write_png(tmp_path, 32)

        with patch(
            "core.pipeline.get_screen_analyzer",
            side_effect=RuntimeError("analyzer down"),
        ):
            with patch.object(
                pipe, "match_current_state_from_state", return_value=None
            ) as mock_from_state:
                pipe.match_current_state(
                    shot_path, workflow_id="wf1", window_title="Hint"
                )

        mock_from_state.assert_called_once()
        # The forwarded state is a stub that honours the window_title hint.
        forwarded = self._forwarded_state(mock_from_state)
        assert forwarded.window.window_title == "Hint"