From d6e2530f2a5b25a8be0356120f483120773071f7 Mon Sep 17 00:00:00 2001 From: Dom Date: Sun, 18 Jan 2026 19:14:47 +0100 Subject: [PATCH] feat(execution): Implement complete COACHING mode in ExecutionLoop - Add CoachingDecision enum (ACCEPT, REJECT, CORRECT, EXECUTE_MANUAL, SKIP) - Add CoachingResponse dataclass for user decisions - Add WAITING_COACHING state to ExecutionState - Implement _request_coaching_decision() with callback or polling support - Implement submit_coaching_decision() for external API/UI submission - Implement _apply_coaching_correction() for applying user corrections - Implement _record_coaching_feedback() integrating with: - TrainingDataCollector for session recording - FeedbackProcessor for statistics - CorrectionPackIntegration for automatic correction capture - Add get_coaching_stats() for session statistics - Add 17 unit tests for COACHING functionality COACHING mode now: 1. Suggests actions to user 2. Waits for user decision (accept/reject/correct/manual/skip) 3. Applies corrections if provided 4. Records all feedback for learning 5. Propagates corrections to Correction Packs automatically Co-Authored-By: Claude Opus 4.5 --- core/execution/execution_loop.py | 1361 ++++++++++++++++++++++++++++++ tests/test_coaching_loop.py | 278 ++++++ 2 files changed, 1639 insertions(+) create mode 100644 core/execution/execution_loop.py create mode 100644 tests/test_coaching_loop.py diff --git a/core/execution/execution_loop.py b/core/execution/execution_loop.py new file mode 100644 index 000000000..50b1c04f5 --- /dev/null +++ b/core/execution/execution_loop.py @@ -0,0 +1,1361 @@ +""" +ExecutionLoop - Boucle d'exécution temps réel du RPA Vision V3 + +C'est le cœur du système qui fait tourner le RPA : +1. Capture l'écran en continu +2. Identifie l'état actuel (matching) +3. Détermine la prochaine action +4. Exécute l'action +5. Vérifie le résultat +6. 
Boucle jusqu'à la fin du workflow + +Modes d'exécution: +- OBSERVATION: Observe sans agir (shadow mode) +- COACHING: Suggère les actions, l'utilisateur exécute +- SUPERVISED: Exécute avec confirmation à chaque étape +- AUTOMATIC: Exécute automatiquement (pilote automatique) +""" + +import logging +import time +import threading +from typing import Optional, Dict, Any, Callable, List +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path +import tempfile + +from core.models.workflow_graph import Workflow, WorkflowEdge, LearningState +from core.models.screen_state import ScreenState +from core.execution.action_executor import ActionExecutor, ExecutionResult, ExecutionStatus +from core.capture.screen_capturer import ScreenCapturer, CaptureFrame +from core.execution.target_resolver import TargetResolver, ResolvedTarget + +# Import tardif pour éviter import circulaire +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from core.pipeline.workflow_pipeline import WorkflowPipeline + +# GPU Resource Manager integration +try: + from core.gpu import GPUResourceManager, ExecutionMode as GPUExecutionMode, get_gpu_resource_manager + GPU_MANAGER_AVAILABLE = True +except ImportError: + GPU_MANAGER_AVAILABLE = False + +# Continuous Learner integration +try: + from core.learning.continuous_learner import ContinuousLearner, DriftStatus + CONTINUOUS_LEARNER_AVAILABLE = True +except ImportError: + CONTINUOUS_LEARNER_AVAILABLE = False + +# Execution Robustness integration +try: + from core.execution.execution_robustness import ExecutionRobustness, RetryManager + ROBUSTNESS_AVAILABLE = True +except ImportError: + ROBUSTNESS_AVAILABLE = False + +# Analytics integration +try: + from core.analytics.analytics_system import get_analytics_system + from core.analytics.integration.execution_integration import AnalyticsExecutionIntegration + ANALYTICS_AVAILABLE = True +except ImportError: + ANALYTICS_AVAILABLE = False + +# 
# Feedback and Correction integration
try:
    from core.learning.feedback_processor import FeedbackProcessor, FeedbackType
    from core.corrections import get_correction_pack_integration, CorrectionPackIntegration
    FEEDBACK_AVAILABLE = True
except ImportError:
    FEEDBACK_AVAILABLE = False

# Training Data Collector integration
try:
    from core.training.training_data_collector import TrainingDataCollector
    TRAINING_COLLECTOR_AVAILABLE = True
except ImportError:
    TRAINING_COLLECTOR_AVAILABLE = False

logger = logging.getLogger(__name__)


class ExecutionMode(str, Enum):
    """Execution modes of the RPA loop."""
    OBSERVATION = "observation"  # Observe without acting
    COACHING = "coaching"        # Suggest actions and wait for a user decision
    SUPERVISED = "supervised"    # Execute after an explicit confirmation
    AUTOMATIC = "automatic"      # Execute without asking


class ExecutionState(str, Enum):
    """Lifecycle states of the execution loop."""
    IDLE = "idle"                     # Nothing started yet
    RUNNING = "running"               # Loop is executing steps
    PAUSED = "paused"                 # Paused on request
    WAITING_CONFIRMATION = "waiting"  # Waiting for a SUPERVISED confirmation
    WAITING_COACHING = "coaching"     # Waiting for a COACHING decision
    COMPLETED = "completed"           # Finished successfully
    FAILED = "failed"                 # Finished with an error
    STOPPED = "stopped"               # Stopped manually


class CoachingDecision(str, Enum):
    """Decisions a user can take on a suggested action in COACHING mode."""
    ACCEPT = "accept"          # Accept and execute the suggested action
    REJECT = "reject"          # Reject the suggested action
    CORRECT = "correct"        # Provide a correction
    EXECUTE_MANUAL = "manual"  # The user performed the action manually
    SKIP = "skip"              # Skip this action


@dataclass
class CoachingResponse:
    """
    User answer to a COACHING suggestion.

    ``decision`` may be given either as a ``CoachingDecision`` member or as
    its string value (e.g. ``"accept"``), which is what an HTTP/UI layer
    typically holds; it is always stored as a ``CoachingDecision``.

    Raises:
        ValueError: if ``decision`` is not a valid ``CoachingDecision`` value.
    """
    decision: CoachingDecision
    correction: Optional[Dict[str, Any]] = None  # Only meaningful when decision == CORRECT
    feedback: Optional[str] = None               # Free-form user comment
    executed_manually: bool = False              # True when the user performed the action

    def __post_init__(self) -> None:
        # Consumers read ``decision.value``; a plain string would break them,
        # so coerce once here instead of in every consumer.
        if not isinstance(self.decision, CoachingDecision):
            raw = self.decision
            if isinstance(raw, str):
                raw = raw.strip().lower()
            self.decision = CoachingDecision(raw)

        # Keep the redundant flag consistent with the decision.
        if self.decision is CoachingDecision.EXECUTE_MANUAL:
            self.executed_manually = True
+@dataclass +class ExecutionContext: + """Contexte d'une exécution de workflow""" + workflow_id: str + execution_id: str + mode: ExecutionMode + started_at: datetime + current_node_id: Optional[str] = None + current_edge_id: Optional[str] = None + steps_executed: int = 0 + steps_succeeded: int = 0 + steps_failed: int = 0 + last_screenshot_path: Optional[str] = None + last_match_confidence: float = 0.0 + variables: Dict[str, Any] = field(default_factory=dict) + history: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class StepResult: + """Résultat d'une étape d'exécution""" + success: bool + node_id: str + edge_id: Optional[str] + action_result: Optional[ExecutionResult] + match_confidence: float + duration_ms: float + message: str + screenshot_path: Optional[str] = None + + +class ExecutionLoop: + """ + Boucle d'exécution principale du RPA. + + Gère le cycle complet : + Capture → Matching → Action → Vérification → Boucle + + Example: + >>> loop = ExecutionLoop(pipeline) + >>> loop.start("workflow_login", mode=ExecutionMode.SUPERVISED) + >>> # ... le RPA s'exécute ... + >>> loop.stop() + """ + + def __init__( + self, + pipeline: "WorkflowPipeline", + action_executor: Optional[ActionExecutor] = None, + screen_capturer: Optional[ScreenCapturer] = None, + capture_interval_ms: int = 500, + max_no_match_retries: int = 5, + confirmation_callback: Optional[Callable[[str, Dict], bool]] = None, + coaching_callback: Optional[Callable[[str, Dict], "CoachingResponse"]] = None + ): + """ + Initialiser la boucle d'exécution. 
+ + Args: + pipeline: WorkflowPipeline pour matching et gestion workflows + action_executor: Exécuteur d'actions (créé si None) + screen_capturer: Capturer d'écran (créé si None) + capture_interval_ms: Intervalle entre captures (ms) + max_no_match_retries: Nombre max de tentatives si pas de match + confirmation_callback: Callback pour demander confirmation (SUPERVISED) + coaching_callback: Callback pour décisions coaching (COACHING) + """ + self.pipeline = pipeline + self.action_executor = action_executor or ActionExecutor() + self.screen_capturer = screen_capturer or ScreenCapturer( + buffer_size=20, + detect_changes=True + ) + self.target_resolver = TargetResolver( + similarity_threshold=0.75, + use_embedding_fallback=True + ) + self.capture_interval_ms = capture_interval_ms + self.max_no_match_retries = max_no_match_retries + self.confirmation_callback = confirmation_callback + self.coaching_callback = coaching_callback + + # État interne + self.state = ExecutionState.IDLE + self.context: Optional[ExecutionContext] = None + self._stop_requested = False + self._pause_requested = False + self._thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + + # Mode capture continue + self._use_continuous_capture = True + self._last_frame: Optional[CaptureFrame] = None + self._frame_queue: List[CaptureFrame] = [] + self._frame_queue_lock = threading.Lock() + + # Callbacks pour événements + self._on_step_complete: Optional[Callable[[StepResult], None]] = None + self._on_state_change: Optional[Callable[[ExecutionState], None]] = None + self._on_error: Optional[Callable[[str, Exception], None]] = None + + # Répertoire temporaire pour screenshots + self._temp_dir = Path(tempfile.mkdtemp(prefix="rpa_vision_")) + + # GPU Resource Manager + self._gpu_manager = None + if GPU_MANAGER_AVAILABLE: + try: + self._gpu_manager = get_gpu_resource_manager() + logger.info("GPU Resource Manager integrated") + except Exception as e: + logger.warning(f"GPU Resource Manager 
not available: {e}") + + # Continuous Learner pour apprentissage continu + self._continuous_learner = None + if CONTINUOUS_LEARNER_AVAILABLE: + try: + self._continuous_learner = ContinuousLearner() + logger.info("Continuous Learner integrated") + except Exception as e: + logger.warning(f"Continuous Learner not available: {e}") + + # Historique des confidences pour détection de dérive + self._confidence_history: Dict[str, List[float]] = {} + + # Execution Robustness pour retry et récupération + self._robustness = None + if ROBUSTNESS_AVAILABLE: + try: + self._robustness = ExecutionRobustness(pipeline) + logger.info("Execution Robustness integrated") + except Exception as e: + logger.warning(f"Execution Robustness not available: {e}") + + # Analytics integration pour métriques et insights + self._analytics_integration = None + if ANALYTICS_AVAILABLE: + try: + analytics_system = get_analytics_system() + self._analytics_integration = AnalyticsExecutionIntegration(analytics_system) + logger.info("Analytics System integrated") + except Exception as e: + logger.warning(f"Analytics System not available: {e}") + + # Feedback and Correction Pack integration pour mode COACHING + self._feedback_processor = None + self._correction_integration = None + if FEEDBACK_AVAILABLE: + try: + self._feedback_processor = FeedbackProcessor(auto_integrate_corrections=True) + self._correction_integration = get_correction_pack_integration() + logger.info("Feedback and Correction Pack integration enabled") + except Exception as e: + logger.warning(f"Feedback integration not available: {e}") + + # Training Data Collector pour enregistrement des sessions COACHING + self._training_collector = None + if TRAINING_COLLECTOR_AVAILABLE: + try: + self._training_collector = TrainingDataCollector( + output_dir="data/coaching_sessions", + auto_integrate_corrections=True + ) + logger.info("Training Data Collector integrated") + except Exception as e: + logger.warning(f"Training Data Collector not available: 
{e}") + + # Coaching state + self._coaching_response: Optional[CoachingResponse] = None + self._coaching_stats = { + 'suggestions_made': 0, + 'accepted': 0, + 'rejected': 0, + 'corrected': 0, + 'manual_executions': 0 + } + + logger.info("ExecutionLoop initialized") + + # ========================================================================= + # Contrôle de l'exécution + # ========================================================================= + + def start( + self, + workflow_id: str, + mode: Optional[ExecutionMode] = None, + start_node_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None + ) -> bool: + """ + Démarrer l'exécution d'un workflow. + + Args: + workflow_id: ID du workflow à exécuter + mode: Mode d'exécution (auto-détecté si None) + start_node_id: Node de départ (entry_node si None) + variables: Variables initiales + + Returns: + True si démarré avec succès + """ + with self._lock: + if self.state == ExecutionState.RUNNING: + logger.warning("Execution already running") + return False + + # Charger le workflow + workflow = self.pipeline.load_workflow(workflow_id) + if not workflow: + logger.error(f"Workflow not found: {workflow_id}") + return False + + # Déterminer le mode d'exécution + if mode is None: + mode = self._determine_execution_mode(workflow) + + # Créer le contexte + self.context = ExecutionContext( + workflow_id=workflow_id, + execution_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S')}", + mode=mode, + started_at=datetime.now(), + current_node_id=start_node_id or (workflow.entry_nodes[0] if workflow.entry_nodes else None), + variables=variables or {} + ) + + self._stop_requested = False + self._pause_requested = False + self.state = ExecutionState.RUNNING + + # Notify GPU Resource Manager about execution mode + self._update_gpu_mode(mode) + + # Notify Analytics about execution start + if self._analytics_integration: + try: + self._analytics_integration.on_execution_start( + workflow_id=workflow_id, + 
execution_id=self.context.execution_id, + mode=mode.value + ) + # Start resource monitoring for this execution + self._analytics_integration.start_resource_monitoring( + execution_id=self.context.execution_id + ) + except Exception as e: + logger.warning(f"Analytics notification failed: {e}") + + # Démarrer le thread d'exécution + self._thread = threading.Thread(target=self._execution_loop, daemon=True) + self._thread.start() + + logger.info(f"Started execution of {workflow_id} in {mode.value} mode") + self._notify_state_change(ExecutionState.RUNNING) + + return True + + def stop(self) -> None: + """Arrêter l'exécution.""" + with self._lock: + self._stop_requested = True + logger.info("Stop requested") + + def pause(self) -> None: + """Mettre en pause l'exécution.""" + with self._lock: + self._pause_requested = True + self.state = ExecutionState.PAUSED + logger.info("Pause requested") + self._notify_state_change(ExecutionState.PAUSED) + + def resume(self) -> None: + """Reprendre l'exécution après pause.""" + with self._lock: + self._pause_requested = False + self.state = ExecutionState.RUNNING + logger.info("Resumed") + self._notify_state_change(ExecutionState.RUNNING) + + def confirm_action(self, approved: bool = True) -> None: + """Confirmer ou rejeter une action en attente.""" + with self._lock: + if self.state == ExecutionState.WAITING_CONFIRMATION: + self._confirmation_result = approved + self.state = ExecutionState.RUNNING + + # ========================================================================= + # Boucle principale + # ========================================================================= + + def _execution_loop(self) -> None: + """Boucle principale d'exécution (tourne dans un thread).""" + no_match_count = 0 + + # Démarrer la capture continue si activée + if self._use_continuous_capture: + self.screen_capturer.start_continuous( + callback=self._on_frame_captured, + interval_ms=self.capture_interval_ms, + skip_unchanged=True + ) + 
logger.info("Started continuous screen capture") + + try: + while not self._stop_requested: + # Vérifier pause + while self._pause_requested and not self._stop_requested: + time.sleep(0.1) + + if self._stop_requested: + break + + # Exécuter une étape + step_result = self._execute_step() + + if step_result is None: + # Pas de match trouvé + no_match_count += 1 + if no_match_count >= self.max_no_match_retries: + logger.error(f"No match found after {no_match_count} retries") + self._handle_no_match() + break + time.sleep(self.capture_interval_ms / 1000.0) + continue + + no_match_count = 0 # Reset counter + + # Notifier le résultat + if self._on_step_complete: + self._on_step_complete(step_result) + + # Enregistrer dans l'historique + self.context.history.append({ + "timestamp": datetime.now().isoformat(), + "node_id": step_result.node_id, + "edge_id": step_result.edge_id, + "success": step_result.success, + "confidence": step_result.match_confidence + }) + + # Notify Analytics about step completion + if self._analytics_integration and step_result: + try: + self._analytics_integration.on_step_complete( + workflow_id=self.context.workflow_id, + execution_id=self.context.execution_id, + step_id=step_result.node_id, + success=step_result.success, + duration_ms=step_result.duration_ms, + confidence=step_result.match_confidence + ) + except Exception as e: + logger.warning(f"Analytics step notification failed: {e}") + + # Vérifier si workflow terminé + if self._is_workflow_complete(): + logger.info("Workflow completed successfully") + self.state = ExecutionState.COMPLETED + self._notify_state_change(ExecutionState.COMPLETED) + break + + # Attendre avant prochaine capture + time.sleep(self.capture_interval_ms / 1000.0) + + except Exception as e: + logger.exception(f"Execution loop error: {e}") + self.state = ExecutionState.FAILED + self._notify_state_change(ExecutionState.FAILED) + if self._on_error: + self._on_error("execution_loop", e) + + finally: + # Arrêter la capture 
continue + if self._use_continuous_capture: + self.screen_capturer.stop_continuous() + logger.info("Stopped continuous screen capture") + + if self._stop_requested: + self.state = ExecutionState.STOPPED + self._notify_state_change(ExecutionState.STOPPED) + + # Notify Analytics about execution completion + if self._analytics_integration and self.context: + try: + success = self.state == ExecutionState.COMPLETED + duration_ms = (datetime.now() - self.context.started_at).total_seconds() * 1000 + + # Stop resource monitoring + self._analytics_integration.stop_resource_monitoring( + execution_id=self.context.execution_id + ) + + self._analytics_integration.on_execution_complete( + workflow_id=self.context.workflow_id, + execution_id=self.context.execution_id, + success=success, + duration_ms=duration_ms, + steps_executed=self.context.steps_executed, + steps_succeeded=self.context.steps_succeeded, + steps_failed=self.context.steps_failed, + error_message=None if success else f"Execution ended in state: {self.state.value}" + ) + except Exception as e: + logger.warning(f"Analytics completion notification failed: {e}") + + logger.info(f"Execution loop ended: {self.state.value}") + + def _execute_step(self) -> Optional[StepResult]: + """ + Exécuter une étape du workflow. + + Returns: + StepResult ou None si pas de match + """ + start_time = time.time() + + # 1. Capturer l'écran + screenshot_path = self._capture_screen() + if not screenshot_path: + logger.warning("Failed to capture screen") + return None + + self.context.last_screenshot_path = screenshot_path + + # 2. 
Identifier l'état actuel (matching) + match = self.pipeline.match_current_state( + screenshot_path, + workflow_id=self.context.workflow_id + ) + + if not match: + logger.debug("No match found for current screen") + return None + + current_node_id = match["node_id"] + confidence = match["confidence"] + self.context.current_node_id = current_node_id + self.context.last_match_confidence = confidence + + logger.info(f"Matched node: {current_node_id} (confidence: {confidence:.3f})") + + # 3. Obtenir la prochaine action + next_action = self.pipeline.get_next_action( + self.context.workflow_id, + current_node_id + ) + + if not next_action: + # Pas d'action suivante = fin du workflow ou node terminal + return StepResult( + success=True, + node_id=current_node_id, + edge_id=None, + action_result=None, + match_confidence=confidence, + duration_ms=(time.time() - start_time) * 1000, + message="No next action (terminal node)", + screenshot_path=screenshot_path + ) + + edge_id = next_action["edge_id"] + self.context.current_edge_id = edge_id + + # 4. 
Selon le mode, exécuter ou suggérer + action_result = None + + if self.context.mode == ExecutionMode.OBSERVATION: + # Mode observation : ne rien faire + logger.info(f"[OBSERVATION] Would execute: {next_action['action']}") + + elif self.context.mode == ExecutionMode.COACHING: + # Mode coaching : suggérer l'action et attendre décision utilisateur + logger.info(f"[COACHING] Suggested action: {next_action['action']}") + self._coaching_stats['suggestions_made'] += 1 + + # Demander la décision à l'utilisateur + coaching_response = self._request_coaching_decision(next_action, screenshot_path) + + if coaching_response.decision == CoachingDecision.ACCEPT: + # Utilisateur accepte : exécuter l'action suggérée + self._coaching_stats['accepted'] += 1 + action_result = self._execute_action(next_action) + self._record_coaching_feedback( + next_action, coaching_response, action_result, success=True + ) + + elif coaching_response.decision == CoachingDecision.REJECT: + # Utilisateur rejette : ne pas exécuter + self._coaching_stats['rejected'] += 1 + self._record_coaching_feedback( + next_action, coaching_response, None, success=False + ) + return StepResult( + success=False, + node_id=current_node_id, + edge_id=edge_id, + action_result=None, + match_confidence=confidence, + duration_ms=(time.time() - start_time) * 1000, + message="Action rejected by user in COACHING mode", + screenshot_path=screenshot_path + ) + + elif coaching_response.decision == CoachingDecision.CORRECT: + # Utilisateur fournit une correction + self._coaching_stats['corrected'] += 1 + corrected_action = self._apply_coaching_correction( + next_action, coaching_response.correction + ) + action_result = self._execute_action(corrected_action) + self._record_coaching_feedback( + next_action, coaching_response, action_result, + success=action_result.status == ExecutionStatus.SUCCESS if action_result else False + ) + + elif coaching_response.decision == CoachingDecision.EXECUTE_MANUAL: + # Utilisateur a exécuté 
manuellement + self._coaching_stats['manual_executions'] += 1 + self._record_coaching_feedback( + next_action, coaching_response, None, success=True, manual=True + ) + # Pas besoin d'exécuter, l'utilisateur l'a fait + + elif coaching_response.decision == CoachingDecision.SKIP: + # Sauter cette action + self._record_coaching_feedback( + next_action, coaching_response, None, success=True + ) + # Continuer sans exécuter + + + elif self.context.mode == ExecutionMode.SUPERVISED: + # Mode supervisé : demander confirmation + if not self._request_confirmation(next_action): + logger.info("Action rejected by user") + return StepResult( + success=False, + node_id=current_node_id, + edge_id=edge_id, + action_result=None, + match_confidence=confidence, + duration_ms=(time.time() - start_time) * 1000, + message="Action rejected by user", + screenshot_path=screenshot_path + ) + + # Exécuter l'action + action_result = self._execute_action(next_action) + + elif self.context.mode == ExecutionMode.AUTOMATIC: + # Mode automatique : exécuter directement + action_result = self._execute_action(next_action) + + # 5. 
Mettre à jour les compteurs + self.context.steps_executed += 1 + if action_result and action_result.status == ExecutionStatus.SUCCESS: + self.context.steps_succeeded += 1 + elif action_result: + self.context.steps_failed += 1 + + duration_ms = (time.time() - start_time) * 1000 + + return StepResult( + success=action_result.status == ExecutionStatus.SUCCESS if action_result else True, + node_id=current_node_id, + edge_id=edge_id, + action_result=action_result, + match_confidence=confidence, + duration_ms=duration_ms, + message=action_result.message if action_result else "Observed", + screenshot_path=screenshot_path + ) + + # ========================================================================= + # Méthodes utilitaires + # ========================================================================= + + def _capture_screen(self) -> Optional[str]: + """Capturer l'écran et sauvegarder dans un fichier temporaire.""" + try: + screenshot = self.screen_capturer.capture_screen() + if screenshot is None: + return None + + # Sauvegarder dans fichier temporaire + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + screenshot_path = self._temp_dir / f"capture_{timestamp}.png" + screenshot.save(str(screenshot_path)) + + return str(screenshot_path) + + except Exception as e: + logger.error(f"Screen capture failed: {e}") + return None + + def _execute_action(self, action_info: Dict[str, Any]) -> ExecutionResult: + """Exécuter une action via l'ActionExecutor.""" + try: + # Charger le workflow et l'edge + workflow = self.pipeline.load_workflow(self.context.workflow_id) + edge = workflow.get_edge(action_info["edge_id"]) + + if not edge: + return ExecutionResult( + status=ExecutionStatus.FAILED, + message=f"Edge not found: {action_info['edge_id']}", + duration_ms=0 + ) + + # Créer un ScreenState minimal pour l'exécution + from core.models.screen_state import ( + ScreenState, WindowContext, RawLevel, PerceptionLevel, + ContextLevel, EmbeddingRef + ) + + screen_state = 
ScreenState( + screen_state_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S')}", + timestamp=datetime.now(), + session_id=self.context.execution_id, + window=WindowContext( + app_name="unknown", + window_title="Unknown", + screen_resolution=[1920, 1080], + workspace="main" + ), + raw=RawLevel( + screenshot_path=self.context.last_screenshot_path or "", + capture_method="execution", + file_size_bytes=0 + ), + perception=PerceptionLevel( + embedding=EmbeddingRef(provider="", vector_id="", dimensions=512), + detected_text=[], + text_detection_method="none", + confidence_avg=0.0 + ), + context=ContextLevel(), + ui_elements=[] + ) + + # Exécuter l'action + result = self.action_executor.execute_edge( + edge, + screen_state, + context=self.context.variables + ) + + logger.info(f"Action executed: {result.status.value} - {result.message}") + return result + + except Exception as e: + logger.exception(f"Action execution failed: {e}") + return ExecutionResult( + status=ExecutionStatus.FAILED, + message=str(e), + duration_ms=0, + error=e + ) + + def _request_confirmation(self, action_info: Dict[str, Any]) -> bool: + """Demander confirmation à l'utilisateur.""" + if self.confirmation_callback: + return self.confirmation_callback( + f"Execute action: {action_info['action']}?", + action_info + ) + + # Sans callback, attendre la confirmation via confirm_action() + self.state = ExecutionState.WAITING_CONFIRMATION + self._notify_state_change(ExecutionState.WAITING_CONFIRMATION) + self._confirmation_result = None + + # Attendre la confirmation (timeout 60s) + timeout = 60 + start = time.time() + while self._confirmation_result is None and (time.time() - start) < timeout: + if self._stop_requested: + return False + time.sleep(0.1) + + return self._confirmation_result or False + + # ========================================================================= + # Méthodes COACHING + # ========================================================================= + + def 
_request_coaching_decision( + self, + action_info: Dict[str, Any], + screenshot_path: Optional[str] = None + ) -> CoachingResponse: + """ + Demander une décision à l'utilisateur en mode COACHING. + + Args: + action_info: Information sur l'action suggérée + screenshot_path: Chemin vers la capture d'écran actuelle + + Returns: + CoachingResponse avec la décision de l'utilisateur + """ + # Si callback fourni, l'utiliser + if self.coaching_callback: + try: + return self.coaching_callback( + f"Suggested action: {action_info.get('action', 'unknown')}", + { + **action_info, + 'screenshot_path': screenshot_path, + 'context': { + 'workflow_id': self.context.workflow_id if self.context else None, + 'execution_id': self.context.execution_id if self.context else None, + 'step': self.context.steps_executed if self.context else 0 + } + } + ) + except Exception as e: + logger.error(f"Coaching callback error: {e}") + # En cas d'erreur, retourner SKIP par défaut + return CoachingResponse(decision=CoachingDecision.SKIP) + + # Sans callback, attendre la réponse via submit_coaching_decision() + self.state = ExecutionState.WAITING_COACHING + self._notify_state_change(ExecutionState.WAITING_COACHING) + self._coaching_response = None + + # Attendre la réponse (timeout 120s pour COACHING car plus complexe) + timeout = 120 + start = time.time() + while self._coaching_response is None and (time.time() - start) < timeout: + if self._stop_requested: + return CoachingResponse(decision=CoachingDecision.REJECT) + time.sleep(0.1) + + # Timeout = SKIP par défaut + if self._coaching_response is None: + logger.warning("Coaching decision timeout - skipping action") + return CoachingResponse(decision=CoachingDecision.SKIP) + + return self._coaching_response + + def submit_coaching_decision(self, response: CoachingResponse) -> bool: + """ + Soumettre une décision COACHING depuis l'extérieur (API, UI). 
+ + Args: + response: Réponse COACHING de l'utilisateur + + Returns: + True si la décision a été acceptée + """ + if self.state != ExecutionState.WAITING_COACHING: + logger.warning(f"Cannot submit coaching decision in state {self.state}") + return False + + self._coaching_response = response + logger.info(f"Coaching decision received: {response.decision.value}") + return True + + def _apply_coaching_correction( + self, + original_action: Dict[str, Any], + correction: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + """ + Appliquer une correction utilisateur à une action. + + Args: + original_action: Action originale suggérée + correction: Correction fournie par l'utilisateur + + Returns: + Action corrigée + """ + if not correction: + return original_action + + corrected = {**original_action} + + # Appliquer les corrections de cible + if 'target' in correction: + corrected['target'] = correction['target'] + if 'corrected_target' in correction: + corrected['target'] = correction['corrected_target'] + + # Appliquer les corrections de paramètres + if 'params' in correction: + corrected['params'] = {**corrected.get('params', {}), **correction['params']} + if 'corrected_params' in correction: + corrected['params'] = {**corrected.get('params', {}), **correction['corrected_params']} + + # Appliquer changement d'action + if 'action' in correction: + corrected['action'] = correction['action'] + + # Appliquer timing + if 'wait_before' in correction: + corrected['wait_before'] = correction['wait_before'] + if 'wait_after' in correction: + corrected['wait_after'] = correction['wait_after'] + + logger.info(f"Applied coaching correction: {list(correction.keys())}") + return corrected + + def _record_coaching_feedback( + self, + action_info: Dict[str, Any], + response: CoachingResponse, + result: Optional[ExecutionResult], + success: bool, + manual: bool = False + ) -> None: + """ + Enregistrer le feedback COACHING pour apprentissage. 
+ + Args: + action_info: Action suggérée + response: Réponse de l'utilisateur + result: Résultat d'exécution (si exécuté) + success: Si l'action a réussi + manual: Si l'utilisateur a exécuté manuellement + """ + if not self.context: + return + + # Préparer les données de correction + correction_data = { + 'action_type': action_info.get('action', 'unknown'), + 'element_type': action_info.get('target', {}).get('type', 'unknown'), + 'decision': response.decision.value, + 'success': success, + 'manual_execution': manual, + 'feedback': response.feedback, + 'workflow_id': self.context.workflow_id, + 'node_id': self.context.current_node_id, + 'step': self.context.steps_executed + } + + # Si correction fournie, l'ajouter + if response.correction: + correction_data['correction_type'] = 'user_correction' + correction_data['original_target'] = action_info.get('target') + correction_data['original_params'] = action_info.get('params') + correction_data['corrected_target'] = response.correction.get('target') or response.correction.get('corrected_target') + correction_data['corrected_params'] = response.correction.get('params') or response.correction.get('corrected_params') + + # Enregistrer via TrainingDataCollector + if self._training_collector: + try: + # S'assurer qu'une session est active + if not self._training_collector.current_session: + self._training_collector.start_session( + session_id=self.context.execution_id, + workflow_id=self.context.workflow_id + ) + + # Enregistrer l'action + self._training_collector.record_action({ + 'action': action_info.get('action'), + 'target': action_info.get('target'), + 'decision': response.decision.value, + 'success': success + }) + + # Si correction, l'enregistrer (propagera auto vers Correction Packs) + if response.correction or response.decision == CoachingDecision.REJECT: + self._training_collector.record_correction(correction_data) + + except Exception as e: + logger.warning(f"Error recording coaching feedback: {e}") + + # 
def get_coaching_stats(self) -> Dict[str, Any]:
    """Return a copy of the session's COACHING counters plus derived rates.

    ``acceptance_rate`` and ``correction_rate`` are fractions of
    ``suggestions_made``; both are 0.0 when no suggestion was made yet.
    """
    snapshot = dict(self._coaching_stats)
    total = snapshot['suggestions_made']
    if total > 0:
        acceptance, correction = snapshot['accepted'] / total, snapshot['corrected'] / total
    else:
        acceptance, correction = 0.0, 0.0
    snapshot['acceptance_rate'] = acceptance
    snapshot['correction_rate'] = correction
    return snapshot


def _determine_execution_mode(self, workflow: Workflow) -> ExecutionMode:
    """Map the workflow's ``learning_state`` to an execution mode.

    Any unrecognised state falls back to OBSERVATION.
    """
    current = workflow.learning_state
    state_to_mode = (
        ("OBSERVATION", ExecutionMode.OBSERVATION),
        ("COACHING", ExecutionMode.COACHING),
        ("AUTO_CANDIDATE", ExecutionMode.SUPERVISED),
        ("AUTO_CONFIRMED", ExecutionMode.AUTOMATIC),
    )
    for state_name, mode in state_to_mode:
        if current == state_name:
            return mode
    return ExecutionMode.OBSERVATION


def _is_workflow_complete(self) -> bool:
    """Tell whether the workflow has reached one of its end nodes.

    Returns False when there is no context or no current node, and True
    when the workflow can no longer be loaded from the pipeline.
    """
    ctx = self.context
    if not ctx or not ctx.current_node_id:
        return False

    loaded = self.pipeline.load_workflow(ctx.workflow_id)
    if not loaded:
        return True

    # Complete once the current node is a terminal node.
    return ctx.current_node_id in loaded.end_nodes
def _handle_no_match(self) -> None:
    """Pause execution when no workflow node matches the current screen.

    Sets the state to PAUSED, notifies the state-change listener, then
    reports a ``"no_match"`` error to the error listener if one is
    registered. A failing error listener is logged rather than propagated,
    matching how ``_notify_state_change`` treats its callback.
    """
    logger.warning("No match found - pausing execution")
    self.state = ExecutionState.PAUSED
    self._notify_state_change(ExecutionState.PAUSED)

    if self._on_error:
        try:
            self._on_error("no_match", Exception("No matching node found"))
        except Exception as e:
            # A buggy listener must not crash the loop that is already pausing.
            logger.error(f"Error callback error: {e}")


def _notify_state_change(self, new_state: ExecutionState) -> None:
    """Invoke the state-change listener, logging (not raising) its errors."""
    if self._on_state_change:
        try:
            self._on_state_change(new_state)
        except Exception as e:
            logger.error(f"State change callback error: {e}")


# =========================================================================
# Callbacks and events
# =========================================================================

def on_step_complete(self, callback: Callable[[StepResult], None]) -> None:
    """Register the callback invoked after each completed step."""
    self._on_step_complete = callback


def on_state_change(self, callback: Callable[[ExecutionState], None]) -> None:
    """Register the callback invoked on every state change."""
    self._on_state_change = callback


def on_error(self, callback: Callable[[str, Exception], None]) -> None:
    """Register the callback invoked with ``(error_kind, exception)`` on errors."""
    self._on_error = callback


# =========================================================================
# Getters
# =========================================================================

def get_state(self) -> ExecutionState:
    """Return the current execution state."""
    return self.state


def get_context(self) -> Optional[ExecutionContext]:
    """Return the current execution context, or None when idle."""
    return self.context


def get_progress(self) -> Dict[str, Any]:
    """Return a dictionary describing execution progress.

    Returns ``{"status": "idle"}`` when no context is active; otherwise the
    state, identifiers, step counters, last match confidence and the
    elapsed time in seconds since ``context.started_at``.
    """
    if not self.context:
        return {"status": "idle"}

    return {
        "status": self.state.value,
        "workflow_id": self.context.workflow_id,
        "execution_id": self.context.execution_id,
        "mode": self.context.mode.value,
        "current_node": self.context.current_node_id,
        "steps_executed": self.context.steps_executed,
        "steps_succeeded": self.context.steps_succeeded,
        "steps_failed": self.context.steps_failed,
        "last_confidence": self.context.last_match_confidence,
        "duration_seconds": (datetime.now() - self.context.started_at).total_seconds()
    }
def cleanup(self) -> None:
    """Remove the temporary directory; failures are only logged."""
    import shutil
    try:
        temp_dir = self._temp_dir
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
    except Exception as exc:
        logger.warning(f"Cleanup failed: {exc}")


# =========================================================================
# Continuous capture
# =========================================================================

def _on_frame_captured(self, frame: CaptureFrame) -> None:
    """Callback run for every frame captured in continuous mode.

    Stores the frame as the latest one and appends it to the queue, which
    is capped at the 10 most recent frames.
    """
    with self._frame_queue_lock:
        self._last_frame = frame
        pending = self._frame_queue
        pending.append(frame)
        # Drop the oldest frame once more than 10 are queued.
        if len(pending) > 10:
            pending.pop(0)

    logger.debug(f"Frame captured: {frame.frame_id} (changed={frame.changed_from_previous})")


def _get_latest_frame(self) -> Optional[CaptureFrame]:
    """Return the most recently captured frame (None if none yet)."""
    with self._frame_queue_lock:
        latest = self._last_frame
    return latest


def get_capture_stats(self) -> Dict[str, Any]:
    """Return the screen capturer's statistics as a plain dictionary."""
    raw = self.screen_capturer.get_stats()
    summary: Dict[str, Any] = {}
    summary["total_captures"] = raw.total_captures
    summary["captures_per_second"] = raw.captures_per_second
    summary["unchanged_skipped"] = raw.unchanged_frames_skipped
    summary["avg_capture_time_ms"] = raw.average_capture_time_ms
    summary["buffer_size"] = raw.buffer_size
    summary["memory_mb"] = raw.memory_usage_mb
    return summary


def set_continuous_capture(self, enabled: bool) -> None:
    """Turn continuous capture on or off."""
    self._use_continuous_capture = enabled
    logger.info(f"Continuous capture {'enabled' if enabled else 'disabled'}")
# =========================================================================
# Advanced target resolution
# =========================================================================

def resolve_target_advanced(
    self,
    target_spec: Any,
    screen_state: ScreenState
) -> Optional[ResolvedTarget]:
    """
    Resolve a target through the advanced target resolver.

    Args:
        target_spec: Target specification.
        screen_state: Current screen state.

    Returns:
        Whatever ``target_resolver.resolve_target`` returns
        (a ResolvedTarget or None).
    """
    return self.target_resolver.resolve_target(target_spec, screen_state)


def get_resolver_stats(self) -> Dict[str, Any]:
    """Return the target resolver's statistics."""
    return self.target_resolver.get_stats()


# =========================================================================
# GPU Resource Management
# =========================================================================

def _run_coroutine_blocking(self, awaitable: Any) -> Any:
    """Run *awaitable* to completion on a private event loop.

    The loop is always closed and unregistered afterwards, including when
    the awaitable raises, so no loop leaks and no closed loop is left
    installed as the thread's current loop.
    """
    import asyncio

    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(awaitable)
    finally:
        asyncio.set_event_loop(None)
        loop.close()


def _update_gpu_mode(self, mode: ExecutionMode) -> None:
    """
    Update the GPU Resource Manager to match the execution mode.

    AUTOMATIC maps to AUTOPILOT, OBSERVATION and COACHING map to RECORDING,
    everything else to IDLE. Does nothing when no GPU manager is available;
    any failure is logged as a warning and swallowed.

    Args:
        mode: Current execution mode.
    """
    if not self._gpu_manager or not GPU_MANAGER_AVAILABLE:
        return

    try:
        # Map the ExecutionLoop mode to the GPU manager's mode.
        if mode == ExecutionMode.AUTOMATIC:
            gpu_mode = GPUExecutionMode.AUTOPILOT
        elif mode in [ExecutionMode.OBSERVATION, ExecutionMode.COACHING]:
            gpu_mode = GPUExecutionMode.RECORDING
        else:
            gpu_mode = GPUExecutionMode.IDLE

        # The manager API is async; drive it synchronously from this thread.
        self._run_coroutine_blocking(self._gpu_manager.set_execution_mode(gpu_mode))

        logger.info(f"GPU mode updated to {gpu_mode.value}")

    except Exception as e:
        logger.warning(f"Failed to update GPU mode: {e}")
# =========================================================================
# Continuous Learning Integration
# =========================================================================

def _update_prototype_on_success(
    self,
    node_id: str,
    embedding: Any
) -> None:
    """
    Update a node's prototype after a successful execution.

    Does nothing when the continuous learner is unavailable or when
    ``embedding`` is neither a NumPy array nor an object exposing
    ``get_vector()``. Failures are logged as warnings and swallowed.

    Args:
        node_id: ID of the node.
        embedding: Embedding of the current state.
    """
    if not self._continuous_learner or not CONTINUOUS_LEARNER_AVAILABLE:
        return

    try:
        import numpy as np
        if hasattr(embedding, 'get_vector'):
            vector = embedding.get_vector()
        elif isinstance(embedding, np.ndarray):
            vector = embedding
        else:
            return

        self._continuous_learner.update_prototype(
            node_id=node_id,
            new_embedding=vector,
            execution_success=True
        )
        logger.debug(f"Prototype updated for node {node_id}")

    except Exception as e:
        logger.warning(f"Failed to update prototype: {e}")


def _check_drift(
    self,
    node_id: str,
    confidence: float
) -> Optional["DriftStatus"]:
    """
    Check for UI drift after a match.

    The confidence is appended to the node's history (capped at the 10
    most recent values) and passed to the learner's drift detection. When
    drift is reported, a warning is logged and the error listener, if any,
    receives a ``"drift_detected"`` error.

    The return annotation is a string on purpose: ``DriftStatus`` only
    exists when the optional continuous-learner import succeeded, and an
    unquoted annotation would raise NameError at definition time otherwise.

    Args:
        node_id: ID of the matched node.
        confidence: Confidence of the match.

    Returns:
        The learner's drift status, or None when the learner is
        unavailable or the check failed.
    """
    if not self._continuous_learner or not CONTINUOUS_LEARNER_AVAILABLE:
        return None

    try:
        # Append to the history and keep only the last 10 confidences.
        history = self._confidence_history.get(node_id, []) + [confidence]
        self._confidence_history[node_id] = history[-10:]

        drift_status = self._continuous_learner.detect_drift(
            node_id=node_id,
            recent_confidences=self._confidence_history[node_id]
        )

        if drift_status.is_drifting:
            logger.warning(
                f"Drift detected for node {node_id}: "
                f"severity={drift_status.drift_severity:.2f}, "
                f"action={drift_status.recommended_action}"
            )

            # Notify through the error listener when one is registered.
            if self._on_error:
                self._on_error(
                    "drift_detected",
                    Exception(f"UI drift detected for node {node_id}")
                )

        return drift_status

    except Exception as e:
        logger.warning(f"Failed to check drift: {e}")
        return None
def get_learning_stats(self) -> Dict[str, Any]:
    """Return continuous-learning statistics.

    The result reports whether the continuous learner is importable and,
    for every node with a non-empty confidence history, the count, mean,
    min and max of the recorded confidences.
    """
    # One stdlib import up front instead of importing numpy on every loop
    # iteration just to average a handful of floats.
    from statistics import fmean

    stats = {
        "continuous_learner_available": CONTINUOUS_LEARNER_AVAILABLE,
        "confidence_history": {}
    }

    for node_id, confidences in self._confidence_history.items():
        if confidences:
            stats["confidence_history"][node_id] = {
                "count": len(confidences),
                "mean": float(fmean(confidences)),
                "min": float(min(confidences)),
                "max": float(max(confidences))
            }

    return stats


# =============================================================================
# Factory function (module level, outside the ExecutionLoop class)
# =============================================================================

def create_execution_loop(
    pipeline: "WorkflowPipeline",
    capture_interval_ms: int = 500
) -> ExecutionLoop:
    """
    Create an execution loop with the default configuration.

    Args:
        pipeline: WorkflowPipeline the loop runs against.
        capture_interval_ms: Capture interval in milliseconds.

    Returns:
        The configured ExecutionLoop.
    """
    return ExecutionLoop(
        pipeline=pipeline,
        capture_interval_ms=capture_interval_ms
    )
# --- tests/test_coaching_loop.py (new file introduced by this patch) ---
"""
Tests for COACHING mode in ExecutionLoop.

Tests the coaching decision flow, corrections, and integration
with Correction Packs.
"""

import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime

from core.execution.execution_loop import (
    ExecutionLoop,
    ExecutionMode,
    ExecutionState,
    ExecutionContext,
    CoachingDecision,
    CoachingResponse,
    StepResult
)


class TestCoachingDecision:
    """Checks on the CoachingDecision enum."""

    def test_all_decisions_exist(self):
        """Every expected decision exists with its wire value."""
        expected_values = (
            (CoachingDecision.ACCEPT, "accept"),
            (CoachingDecision.REJECT, "reject"),
            (CoachingDecision.CORRECT, "correct"),
            (CoachingDecision.EXECUTE_MANUAL, "manual"),
            (CoachingDecision.SKIP, "skip"),
        )
        for member, wire_value in expected_values:
            assert member.value == wire_value


class TestCoachingResponse:
    """Checks on the CoachingResponse dataclass."""

    def test_basic_response(self):
        """A response built from a decision alone has empty defaults."""
        reply = CoachingResponse(decision=CoachingDecision.ACCEPT)
        assert reply.decision == CoachingDecision.ACCEPT
        assert reply.correction is None
        assert reply.feedback is None
        assert reply.executed_manually is False

    def test_response_with_correction(self):
        """Correction data and feedback are stored as given."""
        user_fix = {
            'target': {'id': 'new_button'},
            'params': {'timeout': 5}
        }
        reply = CoachingResponse(
            decision=CoachingDecision.CORRECT,
            correction=user_fix,
            feedback="Button moved to new location"
        )
        assert reply.decision == CoachingDecision.CORRECT
        assert reply.correction == user_fix
        assert reply.feedback == "Button moved to new location"

    def test_manual_execution_response(self):
        """A manual-execution response keeps its flag."""
        reply = CoachingResponse(
            decision=CoachingDecision.EXECUTE_MANUAL,
            executed_manually=True
        )
        assert reply.decision == CoachingDecision.EXECUTE_MANUAL
        assert reply.executed_manually is True
class TestExecutionLoopCoaching:
    """ExecutionLoop behaviour in COACHING mode."""

    @pytest.fixture
    def mock_pipeline(self):
        """Pipeline double whose workflow is in the COACHING learning state."""
        fake_pipeline = MagicMock()
        fake_pipeline.load_workflow.return_value = MagicMock(
            workflow_id="test_wf",
            learning_state="COACHING",
            entry_nodes=["node_1"]
        )
        return fake_pipeline

    @pytest.fixture
    def coaching_callback(self):
        """Coaching callback double that always accepts."""
        always_accept = CoachingResponse(decision=CoachingDecision.ACCEPT)
        return MagicMock(return_value=always_accept)

    def test_init_with_coaching_callback(self, mock_pipeline, coaching_callback):
        """The constructor stores the callback and zeroes the stats."""
        loop = ExecutionLoop(
            pipeline=mock_pipeline,
            coaching_callback=coaching_callback
        )
        assert loop.coaching_callback == coaching_callback
        assert loop._coaching_stats['suggestions_made'] == 0

    def test_coaching_stats_initial(self, mock_pipeline):
        """A fresh loop reports all-zero coaching stats."""
        stats = ExecutionLoop(pipeline=mock_pipeline).get_coaching_stats()

        for counter in ('suggestions_made', 'accepted', 'rejected', 'corrected', 'manual_executions'):
            assert stats[counter] == 0
        for rate in ('acceptance_rate', 'correction_rate'):
            assert stats[rate] == 0.0

    def test_apply_coaching_correction_target(self, mock_pipeline):
        """A target correction replaces only the target."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        suggested = {
            'action': 'click',
            'target': {'id': 'old_button'},
            'params': {'timeout': 10}
        }
        user_fix = {'target': {'id': 'new_button'}}

        outcome = loop._apply_coaching_correction(suggested, user_fix)

        assert outcome['target']['id'] == 'new_button'
        assert outcome['action'] == 'click'
        assert outcome['params']['timeout'] == 10

    def test_apply_coaching_correction_params(self, mock_pipeline):
        """A params correction is merged over the existing params."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        suggested = {
            'action': 'type',
            'target': {'id': 'input'},
            'params': {'value': 'old', 'timeout': 10}
        }
        user_fix = {'params': {'value': 'new_value', 'delay': 0.1}}

        outcome = loop._apply_coaching_correction(suggested, user_fix)

        assert outcome['params']['value'] == 'new_value'
        assert outcome['params']['delay'] == 0.1
        assert outcome['params']['timeout'] == 10

    def test_apply_coaching_correction_action(self, mock_pipeline):
        """An action correction changes the action type."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        suggested = {'action': 'click', 'target': {'id': 'element'}}
        user_fix = {'action': 'double_click'}

        outcome = loop._apply_coaching_correction(suggested, user_fix)

        assert outcome['action'] == 'double_click'

    def test_apply_coaching_correction_none(self, mock_pipeline):
        """A None correction hands back the original action."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        suggested = {'action': 'click', 'target': {'id': 'btn'}}

        outcome = loop._apply_coaching_correction(suggested, None)

        assert outcome == suggested

    def test_submit_coaching_decision_wrong_state(self, mock_pipeline):
        """Submitting outside WAITING_COACHING is refused."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        loop.state = ExecutionState.RUNNING

        accepted = loop.submit_coaching_decision(
            CoachingResponse(decision=CoachingDecision.ACCEPT)
        )

        assert accepted is False

    def test_submit_coaching_decision_correct_state(self, mock_pipeline):
        """Submitting while WAITING_COACHING stores the response."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        loop.state = ExecutionState.WAITING_COACHING
        reply = CoachingResponse(decision=CoachingDecision.ACCEPT)

        accepted = loop.submit_coaching_decision(reply)

        assert accepted is True
        assert loop._coaching_response == reply

    def test_request_coaching_decision_with_callback(self, mock_pipeline, coaching_callback):
        """Requesting a decision goes through the registered callback."""
        loop = ExecutionLoop(
            pipeline=mock_pipeline,
            coaching_callback=coaching_callback
        )
        loop.context = ExecutionContext(
            workflow_id="test_wf",
            execution_id="exec_001",
            mode=ExecutionMode.COACHING,
            started_at=datetime.now()
        )
        suggested = {'action': 'click', 'target': {'id': 'btn'}}

        reply = loop._request_coaching_decision(suggested)

        assert reply.decision == CoachingDecision.ACCEPT
        coaching_callback.assert_called_once()
class TestCoachingIntegration:
    """Integration-level checks for COACHING mode."""

    @pytest.fixture
    def mock_pipeline(self):
        """Pipeline double returning a workflow in the COACHING state."""
        coaching_workflow = MagicMock()
        coaching_workflow.workflow_id = "test_wf"
        coaching_workflow.learning_state = "COACHING"
        coaching_workflow.entry_nodes = ["node_1"]

        fake_pipeline = MagicMock()
        fake_pipeline.load_workflow.return_value = coaching_workflow
        return fake_pipeline

    def test_determine_execution_mode_coaching(self, mock_pipeline):
        """The COACHING learning state selects the COACHING mode."""
        loop = ExecutionLoop(pipeline=mock_pipeline)
        loaded = mock_pipeline.load_workflow()

        chosen_mode = loop._determine_execution_mode(loaded)

        assert chosen_mode == ExecutionMode.COACHING

    def test_coaching_stats_update(self, mock_pipeline):
        """Derived rates follow the raw counters."""
        loop = ExecutionLoop(pipeline=mock_pipeline)

        # Simulate a session with 10 suggestions.
        simulated = {'suggestions_made': 10, 'accepted': 7, 'corrected': 2, 'rejected': 1}
        for counter, value in simulated.items():
            loop._coaching_stats[counter] = value

        stats = loop.get_coaching_stats()

        assert stats['suggestions_made'] == 10
        assert stats['acceptance_rate'] == 0.7
        assert stats['correction_rate'] == 0.2
class TestExecutionStateCoaching:
    """Checks on the WAITING_COACHING execution state."""

    def test_waiting_coaching_state_exists(self):
        """WAITING_COACHING exists and serialises as "coaching"."""
        assert ExecutionState.WAITING_COACHING.value == "coaching"

    def test_state_transitions(self):
        """WAITING_COACHING is a valid member alongside the other states."""
        # Only verifies that the enum members can be referenced together.
        lifecycle = [
            ExecutionState.IDLE,
            ExecutionState.RUNNING,
            ExecutionState.WAITING_COACHING,
            ExecutionState.COMPLETED
        ]
        assert ExecutionState.WAITING_COACHING in lifecycle


if __name__ == '__main__':
    pytest.main([__file__, '-v'])