""" ExecutionLoop - Boucle d'exécution temps réel du RPA Vision V3 C'est le cœur du système qui fait tourner le RPA : 1. Capture l'écran en continu 2. Identifie l'état actuel (matching) 3. Détermine la prochaine action 4. Exécute l'action 5. Vérifie le résultat 6. Boucle jusqu'à la fin du workflow Modes d'exécution: - OBSERVATION: Observe sans agir (shadow mode) - COACHING: Suggère les actions, l'utilisateur exécute - SUPERVISED: Exécute avec confirmation à chaque étape - AUTOMATIC: Exécute automatiquement (pilote automatique) """ import logging import time import threading from typing import Optional, Dict, Any, Callable, List from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path import tempfile from core.models.workflow_graph import Workflow, WorkflowEdge, LearningState from core.models.screen_state import ScreenState from core.execution.action_executor import ActionExecutor, ExecutionResult, ExecutionStatus from core.capture.screen_capturer import ScreenCapturer, CaptureFrame from core.execution.target_resolver import TargetResolver, ResolvedTarget # Import tardif pour éviter import circulaire from typing import TYPE_CHECKING if TYPE_CHECKING: from core.pipeline.workflow_pipeline import WorkflowPipeline # GPU Resource Manager integration try: from core.gpu import GPUResourceManager, ExecutionMode as GPUExecutionMode, get_gpu_resource_manager GPU_MANAGER_AVAILABLE = True except ImportError: GPU_MANAGER_AVAILABLE = False # Continuous Learner integration try: from core.learning.continuous_learner import ContinuousLearner, DriftStatus CONTINUOUS_LEARNER_AVAILABLE = True except ImportError: CONTINUOUS_LEARNER_AVAILABLE = False # Execution Robustness integration try: from core.execution.execution_robustness import ExecutionRobustness, RetryManager ROBUSTNESS_AVAILABLE = True except ImportError: ROBUSTNESS_AVAILABLE = False # Analytics integration try: from core.analytics.analytics_system import get_analytics_system from core.analytics.integration.execution_integration import AnalyticsExecutionIntegration ANALYTICS_AVAILABLE = True except ImportError: ANALYTICS_AVAILABLE = False # Feedback and Correction integration try: from core.learning.feedback_processor import FeedbackProcessor, FeedbackType from core.corrections import get_correction_pack_integration, CorrectionPackIntegration FEEDBACK_AVAILABLE = True except ImportError: FEEDBACK_AVAILABLE = False # Training Data Collector integration try: from core.training.training_data_collector import TrainingDataCollector TRAINING_COLLECTOR_AVAILABLE = True except ImportError: TRAINING_COLLECTOR_AVAILABLE = False logger = logging.getLogger(__name__) class ExecutionMode(str, Enum): """Modes d'exécution du RPA""" OBSERVATION = "observation" # Observer sans agir COACHING = "coaching" # Suggérer, utilisateur exécute SUPERVISED = "supervised" # Exécuter avec confirmation AUTOMATIC = "automatic" # Pilote automatique class ExecutionState(str, Enum): """États de la boucle d'exécution""" IDLE = "idle" # En attente RUNNING = "running" # En cours d'exécution PAUSED = "paused" # En pause WAITING_CONFIRMATION = "waiting" # Attend confirmation utilisateur WAITING_COACHING = "coaching" # Attend décision coaching utilisateur COMPLETED = "completed" # Terminé avec succès FAILED = "failed" # Terminé en erreur STOPPED = "stopped" # Arrêté manuellement class CoachingDecision(str, Enum): """Décisions possibles en mode COACHING""" ACCEPT = "accept" # Accepter et exécuter l'action suggérée REJECT = "reject" # Rejeter l'action (passer ou terminer) CORRECT = "correct" # Fournir une correction EXECUTE_MANUAL = "manual" # L'utilisateur a exécuté manuellement SKIP = "skip" # Sauter cette action @dataclass class CoachingResponse: """Réponse de l'utilisateur en mode COACHING""" decision: CoachingDecision correction: Optional[Dict[str, Any]] = None # Correction si decision == CORRECT feedback: Optional[str] = None # Commentaire utilisateur executed_manually: bool = False # True si l'utilisateur a fait l'action lui-même @dataclass class ExecutionContext: """Contexte d'une exécution de workflow""" workflow_id: str execution_id: str mode: ExecutionMode started_at: datetime current_node_id: Optional[str] = None current_edge_id: Optional[str] = None steps_executed: int = 0 steps_succeeded: int = 0 steps_failed: int = 0 last_screenshot_path: Optional[str] = None last_match_confidence: float = 0.0 variables: Dict[str, Any] = field(default_factory=dict) history: List[Dict[str, Any]] = field(default_factory=list) @dataclass class StepResult: """Résultat d'une étape d'exécution""" success: bool node_id: str edge_id: Optional[str] action_result: Optional[ExecutionResult] match_confidence: float duration_ms: float message: str screenshot_path: Optional[str] = None # C1 — Instrumentation vision-aware ocr_ms: float = 0.0 # Temps OCR du ScreenState de ce step ui_ms: float = 0.0 # Temps détection UI de ce step total_ms: float = 0.0 # Temps total (alias de duration_ms pour cohérence) analyze_ms: float = 0.0 # Temps total analyse ScreenState (OCR + UI + reste) cache_hit: bool = False # True si ScreenState vient du cache degraded: bool = False # True si mode dégradé activé (timeout analyse) class ExecutionLoop: """ Boucle d'exécution principale du RPA. Gère le cycle complet : Capture → Matching → Action → Vérification → Boucle Example: >>> loop = ExecutionLoop(pipeline) >>> loop.start("workflow_login", mode=ExecutionMode.SUPERVISED) >>> # ... le RPA s'exécute ... >>> loop.stop() """ def __init__( self, pipeline: "WorkflowPipeline", action_executor: Optional[ActionExecutor] = None, screen_capturer: Optional[ScreenCapturer] = None, capture_interval_ms: int = 500, max_no_match_retries: int = 5, confirmation_callback: Optional[Callable[[str, Dict], bool]] = None, coaching_callback: Optional[Callable[[str, Dict], "CoachingResponse"]] = None, screen_analyzer: Optional[Any] = None, screen_state_cache: Optional[Any] = None, enable_ui_detection: bool = True, enable_ocr: bool = True, analyze_timeout_ms: int = 8000, window_info_provider: Optional[Callable[[], Optional[Dict[str, Any]]]] = None, ): """ Initialiser la boucle d'exécution. Args: pipeline: WorkflowPipeline pour matching et gestion workflows action_executor: Exécuteur d'actions (créé si None) screen_capturer: Capturer d'écran (créé si None) capture_interval_ms: Intervalle entre captures (ms) max_no_match_retries: Nombre max de tentatives si pas de match confirmation_callback: Callback pour demander confirmation (SUPERVISED) coaching_callback: Callback pour décisions coaching (COACHING) screen_analyzer: ScreenAnalyzer pour construire un ScreenState enrichi (lazy init via singleton si None) screen_state_cache: Cache perceptuel (lazy init via singleton si None) enable_ui_detection: Active la détection UI (True par défaut, flag d'urgence) enable_ocr: Active l'OCR (True par défaut) analyze_timeout_ms: Timeout soft pour l'analyse d'un ScreenState. Au-delà, on active le mode dégradé pour les steps suivants. window_info_provider: Callable renvoyant un dict window_info. Si None, on tente `screen_capturer.get_active_window()`. """ self.pipeline = pipeline self.action_executor = action_executor or ActionExecutor() self.screen_capturer = screen_capturer or ScreenCapturer( buffer_size=20, detect_changes=True ) self.target_resolver = TargetResolver( similarity_threshold=0.75, use_embedding_fallback=True ) self.capture_interval_ms = capture_interval_ms self.max_no_match_retries = max_no_match_retries self.confirmation_callback = confirmation_callback self.coaching_callback = coaching_callback # C1 — Vision-aware execution self._screen_analyzer = screen_analyzer # lazy init si None self._screen_state_cache = screen_state_cache # lazy init si None self.enable_ui_detection = enable_ui_detection self.enable_ocr = enable_ocr self.analyze_timeout_ms = analyze_timeout_ms self._window_info_provider = window_info_provider # Mode dégradé déclenché par un timeout analyse — persiste tant qu'un # probe n'a pas démontré la récupération (voir ci-dessous). self._degraded_mode = False # Auto-rétablissement : compteur de steps rapides consécutifs. # Si l'analyse tourne vite (< analyze_timeout_ms / 2) pendant # _fast_steps_recovery_threshold steps → on quitte le mode dégradé. self._successive_fast_steps = 0 self._fast_steps_recovery_threshold = 3 # En mode dégradé, on retente l'analyse tous les _probe_interval steps # pour détecter la récupération (les autres steps restent en stub pour # éviter de re-saturer le GPU). 10 par défaut = ~5s à 500ms/step. self._probe_interval = 10 self._degraded_step_counter = 0 # État interne self.state = ExecutionState.IDLE self.context: Optional[ExecutionContext] = None self._stop_requested = False self._pause_requested = False self._thread: Optional[threading.Thread] = None self._lock = threading.Lock() # Mode capture continue self._use_continuous_capture = True self._last_frame: Optional[CaptureFrame] = None self._frame_queue: List[CaptureFrame] = [] self._frame_queue_lock = threading.Lock() # Callbacks pour événements self._on_step_complete: Optional[Callable[[StepResult], None]] = None self._on_state_change: Optional[Callable[[ExecutionState], None]] = None self._on_error: Optional[Callable[[str, Exception], None]] = None # Répertoire temporaire pour screenshots self._temp_dir = Path(tempfile.mkdtemp(prefix="rpa_vision_")) # GPU Resource Manager self._gpu_manager = None if GPU_MANAGER_AVAILABLE: try: self._gpu_manager = get_gpu_resource_manager() logger.info("GPU Resource Manager integrated") except Exception as e: logger.warning(f"GPU Resource Manager not available: {e}") # Continuous Learner pour apprentissage continu self._continuous_learner = None if CONTINUOUS_LEARNER_AVAILABLE: try: self._continuous_learner = ContinuousLearner() logger.info("Continuous Learner integrated") except Exception as e: logger.warning(f"Continuous Learner not available: {e}") # Historique des confidences pour détection de dérive self._confidence_history: Dict[str, List[float]] = {} # Execution Robustness pour retry et récupération self._robustness = None if ROBUSTNESS_AVAILABLE: try: self._robustness = ExecutionRobustness(pipeline) logger.info("Execution Robustness integrated") except Exception as e: logger.warning(f"Execution Robustness not available: {e}") # Analytics integration pour métriques et insights self._analytics_integration = None if ANALYTICS_AVAILABLE: try: analytics_system = get_analytics_system() self._analytics_integration = AnalyticsExecutionIntegration(analytics_system) logger.info("Analytics System integrated") except Exception as e: logger.warning(f"Analytics System not available: {e}") # Feedback and Correction Pack integration pour mode COACHING self._feedback_processor = None self._correction_integration = None if FEEDBACK_AVAILABLE: try: self._feedback_processor = FeedbackProcessor(auto_integrate_corrections=True) self._correction_integration = get_correction_pack_integration() logger.info("Feedback and Correction Pack integration enabled") except Exception as e: logger.warning(f"Feedback integration not available: {e}") # Training Data Collector pour enregistrement des sessions COACHING self._training_collector = None if TRAINING_COLLECTOR_AVAILABLE: try: self._training_collector = TrainingDataCollector( output_dir="data/coaching_sessions", auto_integrate_corrections=True ) logger.info("Training Data Collector integrated") except Exception as e: logger.warning(f"Training Data Collector not available: {e}") # Coaching state self._coaching_response: Optional[CoachingResponse] = None self._coaching_stats = { 'suggestions_made': 0, 'accepted': 0, 'rejected': 0, 'corrected': 0, 'manual_executions': 0 } logger.info("ExecutionLoop initialized") # ========================================================================= # Contrôle de l'exécution # ========================================================================= def start( self, workflow_id: str, mode: Optional[ExecutionMode] = None, start_node_id: Optional[str] = None, variables: Optional[Dict[str, Any]] = None ) -> bool: """ Démarrer l'exécution d'un workflow. Args: workflow_id: ID du workflow à exécuter mode: Mode d'exécution (auto-détecté si None) start_node_id: Node de départ (entry_node si None) variables: Variables initiales Returns: True si démarré avec succès """ with self._lock: if self.state == ExecutionState.RUNNING: logger.warning("Execution already running") return False # Charger le workflow workflow = self.pipeline.load_workflow(workflow_id) if not workflow: logger.error(f"Workflow not found: {workflow_id}") return False # Déterminer le mode d'exécution if mode is None: mode = self._determine_execution_mode(workflow) # Créer le contexte self.context = ExecutionContext( workflow_id=workflow_id, execution_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S')}", mode=mode, started_at=datetime.now(), current_node_id=start_node_id or (workflow.entry_nodes[0] if workflow.entry_nodes else None), variables=variables or {} ) self._stop_requested = False self._pause_requested = False self.state = ExecutionState.RUNNING # Notify GPU Resource Manager about execution mode self._update_gpu_mode(mode) # Notify Analytics about execution start if self._analytics_integration: try: self._analytics_integration.on_execution_start( workflow_id=workflow_id, execution_id=self.context.execution_id, mode=mode.value ) # Start resource monitoring for this execution self._analytics_integration.start_resource_monitoring( execution_id=self.context.execution_id ) except Exception as e: logger.warning(f"Analytics notification failed: {e}") # Démarrer le thread d'exécution self._thread = threading.Thread(target=self._execution_loop, daemon=True) self._thread.start() logger.info(f"Started execution of {workflow_id} in {mode.value} mode") self._notify_state_change(ExecutionState.RUNNING) return True def stop(self) -> None: """Arrêter l'exécution.""" with self._lock: self._stop_requested = True logger.info("Stop requested") def pause(self) -> None: """Mettre en pause l'exécution.""" with self._lock: self._pause_requested = True self.state = ExecutionState.PAUSED logger.info("Pause requested") self._notify_state_change(ExecutionState.PAUSED) def resume(self) -> None: """Reprendre l'exécution après pause.""" with self._lock: self._pause_requested = False self.state = ExecutionState.RUNNING logger.info("Resumed") self._notify_state_change(ExecutionState.RUNNING) def confirm_action(self, approved: bool = True) -> None: """Confirmer ou rejeter une action en attente.""" with self._lock: if self.state == ExecutionState.WAITING_CONFIRMATION: self._confirmation_result = approved self.state = ExecutionState.RUNNING # ========================================================================= # Boucle principale # ========================================================================= def _execution_loop(self) -> None: """Boucle principale d'exécution (tourne dans un thread).""" no_match_count = 0 # Démarrer la capture continue si activée if self._use_continuous_capture: self.screen_capturer.start_continuous( callback=self._on_frame_captured, interval_ms=self.capture_interval_ms, skip_unchanged=True ) logger.info("Started continuous screen capture") try: while not self._stop_requested: # Vérifier pause while self._pause_requested and not self._stop_requested: time.sleep(0.1) if self._stop_requested: break # Exécuter une étape step_result = self._execute_step() if step_result is None: # Pas de match trouvé no_match_count += 1 if no_match_count >= self.max_no_match_retries: logger.error(f"No match found after {no_match_count} retries") self._handle_no_match() break time.sleep(self.capture_interval_ms / 1000.0) continue no_match_count = 0 # Reset counter # Notifier le résultat if self._on_step_complete: self._on_step_complete(step_result) # Enregistrer dans l'historique self.context.history.append({ "timestamp": datetime.now().isoformat(), "node_id": step_result.node_id, "edge_id": step_result.edge_id, "success": step_result.success, "confidence": step_result.match_confidence }) # Notify Analytics about step completion # C1 — transmet tous les champs vision-aware (ocr_ms, ui_ms, # analyze_ms, cache_hit, degraded) au système analytics via # on_step_result qui accepte un StepResult complet. if self._analytics_integration and step_result: try: self._analytics_integration.on_step_result( execution_id=self.context.execution_id, workflow_id=self.context.workflow_id, step_result=step_result, ) except Exception as e: logger.warning(f"Analytics step notification failed: {e}") # Vérifier si workflow terminé if self._is_workflow_complete(): logger.info("Workflow completed successfully") self.state = ExecutionState.COMPLETED self._notify_state_change(ExecutionState.COMPLETED) break # Attendre avant prochaine capture time.sleep(self.capture_interval_ms / 1000.0) except Exception as e: logger.exception(f"Execution loop error: {e}") self.state = ExecutionState.FAILED self._notify_state_change(ExecutionState.FAILED) if self._on_error: self._on_error("execution_loop", e) finally: # Arrêter la capture continue if self._use_continuous_capture: self.screen_capturer.stop_continuous() logger.info("Stopped continuous screen capture") if self._stop_requested: self.state = ExecutionState.STOPPED self._notify_state_change(ExecutionState.STOPPED) # Notify Analytics about execution completion # Contrat normalisé (Lot A) : duration_ms + status explicite # au lieu du booléen success + duration ambigu. if self._analytics_integration and self.context: try: duration_ms = ( datetime.now() - self.context.started_at ).total_seconds() * 1000 # Mapping ExecutionState → status analytics if self.state == ExecutionState.COMPLETED: status = "completed" elif self.state == ExecutionState.FAILED: status = "failed" elif self.state == ExecutionState.STOPPED: status = "stopped" elif self.state == ExecutionState.PAUSED: # Pause non résolue à la sortie = blocage non récupéré status = "blocked" else: status = self.state.value error_message = ( None if status == "completed" else f"Execution ended in state: {self.state.value}" ) # Stop resource monitoring self._analytics_integration.stop_resource_monitoring( execution_id=self.context.execution_id ) self._analytics_integration.on_execution_complete( workflow_id=self.context.workflow_id, execution_id=self.context.execution_id, duration_ms=duration_ms, status=status, steps_total=self.context.steps_executed, steps_completed=self.context.steps_succeeded, steps_failed=self.context.steps_failed, error_message=error_message, ) except Exception as e: logger.warning(f"Analytics completion notification failed: {e}") logger.info(f"Execution loop ended: {self.state.value}") def _execute_step(self) -> Optional[StepResult]: """ Exécuter une étape du workflow. Returns: StepResult ou None si pas de match """ start_time = time.time() # 1. Capturer l'écran screenshot_path = self._capture_screen() if not screenshot_path: logger.warning("Failed to capture screen") return None self.context.last_screenshot_path = screenshot_path # 1bis. Construire un ScreenState enrichi (C1) — avec cache perceptuel screen_state, timings = self._build_screen_state(screenshot_path) logger.debug( f"[Step] ScreenState analyze={timings['analyze_ms']:.0f}ms " f"ocr={timings['ocr_ms']:.0f}ms ui={timings['ui_ms']:.0f}ms " f"cache_hit={timings['cache_hit']} degraded={timings['degraded']}" ) # 2. Identifier l'état actuel (matching) # # Lot E — on consomme le ScreenState enrichi déjà construit en 1bis # (avec ui_elements, detected_text, window_title réels) au lieu de # laisser le pipeline reconstruire un stub avec window_title="Unknown". # Premier vrai matching context-aware. match = self.pipeline.match_current_state_from_state( screen_state, workflow_id=self.context.workflow_id, ) if not match: logger.debug("No match found for current screen") return None current_node_id = match["node_id"] confidence = match["confidence"] self.context.current_node_id = current_node_id self.context.last_match_confidence = confidence logger.info(f"Matched node: {current_node_id} (confidence: {confidence:.3f})") # 3. Obtenir la prochaine action (C3 : sélection d'edge robuste) # # Lot A — contrat dict avec status explicite : # "terminal" → fin légitime du workflow (success=True) # "blocked" → pause supervisée (plus JAMAIS traité comme un succès # pour ne pas déclencher un faux _is_workflow_complete) # "selected" → action à exécuter # # Lot B — on propage la confidence du match courant (source_similarity) # pour que l'EdgeScorer puisse vérifier la précondition # `min_source_similarity` de chaque edge. Sans cette propagation, la # contrainte était silencieusement désactivée (hardcodé à 1.0). next_action = self.pipeline.get_next_action( self.context.workflow_id, current_node_id, screen_state=screen_state, source_similarity=confidence, ) # Rétrocompat défensive : si un pipeline legacy renvoie None ou un dict # sans status, on considère ça comme un blocage (safe default). if not isinstance(next_action, dict) or "status" not in next_action: logger.error( "get_next_action a renvoyé un résultat sans status " f"(legacy?). Valeur reçue: {next_action!r}" ) next_action = {"status": "blocked", "reason": "legacy_none_return"} action_status = next_action.get("status") if action_status == "terminal": # Fin légitime : aucun outgoing_edge sur le node courant total_ms = (time.time() - start_time) * 1000 return StepResult( success=True, node_id=current_node_id, edge_id=None, action_result=None, match_confidence=confidence, duration_ms=total_ms, message="Workflow terminated (terminal node)", screenshot_path=screenshot_path, ocr_ms=timings["ocr_ms"], ui_ms=timings["ui_ms"], analyze_ms=timings["analyze_ms"], total_ms=total_ms, cache_hit=timings["cache_hit"], degraded=timings["degraded"], ) if action_status == "blocked": # Blocage : des edges existent mais aucun n'est valide. # On déclenche une pause supervisée (paused_need_help) et on # remonte l'erreur. On ne retourne PAS success=True. reason = next_action.get("reason", "unknown") logger.warning( f"ExecutionLoop bloqué sur {current_node_id}: {reason} " f"→ pause supervisée demandée" ) # On bascule en PAUSED et on arme _pause_requested pour que la # boucle principale attende un resume() humain. self.state = ExecutionState.PAUSED self._pause_requested = True self._notify_state_change(ExecutionState.PAUSED) if self._on_error: try: self._on_error( "blocked", Exception(f"No valid edge from {current_node_id}: {reason}"), ) except Exception as cb_err: logger.debug(f"on_error callback failed: {cb_err}") total_ms = (time.time() - start_time) * 1000 return StepResult( success=False, node_id=current_node_id, edge_id=None, action_result=None, match_confidence=confidence, duration_ms=total_ms, message=f"Blocked: {reason}", screenshot_path=screenshot_path, ocr_ms=timings["ocr_ms"], ui_ms=timings["ui_ms"], analyze_ms=timings["analyze_ms"], total_ms=total_ms, cache_hit=timings["cache_hit"], degraded=timings["degraded"], ) # À partir d'ici, on est forcément en status="selected" edge_id = next_action["edge_id"] self.context.current_edge_id = edge_id # 4. Selon le mode, exécuter ou suggérer action_result = None if self.context.mode == ExecutionMode.OBSERVATION: # Mode observation : ne rien faire logger.info(f"[OBSERVATION] Would execute: {next_action['action']}") elif self.context.mode == ExecutionMode.COACHING: # Mode coaching : suggérer l'action et attendre décision utilisateur logger.info(f"[COACHING] Suggested action: {next_action['action']}") self._coaching_stats['suggestions_made'] += 1 # Demander la décision à l'utilisateur coaching_response = self._request_coaching_decision(next_action, screenshot_path) if coaching_response.decision == CoachingDecision.ACCEPT: # Utilisateur accepte : exécuter l'action suggérée self._coaching_stats['accepted'] += 1 action_result = self._execute_action(next_action, screen_state=screen_state) self._record_coaching_feedback( next_action, coaching_response, action_result, success=True ) elif coaching_response.decision == CoachingDecision.REJECT: # Utilisateur rejette : ne pas exécuter self._coaching_stats['rejected'] += 1 self._record_coaching_feedback( next_action, coaching_response, None, success=False ) total_ms = (time.time() - start_time) * 1000 return StepResult( success=False, node_id=current_node_id, edge_id=edge_id, action_result=None, match_confidence=confidence, duration_ms=total_ms, message="Action rejected by user in COACHING mode", screenshot_path=screenshot_path, ocr_ms=timings["ocr_ms"], ui_ms=timings["ui_ms"], analyze_ms=timings["analyze_ms"], total_ms=total_ms, cache_hit=timings["cache_hit"], degraded=timings["degraded"], ) elif coaching_response.decision == CoachingDecision.CORRECT: # Utilisateur fournit une correction self._coaching_stats['corrected'] += 1 corrected_action = self._apply_coaching_correction( next_action, coaching_response.correction ) action_result = self._execute_action(corrected_action, screen_state=screen_state) self._record_coaching_feedback( next_action, coaching_response, action_result, success=action_result.status == ExecutionStatus.SUCCESS if action_result else False ) elif coaching_response.decision == CoachingDecision.EXECUTE_MANUAL: # Utilisateur a exécuté manuellement self._coaching_stats['manual_executions'] += 1 self._record_coaching_feedback( next_action, coaching_response, None, success=True, manual=True ) # Pas besoin d'exécuter, l'utilisateur l'a fait elif coaching_response.decision == CoachingDecision.SKIP: # Sauter cette action self._record_coaching_feedback( next_action, coaching_response, None, success=True ) # Continuer sans exécuter elif self.context.mode == ExecutionMode.SUPERVISED: # Mode supervisé : demander confirmation if not self._request_confirmation(next_action): logger.info("Action rejected by user") total_ms = (time.time() - start_time) * 1000 return StepResult( success=False, node_id=current_node_id, edge_id=edge_id, action_result=None, match_confidence=confidence, duration_ms=total_ms, message="Action rejected by user", screenshot_path=screenshot_path, ocr_ms=timings["ocr_ms"], ui_ms=timings["ui_ms"], analyze_ms=timings["analyze_ms"], total_ms=total_ms, cache_hit=timings["cache_hit"], degraded=timings["degraded"], ) # Exécuter l'action action_result = self._execute_action(next_action, screen_state=screen_state) elif self.context.mode == ExecutionMode.AUTOMATIC: # Mode automatique : exécuter directement action_result = self._execute_action(next_action, screen_state=screen_state) # 5. Mettre à jour les compteurs self.context.steps_executed += 1 if action_result and action_result.status == ExecutionStatus.SUCCESS: self.context.steps_succeeded += 1 elif action_result: self.context.steps_failed += 1 duration_ms = (time.time() - start_time) * 1000 return StepResult( success=action_result.status == ExecutionStatus.SUCCESS if action_result else True, node_id=current_node_id, edge_id=edge_id, action_result=action_result, match_confidence=confidence, duration_ms=duration_ms, message=action_result.message if action_result else "Observed", screenshot_path=screenshot_path, ocr_ms=timings["ocr_ms"], ui_ms=timings["ui_ms"], analyze_ms=timings["analyze_ms"], total_ms=duration_ms, cache_hit=timings["cache_hit"], degraded=timings["degraded"], ) # ========================================================================= # Méthodes utilitaires # ========================================================================= def _capture_screen(self) -> Optional[str]: """Capturer l'écran et sauvegarder dans un fichier temporaire.""" try: screenshot = self.screen_capturer.capture_screen() if screenshot is None: return None # Sauvegarder dans fichier temporaire timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") screenshot_path = self._temp_dir / f"capture_{timestamp}.png" screenshot.save(str(screenshot_path)) return str(screenshot_path) except Exception as e: logger.error(f"Screen capture failed: {e}") return None def _execute_action( self, action_info: Dict[str, Any], screen_state: Optional[Any] = None, ) -> ExecutionResult: """ Exécuter une action via l'ActionExecutor. Args: action_info: dict action {edge_id, action, target_node, ...} screen_state: ScreenState enrichi (si None, fallback stub minimal) """ try: # Charger le workflow et l'edge workflow = self.pipeline.load_workflow(self.context.workflow_id) edge = workflow.get_edge(action_info["edge_id"]) if not edge: return ExecutionResult( status=ExecutionStatus.FAILED, message=f"Edge not found: {action_info['edge_id']}", duration_ms=0 ) # Utiliser le ScreenState enrichi fourni par le loop ; fallback minimal # uniquement si on n'en a pas (legacy, tests). if screen_state is None: screen_state = self._build_stub_screen_state() # Exécuter l'action result = self.action_executor.execute_edge( edge, screen_state, context=self.context.variables ) logger.info(f"Action executed: {result.status.value} - {result.message}") return result except Exception as e: logger.exception(f"Action execution failed: {e}") return ExecutionResult( status=ExecutionStatus.FAILED, message=str(e), duration_ms=0, error=e ) # ========================================================================= # C1 — Construction du ScreenState (vision-aware) # ========================================================================= def _get_screen_analyzer(self): """ Récupérer le ScreenAnalyzer (singleton partagé, lazy). Retourne None si indisponible (import error, etc.) — le loop bascule alors en fallback stub. Note Lot C : on ne passe plus `session_id` au singleton. Le session_id est désormais un paramètre d'appel de `analyze()`, pour éviter que deux ExecutionLoop partageant le même analyzer se marchent dessus. """ if self._screen_analyzer is not None: return self._screen_analyzer try: from core.pipeline import get_screen_analyzer self._screen_analyzer = get_screen_analyzer() return self._screen_analyzer except Exception as e: logger.warning(f"ScreenAnalyzer indisponible: {e}") return None def _get_screen_state_cache(self): """Récupérer le cache de ScreenState (singleton partagé, lazy).""" if self._screen_state_cache is not None: return self._screen_state_cache try: from core.pipeline import get_screen_state_cache self._screen_state_cache = get_screen_state_cache() return self._screen_state_cache except Exception as e: logger.warning(f"ScreenStateCache indisponible: {e}") return None def _resolve_window_info(self) -> Optional[Dict[str, Any]]: """ Récupérer les infos de la fenêtre active. Ordre de préférence : 1. `window_info_provider` fourni au constructeur 2. `screen_capturer.get_active_window()` 3. None → ScreenAnalyzer utilisera les valeurs par défaut """ if self._window_info_provider is not None: try: return self._window_info_provider() except Exception as e: logger.debug(f"window_info_provider failed: {e}") try: raw = self.screen_capturer.get_active_window() if raw: # Normaliser vers le format attendu par ScreenAnalyzer return { "title": raw.get("title", "Unknown"), "app_name": raw.get("app", "unknown"), "window_bounds": [ raw.get("x", 0), raw.get("y", 0), raw.get("width", 0), raw.get("height", 0), ], } except Exception as e: logger.debug(f"get_active_window failed: {e}") return None def _build_screen_state( self, screenshot_path: str, ) -> tuple: """ Construire un ScreenState enrichi depuis un screenshot. Logique : - Si enable_ui_detection=False ET enable_ocr=False → stub - Si analyseur indisponible → stub - Sinon : cache.get_or_compute(analyzer.analyze) - Timeout soft : si l'analyse dépasse `analyze_timeout_ms`, on log un warning et on active le mode dégradé pour les prochains steps. Returns: (screen_state, timings_dict) timings_dict: { "analyze_ms", "ocr_ms", "ui_ms", "cache_hit", "degraded" } """ timings = { "analyze_ms": 0.0, "ocr_ms": 0.0, "ui_ms": 0.0, "cache_hit": False, "degraded": False, } # Mode "tout désactivé" (flag d'urgence) → stub if not self.enable_ui_detection and not self.enable_ocr: timings["degraded"] = True return self._build_stub_screen_state(screenshot_path), timings analyzer = self._get_screen_analyzer() if analyzer is None: timings["degraded"] = True return self._build_stub_screen_state(screenshot_path), timings # Mode dégradé : on reste sur stub, sauf "probe" périodique qui teste # si le GPU est redevenu performant. Si oui, on accumule les steps # rapides ; après _fast_steps_recovery_threshold probes rapides # consécutifs on retourne en mode complet. if self._degraded_mode: self._degraded_step_counter += 1 if self._degraded_step_counter < self._probe_interval: timings["degraded"] = True return self._build_stub_screen_state(screenshot_path), timings # Sinon on tente un probe réel ci-dessous self._degraded_step_counter = 0 cache = self._get_screen_state_cache() # Invalidation proactive : si l'écran a massivement changé depuis # la dernière entrée du cache, on purge. Le TTL seul (2s) laisserait # passer des entrées obsolètes sur des changements rapides (popup, nav). if cache is not None: try: cache.invalidate_if_changed(screenshot_path, threshold=0.3) except Exception as e: logger.debug(f"invalidate_if_changed a échoué: {e}") window_info = self._resolve_window_info() # Fonction de calcul (cache miss) # Les flags runtime (enable_ocr, enable_ui_detection) et le session_id # sont passés en kwargs-only à analyze() : AUCUNE mutation de l'analyseur # singleton (Lot C — thread-safety, deux ExecutionLoop peuvent partager # le même analyzer sans se contaminer). execution_id = self.context.execution_id if self.context else "" def compute(path: str): t_start = time.time() state = analyzer.analyze( path, window_info=window_info, enable_ocr=self.enable_ocr, enable_ui_detection=self.enable_ui_detection, session_id=execution_id, ) elapsed = (time.time() - t_start) * 1000 # Annoter le temps dans les métadonnées if hasattr(state, "metadata"): state.metadata["analyze_ms"] = elapsed return state t0 = time.time() try: if cache is not None: # Lot D — clé composite context-aware : deux contextes # différents partageant le même screenshot n'entrent plus # en collision. Le workflow_id isole les replays par workflow, # les flags différencient les modes d'analyse (OCR on/off, # UI on/off), et le (window_title, app_name) distingue deux # applications qui présenteraient un rendu visuel similaire. ctx_window_title = (window_info or {}).get("title", "") or "" ctx_app_name = (window_info or {}).get("app_name", "") or "" ctx_workflow_id = ( self.context.workflow_id if self.context else "" ) state, cache_hit, _ = cache.get_or_compute( screenshot_path, compute, window_title=ctx_window_title, app_name=ctx_app_name, enable_ocr=self.enable_ocr, enable_ui_detection=self.enable_ui_detection, workflow_id=ctx_workflow_id, ) else: state = compute(screenshot_path) cache_hit = False except Exception as e: logger.warning(f"ScreenState build failed: {e} — fallback stub") timings["degraded"] = True return self._build_stub_screen_state(screenshot_path), timings analyze_ms = (time.time() - t0) * 1000 timings["analyze_ms"] = analyze_ms timings["cache_hit"] = cache_hit # Décomposer OCR vs UI si possible (métadonnées) meta = getattr(state, "metadata", {}) or {} timings["ocr_ms"] = float(meta.get("ocr_ms", 0.0)) timings["ui_ms"] = float(meta.get("ui_ms", 0.0)) # Timeout soft : activer le mode dégradé si > seuil # (cache_hit ignoré : un hit ne prouve rien sur la santé du GPU) if analyze_ms > self.analyze_timeout_ms and not cache_hit: logger.warning( f"ScreenState analysis slow: {analyze_ms:.0f}ms > " f"{self.analyze_timeout_ms}ms → activation mode dégradé" ) self._degraded_mode = True self._successive_fast_steps = 0 timings["degraded"] = True else: # Step "rapide" : incrémenter le compteur si < timeout / 2. # On ignore les cache hits (pas représentatifs de la perf GPU). fast_threshold_ms = self.analyze_timeout_ms / 2 if not cache_hit and analyze_ms < fast_threshold_ms: self._successive_fast_steps += 1 # Auto-rétablissement : si on était en dégradé et qu'on a # enchaîné assez de steps rapides → retour en mode complet. if ( self._degraded_mode and self._successive_fast_steps >= self._fast_steps_recovery_threshold ): logger.info( "Mode complet restauré après %d steps rapides " "(dernier analyze_ms=%.0fms < seuil=%.0fms)", self._successive_fast_steps, analyze_ms, fast_threshold_ms, ) self._degraded_mode = False self._successive_fast_steps = 0 elif not cache_hit: # Step ni lent ni rapide (entre timeout/2 et timeout) : reset self._successive_fast_steps = 0 # On propage l'état dégradé courant dans les timings (utile pour le # StepResult : tant qu'on n'a pas récupéré assez de steps rapides, # on continue à signaler "degraded=True"). timings["degraded"] = self._degraded_mode return state, timings def _build_stub_screen_state(self, screenshot_path: Optional[str] = None): """ Construire un ScreenState minimal (fallback legacy). Utilisé quand l'analyseur est indisponible ou que tous les flags de détection sont désactivés (flag d'urgence). """ from core.models.screen_state import ( ScreenState, WindowContext, RawLevel, PerceptionLevel, ContextLevel, EmbeddingRef ) path = screenshot_path or ( self.context.last_screenshot_path if self.context else "" ) or "" return ScreenState( screen_state_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}", timestamp=datetime.now(), session_id=self.context.execution_id if self.context else "stub", window=WindowContext( app_name="unknown", window_title="Unknown", screen_resolution=[1920, 1080], workspace="main", ), raw=RawLevel( screenshot_path=path, capture_method="execution", file_size_bytes=0, ), perception=PerceptionLevel( embedding=EmbeddingRef(provider="", vector_id="", dimensions=512), detected_text=[], text_detection_method="none", confidence_avg=0.0, ), context=ContextLevel(), ui_elements=[], ) def _request_confirmation(self, action_info: Dict[str, Any]) -> bool: """Demander confirmation à l'utilisateur.""" if self.confirmation_callback: return self.confirmation_callback( f"Execute action: {action_info['action']}?", action_info ) # Sans callback, attendre la confirmation via confirm_action() self.state = ExecutionState.WAITING_CONFIRMATION self._notify_state_change(ExecutionState.WAITING_CONFIRMATION) self._confirmation_result = None # Attendre la confirmation (timeout 60s) timeout = 60 start = time.time() while self._confirmation_result is None and (time.time() - start) < timeout: if self._stop_requested: return False time.sleep(0.1) return self._confirmation_result or False # ========================================================================= # Méthodes COACHING # ========================================================================= def _request_coaching_decision( self, action_info: Dict[str, Any], screenshot_path: Optional[str] = None ) -> CoachingResponse: """ Demander une décision à l'utilisateur en mode COACHING. Args: action_info: Information sur l'action suggérée screenshot_path: Chemin vers la capture d'écran actuelle Returns: CoachingResponse avec la décision de l'utilisateur """ # Si callback fourni, l'utiliser if self.coaching_callback: try: return self.coaching_callback( f"Suggested action: {action_info.get('action', 'unknown')}", { **action_info, 'screenshot_path': screenshot_path, 'context': { 'workflow_id': self.context.workflow_id if self.context else None, 'execution_id': self.context.execution_id if self.context else None, 'step': self.context.steps_executed if self.context else 0 } } ) except Exception as e: logger.error(f"Coaching callback error: {e}") # En cas d'erreur, retourner SKIP par défaut return CoachingResponse(decision=CoachingDecision.SKIP) # Sans callback, attendre la réponse via submit_coaching_decision() self.state = ExecutionState.WAITING_COACHING self._notify_state_change(ExecutionState.WAITING_COACHING) self._coaching_response = None # Attendre la réponse (timeout 120s pour COACHING car plus complexe) timeout = 120 start = time.time() while self._coaching_response is None and (time.time() - start) < timeout: if self._stop_requested: return CoachingResponse(decision=CoachingDecision.REJECT) time.sleep(0.1) # Timeout = SKIP par défaut if self._coaching_response is None: logger.warning("Coaching decision timeout - skipping action") return CoachingResponse(decision=CoachingDecision.SKIP) return self._coaching_response def submit_coaching_decision(self, response: CoachingResponse) -> bool: """ Soumettre une décision COACHING depuis l'extérieur (API, UI). Args: response: Réponse COACHING de l'utilisateur Returns: True si la décision a été acceptée """ if self.state != ExecutionState.WAITING_COACHING: logger.warning(f"Cannot submit coaching decision in state {self.state}") return False self._coaching_response = response logger.info(f"Coaching decision received: {response.decision.value}") return True def _apply_coaching_correction( self, original_action: Dict[str, Any], correction: Optional[Dict[str, Any]] ) -> Dict[str, Any]: """ Appliquer une correction utilisateur à une action. Args: original_action: Action originale suggérée correction: Correction fournie par l'utilisateur Returns: Action corrigée """ if not correction: return original_action corrected = {**original_action} # Appliquer les corrections de cible if 'target' in correction: corrected['target'] = correction['target'] if 'corrected_target' in correction: corrected['target'] = correction['corrected_target'] # Appliquer les corrections de paramètres if 'params' in correction: corrected['params'] = {**corrected.get('params', {}), **correction['params']} if 'corrected_params' in correction: corrected['params'] = {**corrected.get('params', {}), **correction['corrected_params']} # Appliquer changement d'action if 'action' in correction: corrected['action'] = correction['action'] # Appliquer timing if 'wait_before' in correction: corrected['wait_before'] = correction['wait_before'] if 'wait_after' in correction: corrected['wait_after'] = correction['wait_after'] logger.info(f"Applied coaching correction: {list(correction.keys())}") return corrected def _record_coaching_feedback( self, action_info: Dict[str, Any], response: CoachingResponse, result: Optional[ExecutionResult], success: bool, manual: bool = False ) -> None: """ Enregistrer le feedback COACHING pour apprentissage. Args: action_info: Action suggérée response: Réponse de l'utilisateur result: Résultat d'exécution (si exécuté) success: Si l'action a réussi manual: Si l'utilisateur a exécuté manuellement """ if not self.context: return # Préparer les données de correction correction_data = { 'action_type': action_info.get('action', 'unknown'), 'element_type': action_info.get('target', {}).get('type', 'unknown'), 'decision': response.decision.value, 'success': success, 'manual_execution': manual, 'feedback': response.feedback, 'workflow_id': self.context.workflow_id, 'node_id': self.context.current_node_id, 'step': self.context.steps_executed } # Si correction fournie, l'ajouter if response.correction: correction_data['correction_type'] = 'user_correction' correction_data['original_target'] = action_info.get('target') correction_data['original_params'] = action_info.get('params') correction_data['corrected_target'] = response.correction.get('target') or response.correction.get('corrected_target') correction_data['corrected_params'] = response.correction.get('params') or response.correction.get('corrected_params') # Enregistrer via TrainingDataCollector if self._training_collector: try: # S'assurer qu'une session est active if not self._training_collector.current_session: self._training_collector.start_session( session_id=self.context.execution_id, workflow_id=self.context.workflow_id ) # Enregistrer l'action self._training_collector.record_action({ 'action': action_info.get('action'), 'target': action_info.get('target'), 'decision': response.decision.value, 'success': success }) # Si correction, l'enregistrer (propagera auto vers Correction Packs) if response.correction or response.decision == CoachingDecision.REJECT: self._training_collector.record_correction(correction_data) except Exception as e: logger.warning(f"Error recording coaching feedback: {e}") # Enregistrer via FeedbackProcessor pour statistiques if self._feedback_processor and not success: try: feedback_type = FeedbackType.INCORRECT if response.decision == CoachingDecision.REJECT else FeedbackType.PARTIAL self._feedback_processor.process_feedback( workflow_id=self.context.workflow_id, execution_id=self.context.execution_id, feedback_type=feedback_type, confidence=self.context.last_match_confidence, corrections=response.correction, context={ 'action': action_info.get('action'), 'element_type': action_info.get('target', {}).get('type'), 'failure_reason': response.feedback or 'user_rejected' } ) except Exception as e: logger.warning(f"Error processing coaching feedback: {e}") logger.info( f"Coaching feedback recorded: decision={response.decision.value}, " f"success={success}, has_correction={response.correction is not None}" ) def get_coaching_stats(self) -> Dict[str, Any]: """Récupérer les statistiques COACHING de la session courante.""" stats = {**self._coaching_stats} if stats['suggestions_made'] > 0: stats['acceptance_rate'] = stats['accepted'] / stats['suggestions_made'] stats['correction_rate'] = stats['corrected'] / stats['suggestions_made'] else: stats['acceptance_rate'] = 0.0 stats['correction_rate'] = 0.0 return stats def _determine_execution_mode(self, workflow: Workflow) -> ExecutionMode: """Déterminer le mode d'exécution basé sur le learning_state.""" learning_state = workflow.learning_state if learning_state == "OBSERVATION": return ExecutionMode.OBSERVATION elif learning_state == "COACHING": return ExecutionMode.COACHING elif learning_state == "AUTO_CANDIDATE": return ExecutionMode.SUPERVISED elif learning_state == "AUTO_CONFIRMED": return ExecutionMode.AUTOMATIC else: return ExecutionMode.OBSERVATION def _is_workflow_complete(self) -> bool: """Vérifier si le workflow est terminé.""" if not self.context or not self.context.current_node_id: return False workflow = self.pipeline.load_workflow(self.context.workflow_id) if not workflow: return True # Vérifier si on est sur un node terminal return self.context.current_node_id in workflow.end_nodes def _handle_no_match(self) -> None: """Gérer le cas où aucun match n'est trouvé.""" logger.warning("No match found - pausing execution") self.state = ExecutionState.PAUSED self._notify_state_change(ExecutionState.PAUSED) if self._on_error: self._on_error("no_match", Exception("No matching node found")) def _notify_state_change(self, new_state: ExecutionState) -> None: """Notifier un changement d'état.""" if self._on_state_change: try: self._on_state_change(new_state) except Exception as e: logger.error(f"State change callback error: {e}") # ========================================================================= # Callbacks et événements # ========================================================================= def on_step_complete(self, callback: Callable[[StepResult], None]) -> None: """Enregistrer un callback pour chaque étape complétée.""" self._on_step_complete = callback def on_state_change(self, callback: Callable[[ExecutionState], None]) -> None: """Enregistrer un callback pour les changements d'état.""" self._on_state_change = callback def on_error(self, callback: Callable[[str, Exception], None]) -> None: """Enregistrer un callback pour les erreurs.""" self._on_error = callback # ========================================================================= # Getters # ========================================================================= def get_state(self) -> ExecutionState: """Obtenir l'état actuel.""" return self.state def get_context(self) -> Optional[ExecutionContext]: """Obtenir le contexte d'exécution.""" return self.context def get_progress(self) -> Dict[str, Any]: """Obtenir la progression de l'exécution.""" if not self.context: return {"status": "idle"} return { "status": self.state.value, "workflow_id": self.context.workflow_id, "execution_id": self.context.execution_id, "mode": self.context.mode.value, "current_node": self.context.current_node_id, "steps_executed": self.context.steps_executed, "steps_succeeded": self.context.steps_succeeded, "steps_failed": self.context.steps_failed, "last_confidence": self.context.last_match_confidence, "duration_seconds": (datetime.now() - self.context.started_at).total_seconds() } def cleanup(self) -> None: """Nettoyer les ressources temporaires.""" import shutil try: if self._temp_dir.exists(): shutil.rmtree(self._temp_dir) except Exception as e: logger.warning(f"Cleanup failed: {e}") # ========================================================================= # Capture continue # ========================================================================= def _on_frame_captured(self, frame: CaptureFrame) -> None: """Callback appelé pour chaque frame capturé en mode continu.""" with self._frame_queue_lock: self._last_frame = frame self._frame_queue.append(frame) # Garder seulement les 10 derniers frames en queue if len(self._frame_queue) > 10: self._frame_queue.pop(0) logger.debug(f"Frame captured: {frame.frame_id} (changed={frame.changed_from_previous})") def _get_latest_frame(self) -> Optional[CaptureFrame]: """Obtenir le dernier frame capturé.""" with self._frame_queue_lock: return self._last_frame def get_capture_stats(self) -> Dict[str, Any]: """Obtenir les statistiques de capture.""" stats = self.screen_capturer.get_stats() return { "total_captures": stats.total_captures, "captures_per_second": stats.captures_per_second, "unchanged_skipped": stats.unchanged_frames_skipped, "avg_capture_time_ms": stats.average_capture_time_ms, "buffer_size": stats.buffer_size, "memory_mb": stats.memory_usage_mb } def set_continuous_capture(self, enabled: bool) -> None: """Activer/désactiver la capture continue.""" self._use_continuous_capture = enabled logger.info(f"Continuous capture {'enabled' if enabled else 'disabled'}") # ========================================================================= # Target Resolution avancée # ========================================================================= def resolve_target_advanced( self, target_spec: Any, screen_state: ScreenState ) -> Optional[ResolvedTarget]: """ Résoudre une cible avec le resolver avancé. Args: target_spec: Spécification de la cible screen_state: État actuel de l'écran Returns: ResolvedTarget ou None """ return self.target_resolver.resolve_target(target_spec, screen_state) def get_resolver_stats(self) -> Dict[str, Any]: """Obtenir les statistiques du resolver.""" return self.target_resolver.get_stats() # ========================================================================= # GPU Resource Management # ========================================================================= def _update_gpu_mode(self, mode: ExecutionMode) -> None: """ Update GPU Resource Manager based on execution mode. Args: mode: Current execution mode """ if not self._gpu_manager or not GPU_MANAGER_AVAILABLE: return try: import asyncio # Map ExecutionLoop mode to GPU mode if mode == ExecutionMode.AUTOMATIC: gpu_mode = GPUExecutionMode.AUTOPILOT elif mode in [ExecutionMode.OBSERVATION, ExecutionMode.COACHING]: gpu_mode = GPUExecutionMode.RECORDING else: gpu_mode = GPUExecutionMode.IDLE # Run async mode change loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self._gpu_manager.set_execution_mode(gpu_mode)) loop.close() logger.info(f"GPU mode updated to {gpu_mode.value}") except Exception as e: logger.warning(f"Failed to update GPU mode: {e}") # ========================================================================= # Continuous Learning Integration # ========================================================================= def _update_prototype_on_success( self, node_id: str, embedding: Any ) -> None: """ Mettre à jour le prototype après une exécution réussie. Args: node_id: ID du node embedding: Embedding de l'état actuel """ if not self._continuous_learner or not CONTINUOUS_LEARNER_AVAILABLE: return try: import numpy as np if hasattr(embedding, 'get_vector'): vector = embedding.get_vector() elif isinstance(embedding, np.ndarray): vector = embedding else: return self._continuous_learner.update_prototype( node_id=node_id, new_embedding=vector, execution_success=True ) logger.debug(f"Prototype updated for node {node_id}") except Exception as e: logger.warning(f"Failed to update prototype: {e}") def _check_drift( self, node_id: str, confidence: float ) -> Optional[DriftStatus]: """ Vérifier la dérive UI après un match. Args: node_id: ID du node matché confidence: Confiance du match Returns: DriftStatus si dérive détectée """ if not self._continuous_learner or not CONTINUOUS_LEARNER_AVAILABLE: return None try: # Ajouter à l'historique if node_id not in self._confidence_history: self._confidence_history[node_id] = [] self._confidence_history[node_id].append(confidence) # Garder les 10 dernières self._confidence_history[node_id] = self._confidence_history[node_id][-10:] # Vérifier dérive drift_status = self._continuous_learner.detect_drift( node_id=node_id, recent_confidences=self._confidence_history[node_id] ) if drift_status.is_drifting: logger.warning( f"Drift detected for node {node_id}: " f"severity={drift_status.drift_severity:.2f}, " f"action={drift_status.recommended_action}" ) # Notifier via callback si disponible if self._on_error: self._on_error( "drift_detected", Exception(f"UI drift detected for node {node_id}") ) return drift_status except Exception as e: logger.warning(f"Failed to check drift: {e}") return None def get_learning_stats(self) -> Dict[str, Any]: """Obtenir les statistiques d'apprentissage continu.""" stats = { "continuous_learner_available": CONTINUOUS_LEARNER_AVAILABLE, "confidence_history": {} } for node_id, confidences in self._confidence_history.items(): if confidences: import numpy as np stats["confidence_history"][node_id] = { "count": len(confidences), "mean": float(np.mean(confidences)), "min": float(min(confidences)), "max": float(max(confidences)) } return stats # ============================================================================= # Factory function # ============================================================================= def create_execution_loop( pipeline: "WorkflowPipeline", capture_interval_ms: int = 500 ) -> ExecutionLoop: """ Créer une boucle d'exécution avec configuration par défaut. Args: pipeline: WorkflowPipeline capture_interval_ms: Intervalle de capture Returns: ExecutionLoop configuré """ return ExecutionLoop( pipeline=pipeline, capture_interval_ms=capture_interval_ms )