Files
rpa_vision_v3/core/execution/execution_loop.py
Dom 78ee962918 feat(matching): match_current_state_from_state consomme enrichi (Lot E)
Nouvelle méthode match_current_state_from_state(screen_state, workflow_id)
qui utilise directement le ScreenState enrichi (window_title, detected_text,
ui_elements) fourni par ExecutionLoop au lieu de reconstruire un stub
ScreenState("Unknown", ui_elements=[], ...).

Préfère HierarchicalMatcher si workflow chargeable, fallback FAISS sinon.

L'ancienne API match_current_state(screenshot_path, workflow_id) est
convertie en wrapper : appelle ScreenAnalyzer.analyze() puis délègue.
Rétrocompat préservée.

ExecutionLoop._execute_step utilise la nouvelle méthode -> plus de double
analyze() dans le chemin d'exécution (économie latence).

Premier vrai matching context-aware. 11 nouveaux tests + 2 tests
integration loop. 172 tests non-régression verts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 09:07:04 +02:00

1797 lines
72 KiB
Python

"""
ExecutionLoop - Boucle d'exécution temps réel du RPA Vision V3
C'est le cœur du système qui fait tourner le RPA :
1. Capture l'écran en continu
2. Identifie l'état actuel (matching)
3. Détermine la prochaine action
4. Exécute l'action
5. Vérifie le résultat
6. Boucle jusqu'à la fin du workflow
Modes d'exécution:
- OBSERVATION: Observe sans agir (shadow mode)
- COACHING: Suggère les actions, l'utilisateur exécute
- SUPERVISED: Exécute avec confirmation à chaque étape
- AUTOMATIC: Exécute automatiquement (pilote automatique)
"""
import logging
import time
import threading
from typing import Optional, Dict, Any, Callable, List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
import tempfile
from core.models.workflow_graph import Workflow, WorkflowEdge, LearningState
from core.models.screen_state import ScreenState
from core.execution.action_executor import ActionExecutor, ExecutionResult, ExecutionStatus
from core.capture.screen_capturer import ScreenCapturer, CaptureFrame
from core.execution.target_resolver import TargetResolver, ResolvedTarget
# Import tardif pour éviter import circulaire
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.pipeline.workflow_pipeline import WorkflowPipeline
# GPU Resource Manager integration
try:
from core.gpu import GPUResourceManager, ExecutionMode as GPUExecutionMode, get_gpu_resource_manager
GPU_MANAGER_AVAILABLE = True
except ImportError:
GPU_MANAGER_AVAILABLE = False
# Continuous Learner integration
try:
from core.learning.continuous_learner import ContinuousLearner, DriftStatus
CONTINUOUS_LEARNER_AVAILABLE = True
except ImportError:
CONTINUOUS_LEARNER_AVAILABLE = False
# Execution Robustness integration
try:
from core.execution.execution_robustness import ExecutionRobustness, RetryManager
ROBUSTNESS_AVAILABLE = True
except ImportError:
ROBUSTNESS_AVAILABLE = False
# Analytics integration
try:
from core.analytics.analytics_system import get_analytics_system
from core.analytics.integration.execution_integration import AnalyticsExecutionIntegration
ANALYTICS_AVAILABLE = True
except ImportError:
ANALYTICS_AVAILABLE = False
# Feedback and Correction integration
try:
from core.learning.feedback_processor import FeedbackProcessor, FeedbackType
from core.corrections import get_correction_pack_integration, CorrectionPackIntegration
FEEDBACK_AVAILABLE = True
except ImportError:
FEEDBACK_AVAILABLE = False
# Training Data Collector integration
try:
from core.training.training_data_collector import TrainingDataCollector
TRAINING_COLLECTOR_AVAILABLE = True
except ImportError:
TRAINING_COLLECTOR_AVAILABLE = False
logger = logging.getLogger(__name__)
class ExecutionMode(str, Enum):
    """How the RPA engine is allowed to act during a run."""
    OBSERVATION = "observation"  # Watch only, never act (shadow mode)
    COACHING = "coaching"        # Suggest actions; the user executes them
    SUPERVISED = "supervised"    # Execute, asking confirmation at each step
    AUTOMATIC = "automatic"      # Fully automatic execution (autopilot)
class ExecutionState(str, Enum):
    """Lifecycle states of the execution loop."""
    IDLE = "idle"                     # Waiting, nothing running
    RUNNING = "running"               # Actively executing steps
    PAUSED = "paused"                 # Paused until resume()
    WAITING_CONFIRMATION = "waiting"  # Waiting for a user confirmation
    WAITING_COACHING = "coaching"     # Waiting for a user coaching decision
    COMPLETED = "completed"           # Finished successfully
    FAILED = "failed"                 # Finished with an error
    STOPPED = "stopped"               # Stopped manually
class CoachingDecision(str, Enum):
    """Decisions a user can make about a suggested action in COACHING mode."""
    ACCEPT = "accept"          # Accept and run the suggested action
    REJECT = "reject"          # Reject the action (skip or end)
    CORRECT = "correct"        # Provide a corrected action instead
    EXECUTE_MANUAL = "manual"  # The user performed the action manually
    SKIP = "skip"              # Skip this action entirely
@dataclass
class CoachingResponse:
    """User response to a suggested action in COACHING mode."""
    decision: CoachingDecision  # What the user decided about the suggestion
    correction: Optional[Dict[str, Any]] = None  # Corrected action payload when decision == CORRECT
    feedback: Optional[str] = None  # Free-form user comment
    executed_manually: bool = False  # True when the user performed the action themselves
@dataclass
class ExecutionContext:
    """Mutable context of one workflow execution run."""
    workflow_id: str  # Workflow being executed
    execution_id: str  # Unique run id (format "exec_YYYYMMDD_HHMMSS", see start())
    mode: ExecutionMode  # Execution mode for this run
    started_at: datetime  # Run start timestamp
    current_node_id: Optional[str] = None  # Most recently matched node
    current_edge_id: Optional[str] = None  # Edge currently selected for execution
    steps_executed: int = 0  # Total steps attempted
    steps_succeeded: int = 0  # Steps whose action succeeded
    steps_failed: int = 0  # Steps whose action failed
    last_screenshot_path: Optional[str] = None  # Path of the latest screen capture
    last_match_confidence: float = 0.0  # Confidence of the latest state match
    variables: Dict[str, Any] = field(default_factory=dict)  # Run variables passed to actions
    history: List[Dict[str, Any]] = field(default_factory=list)  # Per-step audit trail (see _execution_loop)
@dataclass
class StepResult:
    """Result of a single execution step."""
    success: bool  # Step outcome (also True for observe-only / terminal steps)
    node_id: str  # Node matched for this step
    edge_id: Optional[str]  # Selected edge (None for terminal/blocked steps)
    action_result: Optional[ExecutionResult]  # Raw executor result (None when nothing was executed)
    match_confidence: float  # Confidence of the state match
    duration_ms: float  # Wall-clock duration of the whole step
    message: str  # Human-readable summary
    screenshot_path: Optional[str] = None  # Screenshot the step was based on
    # C1 — vision-aware instrumentation
    ocr_ms: float = 0.0  # OCR time for this step's ScreenState
    ui_ms: float = 0.0  # UI-detection time for this step
    total_ms: float = 0.0  # Total time (alias of duration_ms, kept for consistency)
    analyze_ms: float = 0.0  # Total ScreenState analysis time (OCR + UI + rest)
    cache_hit: bool = False  # True when the ScreenState came from the cache
    degraded: bool = False  # True when degraded mode was active (analysis timeout)
class ExecutionLoop:
"""
Boucle d'exécution principale du RPA.
Gère le cycle complet :
Capture → Matching → Action → Vérification → Boucle
Example:
>>> loop = ExecutionLoop(pipeline)
>>> loop.start("workflow_login", mode=ExecutionMode.SUPERVISED)
>>> # ... le RPA s'exécute ...
>>> loop.stop()
"""
def __init__(
    self,
    pipeline: "WorkflowPipeline",
    action_executor: Optional[ActionExecutor] = None,
    screen_capturer: Optional[ScreenCapturer] = None,
    capture_interval_ms: int = 500,
    max_no_match_retries: int = 5,
    confirmation_callback: Optional[Callable[[str, Dict], bool]] = None,
    coaching_callback: Optional[Callable[[str, Dict], "CoachingResponse"]] = None,
    screen_analyzer: Optional[Any] = None,
    screen_state_cache: Optional[Any] = None,
    enable_ui_detection: bool = True,
    enable_ocr: bool = True,
    analyze_timeout_ms: int = 8000,
    window_info_provider: Optional[Callable[[], Optional[Dict[str, Any]]]] = None,
):
    """
    Initialize the execution loop.

    Args:
        pipeline: WorkflowPipeline used for matching and workflow management
        action_executor: Action executor (created if None)
        screen_capturer: Screen capturer (created if None)
        capture_interval_ms: Interval between two captures (ms)
        max_no_match_retries: Max number of attempts when no state matches
        confirmation_callback: Callback asking the user for confirmation (SUPERVISED)
        coaching_callback: Callback for coaching decisions (COACHING)
        screen_analyzer: ScreenAnalyzer building an enriched ScreenState
            (lazily resolved from a singleton when None)
        screen_state_cache: Perceptual cache (lazily resolved from a singleton when None)
        enable_ui_detection: Enables UI detection (True by default, emergency kill-switch)
        enable_ocr: Enables OCR (True by default)
        analyze_timeout_ms: Soft timeout for analyzing one ScreenState.
            Past it, degraded mode is enabled for the following steps.
        window_info_provider: Callable returning a window_info dict. When None,
            `screen_capturer.get_active_window()` is tried instead.
    """
    self.pipeline = pipeline
    self.action_executor = action_executor or ActionExecutor()
    self.screen_capturer = screen_capturer or ScreenCapturer(
        buffer_size=20,
        detect_changes=True
    )
    self.target_resolver = TargetResolver(
        similarity_threshold=0.75,
        use_embedding_fallback=True
    )
    self.capture_interval_ms = capture_interval_ms
    self.max_no_match_retries = max_no_match_retries
    self.confirmation_callback = confirmation_callback
    self.coaching_callback = coaching_callback
    # C1 — vision-aware execution
    self._screen_analyzer = screen_analyzer  # lazy init when None
    self._screen_state_cache = screen_state_cache  # lazy init when None
    self.enable_ui_detection = enable_ui_detection
    self.enable_ocr = enable_ocr
    self.analyze_timeout_ms = analyze_timeout_ms
    self._window_info_provider = window_info_provider
    # Degraded mode, triggered by an analysis timeout — persists until a
    # probe has demonstrated recovery (see below).
    self._degraded_mode = False
    # Auto-recovery: counter of consecutive fast steps.
    # If analysis stays fast (< analyze_timeout_ms / 2) during
    # _fast_steps_recovery_threshold steps → leave degraded mode.
    self._successive_fast_steps = 0
    self._fast_steps_recovery_threshold = 3
    # While degraded, retry a real analysis every _probe_interval steps to
    # detect recovery (the other steps keep using the stub to avoid
    # re-saturating the GPU). Default 10 ≈ 5s at 500ms/step.
    self._probe_interval = 10
    self._degraded_step_counter = 0
    # Internal state
    self.state = ExecutionState.IDLE
    self.context: Optional[ExecutionContext] = None
    self._stop_requested = False
    self._pause_requested = False
    self._thread: Optional[threading.Thread] = None
    self._lock = threading.Lock()
    # Continuous-capture mode
    self._use_continuous_capture = True
    self._last_frame: Optional[CaptureFrame] = None
    self._frame_queue: List[CaptureFrame] = []
    self._frame_queue_lock = threading.Lock()
    # Event callbacks
    self._on_step_complete: Optional[Callable[[StepResult], None]] = None
    self._on_state_change: Optional[Callable[[ExecutionState], None]] = None
    self._on_error: Optional[Callable[[str, Exception], None]] = None
    # Temporary directory for screenshots
    self._temp_dir = Path(tempfile.mkdtemp(prefix="rpa_vision_"))
    # GPU Resource Manager (best-effort, optional dependency)
    self._gpu_manager = None
    if GPU_MANAGER_AVAILABLE:
        try:
            self._gpu_manager = get_gpu_resource_manager()
            logger.info("GPU Resource Manager integrated")
        except Exception as e:
            logger.warning(f"GPU Resource Manager not available: {e}")
    # Continuous Learner for online learning (optional)
    self._continuous_learner = None
    if CONTINUOUS_LEARNER_AVAILABLE:
        try:
            self._continuous_learner = ContinuousLearner()
            logger.info("Continuous Learner integrated")
        except Exception as e:
            logger.warning(f"Continuous Learner not available: {e}")
    # Confidence history used for drift detection
    self._confidence_history: Dict[str, List[float]] = {}
    # Execution Robustness for retry and recovery (optional)
    self._robustness = None
    if ROBUSTNESS_AVAILABLE:
        try:
            self._robustness = ExecutionRobustness(pipeline)
            logger.info("Execution Robustness integrated")
        except Exception as e:
            logger.warning(f"Execution Robustness not available: {e}")
    # Analytics integration for metrics and insights (optional)
    self._analytics_integration = None
    if ANALYTICS_AVAILABLE:
        try:
            analytics_system = get_analytics_system()
            self._analytics_integration = AnalyticsExecutionIntegration(analytics_system)
            logger.info("Analytics System integrated")
        except Exception as e:
            logger.warning(f"Analytics System not available: {e}")
    # Feedback and Correction Pack integration for COACHING mode (optional)
    self._feedback_processor = None
    self._correction_integration = None
    if FEEDBACK_AVAILABLE:
        try:
            self._feedback_processor = FeedbackProcessor(auto_integrate_corrections=True)
            self._correction_integration = get_correction_pack_integration()
            logger.info("Feedback and Correction Pack integration enabled")
        except Exception as e:
            logger.warning(f"Feedback integration not available: {e}")
    # Training Data Collector recording COACHING sessions (optional)
    self._training_collector = None
    if TRAINING_COLLECTOR_AVAILABLE:
        try:
            self._training_collector = TrainingDataCollector(
                output_dir="data/coaching_sessions",
                auto_integrate_corrections=True
            )
            logger.info("Training Data Collector integrated")
        except Exception as e:
            logger.warning(f"Training Data Collector not available: {e}")
    # Coaching state
    self._coaching_response: Optional[CoachingResponse] = None
    self._coaching_stats = {
        'suggestions_made': 0,
        'accepted': 0,
        'rejected': 0,
        'corrected': 0,
        'manual_executions': 0
    }
    # NOTE(review): confirm_action() writes `self._confirmation_result`,
    # which is never initialized here — verify its readers tolerate the
    # attribute being absent before the first confirmation.
    logger.info("ExecutionLoop initialized")
# =========================================================================
# Contrôle de l'exécution
# =========================================================================
def start(
    self,
    workflow_id: str,
    mode: Optional[ExecutionMode] = None,
    start_node_id: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None
) -> bool:
    """
    Start executing a workflow in a background daemon thread.

    Args:
        workflow_id: ID of the workflow to run
        mode: Execution mode (auto-detected from the workflow if None)
        start_node_id: Starting node (first entry node if None)
        variables: Initial execution variables

    Returns:
        True if the execution was started successfully
    """
    with self._lock:
        # Refuse concurrent runs: one loop instance drives one execution.
        if self.state == ExecutionState.RUNNING:
            logger.warning("Execution already running")
            return False
        # Load the workflow
        workflow = self.pipeline.load_workflow(workflow_id)
        if not workflow:
            logger.error(f"Workflow not found: {workflow_id}")
            return False
        # Determine the execution mode
        if mode is None:
            mode = self._determine_execution_mode(workflow)
        # Create the execution context
        self.context = ExecutionContext(
            workflow_id=workflow_id,
            execution_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            mode=mode,
            started_at=datetime.now(),
            current_node_id=start_node_id or (workflow.entry_nodes[0] if workflow.entry_nodes else None),
            variables=variables or {}
        )
        self._stop_requested = False
        self._pause_requested = False
        self.state = ExecutionState.RUNNING
        # Notify the GPU Resource Manager about the execution mode
        self._update_gpu_mode(mode)
        # Notify Analytics about the execution start (best-effort)
        if self._analytics_integration:
            try:
                self._analytics_integration.on_execution_start(
                    workflow_id=workflow_id,
                    execution_id=self.context.execution_id,
                    mode=mode.value
                )
                # Start resource monitoring for this execution
                self._analytics_integration.start_resource_monitoring(
                    execution_id=self.context.execution_id
                )
            except Exception as e:
                logger.warning(f"Analytics notification failed: {e}")
        # Start the worker thread running the main loop
        self._thread = threading.Thread(target=self._execution_loop, daemon=True)
        self._thread.start()
        logger.info(f"Started execution of {workflow_id} in {mode.value} mode")
        self._notify_state_change(ExecutionState.RUNNING)
        return True
def stop(self) -> None:
    """Request the loop to stop; honored at the next loop iteration."""
    with self._lock:
        self._stop_requested = True
        logger.info("Stop requested")
def pause(self) -> None:
    """Pause the execution; the loop spins until resume() is called."""
    with self._lock:
        self._pause_requested = True
        self.state = ExecutionState.PAUSED
        logger.info("Pause requested")
        self._notify_state_change(ExecutionState.PAUSED)
def resume(self) -> None:
    """Resume execution after a pause() or a supervised-pause blockage."""
    with self._lock:
        self._pause_requested = False
        self.state = ExecutionState.RUNNING
        logger.info("Resumed")
        self._notify_state_change(ExecutionState.RUNNING)
def confirm_action(self, approved: bool = True) -> None:
    """Confirm or reject a pending action (SUPERVISED mode).

    No-op unless the loop is currently in WAITING_CONFIRMATION.
    """
    with self._lock:
        if self.state == ExecutionState.WAITING_CONFIRMATION:
            # NOTE(review): `_confirmation_result` is not initialized in
            # __init__ — presumably _request_confirmation only reads it
            # after entering WAITING_CONFIRMATION; confirm against that code.
            self._confirmation_result = approved
            self.state = ExecutionState.RUNNING
# =========================================================================
# Boucle principale
# =========================================================================
def _execution_loop(self) -> None:
    """Main execution loop (runs in a daemon worker thread).

    Repeats: pause check → execute one step → record/notify the result →
    completion check, until stop is requested, the workflow completes,
    or too many consecutive steps fail to match.
    """
    no_match_count = 0
    # Start continuous capture when enabled
    if self._use_continuous_capture:
        self.screen_capturer.start_continuous(
            callback=self._on_frame_captured,
            interval_ms=self.capture_interval_ms,
            skip_unchanged=True
        )
        logger.info("Started continuous screen capture")
    try:
        while not self._stop_requested:
            # Honor pause: spin until resumed or stopped
            while self._pause_requested and not self._stop_requested:
                time.sleep(0.1)
            if self._stop_requested:
                break
            # Execute one step
            step_result = self._execute_step()
            if step_result is None:
                # No match found for the current screen
                no_match_count += 1
                if no_match_count >= self.max_no_match_retries:
                    logger.error(f"No match found after {no_match_count} retries")
                    self._handle_no_match()
                    break
                time.sleep(self.capture_interval_ms / 1000.0)
                continue
            no_match_count = 0  # Reset counter
            # Notify step listeners
            if self._on_step_complete:
                self._on_step_complete(step_result)
            # Append to the execution history (audit trail)
            self.context.history.append({
                "timestamp": datetime.now().isoformat(),
                "node_id": step_result.node_id,
                "edge_id": step_result.edge_id,
                "success": step_result.success,
                "confidence": step_result.match_confidence
            })
            # Notify Analytics about step completion.
            # C1 — forwards all vision-aware fields (ocr_ms, ui_ms,
            # analyze_ms, cache_hit, degraded) to the analytics system via
            # on_step_result, which accepts a complete StepResult.
            if self._analytics_integration and step_result:
                try:
                    self._analytics_integration.on_step_result(
                        execution_id=self.context.execution_id,
                        workflow_id=self.context.workflow_id,
                        step_result=step_result,
                    )
                except Exception as e:
                    logger.warning(f"Analytics step notification failed: {e}")
            # Check whether the workflow is finished
            if self._is_workflow_complete():
                logger.info("Workflow completed successfully")
                self.state = ExecutionState.COMPLETED
                self._notify_state_change(ExecutionState.COMPLETED)
                break
            # Wait before the next capture
            time.sleep(self.capture_interval_ms / 1000.0)
    except Exception as e:
        logger.exception(f"Execution loop error: {e}")
        self.state = ExecutionState.FAILED
        self._notify_state_change(ExecutionState.FAILED)
        if self._on_error:
            self._on_error("execution_loop", e)
    finally:
        # Stop continuous capture
        if self._use_continuous_capture:
            self.screen_capturer.stop_continuous()
            logger.info("Stopped continuous screen capture")
        if self._stop_requested:
            self.state = ExecutionState.STOPPED
            self._notify_state_change(ExecutionState.STOPPED)
        # Notify Analytics about execution completion.
        # Normalized contract (Lot A): duration_ms + explicit status
        # instead of the ambiguous success boolean + duration.
        if self._analytics_integration and self.context:
            try:
                duration_ms = (
                    datetime.now() - self.context.started_at
                ).total_seconds() * 1000
                # Map ExecutionState → analytics status
                if self.state == ExecutionState.COMPLETED:
                    status = "completed"
                elif self.state == ExecutionState.FAILED:
                    status = "failed"
                elif self.state == ExecutionState.STOPPED:
                    status = "stopped"
                elif self.state == ExecutionState.PAUSED:
                    # An unresolved pause at exit = unrecovered blockage
                    status = "blocked"
                else:
                    status = self.state.value
                error_message = (
                    None
                    if status == "completed"
                    else f"Execution ended in state: {self.state.value}"
                )
                # Stop resource monitoring
                self._analytics_integration.stop_resource_monitoring(
                    execution_id=self.context.execution_id
                )
                self._analytics_integration.on_execution_complete(
                    workflow_id=self.context.workflow_id,
                    execution_id=self.context.execution_id,
                    duration_ms=duration_ms,
                    status=status,
                    steps_total=self.context.steps_executed,
                    steps_completed=self.context.steps_succeeded,
                    steps_failed=self.context.steps_failed,
                    error_message=error_message,
                )
            except Exception as e:
                logger.warning(f"Analytics completion notification failed: {e}")
        logger.info(f"Execution loop ended: {self.state.value}")
def _execute_step(self) -> Optional[StepResult]:
    """
    Execute one workflow step:
    capture → analyze (ScreenState) → match → select edge → act (per mode).

    Returns:
        StepResult, or None when no node matches the current screen
    """
    start_time = time.time()
    # 1. Capture the screen
    screenshot_path = self._capture_screen()
    if not screenshot_path:
        logger.warning("Failed to capture screen")
        return None
    self.context.last_screenshot_path = screenshot_path
    # 1bis. Build an enriched ScreenState (C1) — backed by the perceptual cache
    screen_state, timings = self._build_screen_state(screenshot_path)
    logger.debug(
        f"[Step] ScreenState analyze={timings['analyze_ms']:.0f}ms "
        f"ocr={timings['ocr_ms']:.0f}ms ui={timings['ui_ms']:.0f}ms "
        f"cache_hit={timings['cache_hit']} degraded={timings['degraded']}"
    )
    # 2. Identify the current state (matching)
    #
    # Lot E — consume the enriched ScreenState already built in step 1bis
    # (real ui_elements, detected_text, window_title) instead of letting
    # the pipeline rebuild a stub with window_title="Unknown".
    # First genuinely context-aware matching.
    match = self.pipeline.match_current_state_from_state(
        screen_state,
        workflow_id=self.context.workflow_id,
    )
    if not match:
        logger.debug("No match found for current screen")
        return None
    current_node_id = match["node_id"]
    confidence = match["confidence"]
    self.context.current_node_id = current_node_id
    self.context.last_match_confidence = confidence
    logger.info(f"Matched node: {current_node_id} (confidence: {confidence:.3f})")
    # 3. Get the next action (C3: robust edge selection)
    #
    # Lot A — dict contract with explicit status:
    #   "terminal" → legitimate end of the workflow (success=True)
    #   "blocked"  → supervised pause (NEVER treated as success anymore,
    #                so it cannot trigger a false _is_workflow_complete)
    #   "selected" → action to execute
    #
    # Lot B — propagate the current match confidence (source_similarity)
    # so the EdgeScorer can check each edge's `min_source_similarity`
    # precondition. Without this propagation the constraint was silently
    # disabled (hardcoded to 1.0).
    next_action = self.pipeline.get_next_action(
        self.context.workflow_id,
        current_node_id,
        screen_state=screen_state,
        source_similarity=confidence,
    )
    # Defensive backward-compat: a legacy pipeline returning None or a
    # dict without status is treated as a blockage (safe default).
    if not isinstance(next_action, dict) or "status" not in next_action:
        logger.error(
            "get_next_action a renvoyé un résultat sans status "
            f"(legacy?). Valeur reçue: {next_action!r}"
        )
        next_action = {"status": "blocked", "reason": "legacy_none_return"}
    action_status = next_action.get("status")
    if action_status == "terminal":
        # Legitimate end: the current node has no outgoing edge
        total_ms = (time.time() - start_time) * 1000
        return StepResult(
            success=True,
            node_id=current_node_id,
            edge_id=None,
            action_result=None,
            match_confidence=confidence,
            duration_ms=total_ms,
            message="Workflow terminated (terminal node)",
            screenshot_path=screenshot_path,
            ocr_ms=timings["ocr_ms"],
            ui_ms=timings["ui_ms"],
            analyze_ms=timings["analyze_ms"],
            total_ms=total_ms,
            cache_hit=timings["cache_hit"],
            degraded=timings["degraded"],
        )
    if action_status == "blocked":
        # Blocked: edges exist but none is valid. Trigger a supervised
        # pause (paused_need_help) and surface the error. We do NOT
        # return success=True.
        reason = next_action.get("reason", "unknown")
        logger.warning(
            f"ExecutionLoop bloqué sur {current_node_id}: {reason} "
            f"→ pause supervisée demandée"
        )
        # Switch to PAUSED and arm _pause_requested so that the main loop
        # waits for a human resume().
        self.state = ExecutionState.PAUSED
        self._pause_requested = True
        self._notify_state_change(ExecutionState.PAUSED)
        if self._on_error:
            try:
                self._on_error(
                    "blocked",
                    Exception(f"No valid edge from {current_node_id}: {reason}"),
                )
            except Exception as cb_err:
                logger.debug(f"on_error callback failed: {cb_err}")
        total_ms = (time.time() - start_time) * 1000
        return StepResult(
            success=False,
            node_id=current_node_id,
            edge_id=None,
            action_result=None,
            match_confidence=confidence,
            duration_ms=total_ms,
            message=f"Blocked: {reason}",
            screenshot_path=screenshot_path,
            ocr_ms=timings["ocr_ms"],
            ui_ms=timings["ui_ms"],
            analyze_ms=timings["analyze_ms"],
            total_ms=total_ms,
            cache_hit=timings["cache_hit"],
            degraded=timings["degraded"],
        )
    # From here on, status is necessarily "selected"
    edge_id = next_action["edge_id"]
    self.context.current_edge_id = edge_id
    # 4. Execute or suggest, depending on the mode
    action_result = None
    if self.context.mode == ExecutionMode.OBSERVATION:
        # Observation mode: do nothing
        logger.info(f"[OBSERVATION] Would execute: {next_action['action']}")
    elif self.context.mode == ExecutionMode.COACHING:
        # Coaching mode: suggest the action and wait for the user decision
        logger.info(f"[COACHING] Suggested action: {next_action['action']}")
        self._coaching_stats['suggestions_made'] += 1
        # Ask the user for a decision
        coaching_response = self._request_coaching_decision(next_action, screenshot_path)
        if coaching_response.decision == CoachingDecision.ACCEPT:
            # User accepts: execute the suggested action
            self._coaching_stats['accepted'] += 1
            action_result = self._execute_action(next_action, screen_state=screen_state)
            self._record_coaching_feedback(
                next_action, coaching_response, action_result, success=True
            )
        elif coaching_response.decision == CoachingDecision.REJECT:
            # User rejects: do not execute
            self._coaching_stats['rejected'] += 1
            self._record_coaching_feedback(
                next_action, coaching_response, None, success=False
            )
            total_ms = (time.time() - start_time) * 1000
            return StepResult(
                success=False,
                node_id=current_node_id,
                edge_id=edge_id,
                action_result=None,
                match_confidence=confidence,
                duration_ms=total_ms,
                message="Action rejected by user in COACHING mode",
                screenshot_path=screenshot_path,
                ocr_ms=timings["ocr_ms"],
                ui_ms=timings["ui_ms"],
                analyze_ms=timings["analyze_ms"],
                total_ms=total_ms,
                cache_hit=timings["cache_hit"],
                degraded=timings["degraded"],
            )
        elif coaching_response.decision == CoachingDecision.CORRECT:
            # User supplies a correction
            self._coaching_stats['corrected'] += 1
            corrected_action = self._apply_coaching_correction(
                next_action, coaching_response.correction
            )
            action_result = self._execute_action(corrected_action, screen_state=screen_state)
            self._record_coaching_feedback(
                next_action, coaching_response, action_result,
                success=action_result.status == ExecutionStatus.SUCCESS if action_result else False
            )
        elif coaching_response.decision == CoachingDecision.EXECUTE_MANUAL:
            # User performed the action manually
            self._coaching_stats['manual_executions'] += 1
            self._record_coaching_feedback(
                next_action, coaching_response, None, success=True, manual=True
            )
            # Nothing to execute, the user already did it
        elif coaching_response.decision == CoachingDecision.SKIP:
            # Skip this action
            self._record_coaching_feedback(
                next_action, coaching_response, None, success=True
            )
            # Continue without executing
    elif self.context.mode == ExecutionMode.SUPERVISED:
        # Supervised mode: ask for confirmation first
        if not self._request_confirmation(next_action):
            logger.info("Action rejected by user")
            total_ms = (time.time() - start_time) * 1000
            return StepResult(
                success=False,
                node_id=current_node_id,
                edge_id=edge_id,
                action_result=None,
                match_confidence=confidence,
                duration_ms=total_ms,
                message="Action rejected by user",
                screenshot_path=screenshot_path,
                ocr_ms=timings["ocr_ms"],
                ui_ms=timings["ui_ms"],
                analyze_ms=timings["analyze_ms"],
                total_ms=total_ms,
                cache_hit=timings["cache_hit"],
                degraded=timings["degraded"],
            )
        # Execute the action
        action_result = self._execute_action(next_action, screen_state=screen_state)
    elif self.context.mode == ExecutionMode.AUTOMATIC:
        # Automatic mode: execute directly
        action_result = self._execute_action(next_action, screen_state=screen_state)
    # 5. Update the counters
    self.context.steps_executed += 1
    if action_result and action_result.status == ExecutionStatus.SUCCESS:
        self.context.steps_succeeded += 1
    elif action_result:
        self.context.steps_failed += 1
    duration_ms = (time.time() - start_time) * 1000
    return StepResult(
        success=action_result.status == ExecutionStatus.SUCCESS if action_result else True,
        node_id=current_node_id,
        edge_id=edge_id,
        action_result=action_result,
        match_confidence=confidence,
        duration_ms=duration_ms,
        message=action_result.message if action_result else "Observed",
        screenshot_path=screenshot_path,
        ocr_ms=timings["ocr_ms"],
        ui_ms=timings["ui_ms"],
        analyze_ms=timings["analyze_ms"],
        total_ms=duration_ms,
        cache_hit=timings["cache_hit"],
        degraded=timings["degraded"],
    )
# =========================================================================
# Méthodes utilitaires
# =========================================================================
def _capture_screen(self) -> Optional[str]:
    """Grab the screen and persist it as a temp PNG; return its path, or None on failure."""
    try:
        frame = self.screen_capturer.capture_screen()
        if frame is None:
            return None
        # Microsecond-resolution timestamp keeps filenames unique per step
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        target = self._temp_dir / f"capture_{stamp}.png"
        frame.save(str(target))
        return str(target)
    except Exception as e:
        logger.error(f"Screen capture failed: {e}")
        return None
def _execute_action(
    self,
    action_info: Dict[str, Any],
    screen_state: Optional[Any] = None,
) -> ExecutionResult:
    """
    Run one action through the ActionExecutor.

    Args:
        action_info: action dict {edge_id, action, target_node, ...}
        screen_state: enriched ScreenState (a minimal stub is built if None)

    Returns:
        ExecutionResult — FAILED when the edge is missing or anything raises.
    """
    try:
        # Resolve the workflow and the edge carrying the action
        wf = self.pipeline.load_workflow(self.context.workflow_id)
        target_edge = wf.get_edge(action_info["edge_id"])
        if not target_edge:
            return ExecutionResult(
                status=ExecutionStatus.FAILED,
                message=f"Edge not found: {action_info['edge_id']}",
                duration_ms=0
            )
        # Prefer the enriched ScreenState handed in by the loop; only fall
        # back to a minimal stub when none was provided (legacy, tests).
        state = screen_state if screen_state is not None else self._build_stub_screen_state()
        # Delegate the actual execution
        outcome = self.action_executor.execute_edge(
            target_edge,
            state,
            context=self.context.variables
        )
        logger.info(f"Action executed: {outcome.status.value} - {outcome.message}")
        return outcome
    except Exception as e:
        logger.exception(f"Action execution failed: {e}")
        return ExecutionResult(
            status=ExecutionStatus.FAILED,
            message=str(e),
            duration_ms=0,
            error=e
        )
# =========================================================================
# C1 — Construction du ScreenState (vision-aware)
# =========================================================================
def _get_screen_analyzer(self):
    """
    Lazily resolve the shared ScreenAnalyzer singleton.

    Returns None when unavailable (import error, etc.) — the loop then
    falls back to stub ScreenStates.

    Lot C note: `session_id` is no longer passed to the singleton; it is
    now a call-time argument of `analyze()`, so two ExecutionLoops sharing
    the same analyzer do not step on each other.
    """
    if self._screen_analyzer is None:
        try:
            from core.pipeline import get_screen_analyzer
            self._screen_analyzer = get_screen_analyzer()
        except Exception as e:
            logger.warning(f"ScreenAnalyzer indisponible: {e}")
            return None
    return self._screen_analyzer
def _get_screen_state_cache(self):
    """Lazily resolve the shared ScreenState cache singleton (None if unavailable)."""
    if self._screen_state_cache is None:
        try:
            from core.pipeline import get_screen_state_cache
            self._screen_state_cache = get_screen_state_cache()
        except Exception as e:
            logger.warning(f"ScreenStateCache indisponible: {e}")
            return None
    return self._screen_state_cache
def _resolve_window_info(self) -> Optional[Dict[str, Any]]:
    """
    Resolve information about the active window.

    Preference order:
      1. the `window_info_provider` given to the constructor
      2. `screen_capturer.get_active_window()`
      3. None → ScreenAnalyzer falls back to its defaults
    """
    provider = self._window_info_provider
    if provider is not None:
        try:
            return provider()
        except Exception as e:
            logger.debug(f"window_info_provider failed: {e}")
    # Fallback: ask the capturer, normalizing to the shape ScreenAnalyzer
    # expects. Normalization stays inside the try so an unexpected return
    # shape degrades to None instead of raising.
    try:
        raw = self.screen_capturer.get_active_window()
        if raw:
            return {
                "title": raw.get("title", "Unknown"),
                "app_name": raw.get("app", "unknown"),
                "window_bounds": [
                    raw.get("x", 0),
                    raw.get("y", 0),
                    raw.get("width", 0),
                    raw.get("height", 0),
                ],
            }
    except Exception as e:
        logger.debug(f"get_active_window failed: {e}")
    return None
def _build_screen_state(
    self,
    screenshot_path: str,
) -> tuple:
    """
    Build an enriched ScreenState from a screenshot.

    Decision logic:
    - enable_ui_detection=False AND enable_ocr=False -> stub state
    - analyzer unavailable -> stub state
    - otherwise: cache.get_or_compute(analyzer.analyze)
    - soft timeout: when the analysis exceeds `analyze_timeout_ms`, a
      warning is logged and degraded mode is enabled for following steps.

    Args:
        screenshot_path: Path of the screenshot to analyze.

    Returns:
        (screen_state, timings_dict)
        timings_dict keys:
            "analyze_ms", "ocr_ms", "ui_ms", "cache_hit", "degraded"
    """
    # Per-call metrics returned to the caller (feeds the StepResult).
    timings = {
        "analyze_ms": 0.0,
        "ocr_ms": 0.0,
        "ui_ms": 0.0,
        "cache_hit": False,
        "degraded": False,
    }
    # "Everything disabled" emergency flag -> stub
    if not self.enable_ui_detection and not self.enable_ocr:
        timings["degraded"] = True
        return self._build_stub_screen_state(screenshot_path), timings
    analyzer = self._get_screen_analyzer()
    if analyzer is None:
        timings["degraded"] = True
        return self._build_stub_screen_state(screenshot_path), timings
    # Degraded mode: keep serving the stub, except for a periodic "probe"
    # that tests whether the GPU is fast again. Fast probes accumulate;
    # after _fast_steps_recovery_threshold consecutive fast probes we
    # switch back to full mode.
    if self._degraded_mode:
        self._degraded_step_counter += 1
        if self._degraded_step_counter < self._probe_interval:
            timings["degraded"] = True
            return self._build_stub_screen_state(screenshot_path), timings
        # Otherwise attempt a real probe below
        self._degraded_step_counter = 0
    cache = self._get_screen_state_cache()
    # Proactive invalidation: if the screen changed massively since the
    # last cache entry, purge it. The TTL alone (2s) would let stale
    # entries through on fast transitions (popup, navigation).
    if cache is not None:
        try:
            cache.invalidate_if_changed(screenshot_path, threshold=0.3)
        except Exception as e:
            logger.debug(f"invalidate_if_changed a échoué: {e}")
    window_info = self._resolve_window_info()
    # Compute function (cache miss).
    # Runtime flags (enable_ocr, enable_ui_detection) and the session_id
    # are passed as call kwargs to analyze(): NO mutation of the singleton
    # analyzer (Lot C — thread-safety; two ExecutionLoops may share the
    # same analyzer without contaminating each other).
    execution_id = self.context.execution_id if self.context else ""
    def compute(path: str):
        # Cache-miss path: run the full analysis and stamp its duration.
        t_start = time.time()
        state = analyzer.analyze(
            path,
            window_info=window_info,
            enable_ocr=self.enable_ocr,
            enable_ui_detection=self.enable_ui_detection,
            session_id=execution_id,
        )
        elapsed = (time.time() - t_start) * 1000
        # Record the analysis time in the state's metadata
        if hasattr(state, "metadata"):
            state.metadata["analyze_ms"] = elapsed
        return state
    t0 = time.time()
    try:
        if cache is not None:
            # Lot D — context-aware composite key: two different contexts
            # sharing the same screenshot no longer collide. workflow_id
            # isolates per-workflow replays, the flags differentiate the
            # analysis modes (OCR on/off, UI on/off), and the pair
            # (window_title, app_name) distinguishes two applications
            # that would render visually alike.
            ctx_window_title = (window_info or {}).get("title", "") or ""
            ctx_app_name = (window_info or {}).get("app_name", "") or ""
            ctx_workflow_id = (
                self.context.workflow_id if self.context else ""
            )
            state, cache_hit, _ = cache.get_or_compute(
                screenshot_path,
                compute,
                window_title=ctx_window_title,
                app_name=ctx_app_name,
                enable_ocr=self.enable_ocr,
                enable_ui_detection=self.enable_ui_detection,
                workflow_id=ctx_workflow_id,
            )
        else:
            state = compute(screenshot_path)
            cache_hit = False
    except Exception as e:
        logger.warning(f"ScreenState build failed: {e} — fallback stub")
        timings["degraded"] = True
        return self._build_stub_screen_state(screenshot_path), timings
    analyze_ms = (time.time() - t0) * 1000
    timings["analyze_ms"] = analyze_ms
    timings["cache_hit"] = cache_hit
    # Break down OCR vs UI timing when the metadata provides it
    meta = getattr(state, "metadata", {}) or {}
    timings["ocr_ms"] = float(meta.get("ocr_ms", 0.0))
    timings["ui_ms"] = float(meta.get("ui_ms", 0.0))
    # Soft timeout: enter degraded mode above the threshold
    # (cache hits ignored: a hit proves nothing about GPU health)
    if analyze_ms > self.analyze_timeout_ms and not cache_hit:
        logger.warning(
            f"ScreenState analysis slow: {analyze_ms:.0f}ms > "
            f"{self.analyze_timeout_ms}ms → activation mode dégradé"
        )
        self._degraded_mode = True
        self._successive_fast_steps = 0
        timings["degraded"] = True
    else:
        # "Fast" step: bump the counter when below timeout / 2.
        # Cache hits are ignored (not representative of GPU perf).
        fast_threshold_ms = self.analyze_timeout_ms / 2
        if not cache_hit and analyze_ms < fast_threshold_ms:
            self._successive_fast_steps += 1
            # Auto-recovery: if we were degraded and chained enough
            # fast steps -> back to full mode.
            if (
                self._degraded_mode
                and self._successive_fast_steps
                >= self._fast_steps_recovery_threshold
            ):
                logger.info(
                    "Mode complet restauré après %d steps rapides "
                    "(dernier analyze_ms=%.0fms < seuil=%.0fms)",
                    self._successive_fast_steps,
                    analyze_ms,
                    fast_threshold_ms,
                )
                self._degraded_mode = False
                self._successive_fast_steps = 0
        elif not cache_hit:
            # Step neither slow nor fast (between timeout/2 and timeout): reset
            self._successive_fast_steps = 0
    # Propagate the current degraded flag into the timings (useful for the
    # StepResult: until enough fast steps have accumulated we keep
    # reporting "degraded=True").
    timings["degraded"] = self._degraded_mode
    return state, timings
def _build_stub_screen_state(self, screenshot_path: Optional[str] = None):
    """
    Build a minimal ScreenState (legacy fallback).

    Used when the analyzer is unavailable or when every detection flag
    has been switched off (emergency mode).
    """
    from core.models.screen_state import (
        ScreenState, WindowContext, RawLevel, PerceptionLevel,
        ContextLevel, EmbeddingRef
    )
    ctx = self.context
    # Fall back to the last captured screenshot when no path was given.
    shot = screenshot_path or (ctx.last_screenshot_path if ctx else "") or ""
    return ScreenState(
        screen_state_id=f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}",
        timestamp=datetime.now(),
        session_id=ctx.execution_id if ctx else "stub",
        window=WindowContext(
            app_name="unknown",
            window_title="Unknown",
            screen_resolution=[1920, 1080],
            workspace="main",
        ),
        raw=RawLevel(
            screenshot_path=shot,
            capture_method="execution",
            file_size_bytes=0,
        ),
        perception=PerceptionLevel(
            embedding=EmbeddingRef(provider="", vector_id="", dimensions=512),
            detected_text=[],
            text_detection_method="none",
            confidence_avg=0.0,
        ),
        context=ContextLevel(),
        ui_elements=[],
    )
def _request_confirmation(self, action_info: Dict[str, Any]) -> bool:
"""Demander confirmation à l'utilisateur."""
if self.confirmation_callback:
return self.confirmation_callback(
f"Execute action: {action_info['action']}?",
action_info
)
# Sans callback, attendre la confirmation via confirm_action()
self.state = ExecutionState.WAITING_CONFIRMATION
self._notify_state_change(ExecutionState.WAITING_CONFIRMATION)
self._confirmation_result = None
# Attendre la confirmation (timeout 60s)
timeout = 60
start = time.time()
while self._confirmation_result is None and (time.time() - start) < timeout:
if self._stop_requested:
return False
time.sleep(0.1)
return self._confirmation_result or False
# =========================================================================
# Méthodes COACHING
# =========================================================================
def _request_coaching_decision(
    self,
    action_info: Dict[str, Any],
    screenshot_path: Optional[str] = None
) -> CoachingResponse:
    """
    Ask the user for a decision on a suggested action (COACHING mode).

    Args:
        action_info: Description of the suggested action
        screenshot_path: Path to the current screen capture

    Returns:
        CoachingResponse holding the user's decision
    """
    # Preferred path: a coaching callback was provided.
    if self.coaching_callback:
        try:
            ctx = self.context
            return self.coaching_callback(
                f"Suggested action: {action_info.get('action', 'unknown')}",
                {
                    **action_info,
                    'screenshot_path': screenshot_path,
                    'context': {
                        'workflow_id': ctx.workflow_id if ctx else None,
                        'execution_id': ctx.execution_id if ctx else None,
                        'step': ctx.steps_executed if ctx else 0
                    }
                }
            )
        except Exception as e:
            logger.error(f"Coaching callback error: {e}")
            # A failing callback defaults to SKIP
            return CoachingResponse(decision=CoachingDecision.SKIP)
    # No callback: block until submit_coaching_decision() is called.
    self.state = ExecutionState.WAITING_COACHING
    self._notify_state_change(ExecutionState.WAITING_COACHING)
    self._coaching_response = None
    # COACHING gets a longer window (120s) because the decision is richer.
    deadline = time.time() + 120
    while self._coaching_response is None and time.time() < deadline:
        if self._stop_requested:
            return CoachingResponse(decision=CoachingDecision.REJECT)
        time.sleep(0.1)
    if self._coaching_response is None:
        # Timeout defaults to SKIP
        logger.warning("Coaching decision timeout - skipping action")
        return CoachingResponse(decision=CoachingDecision.SKIP)
    return self._coaching_response
def submit_coaching_decision(self, response: CoachingResponse) -> bool:
    """
    Feed a COACHING decision in from the outside (API, UI).

    Args:
        response: The user's COACHING response

    Returns:
        True when the decision was accepted (loop was waiting for one)
    """
    if self.state == ExecutionState.WAITING_COACHING:
        self._coaching_response = response
        logger.info(f"Coaching decision received: {response.decision.value}")
        return True
    logger.warning(f"Cannot submit coaching decision in state {self.state}")
    return False
def _apply_coaching_correction(
self,
original_action: Dict[str, Any],
correction: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Appliquer une correction utilisateur à une action.
Args:
original_action: Action originale suggérée
correction: Correction fournie par l'utilisateur
Returns:
Action corrigée
"""
if not correction:
return original_action
corrected = {**original_action}
# Appliquer les corrections de cible
if 'target' in correction:
corrected['target'] = correction['target']
if 'corrected_target' in correction:
corrected['target'] = correction['corrected_target']
# Appliquer les corrections de paramètres
if 'params' in correction:
corrected['params'] = {**corrected.get('params', {}), **correction['params']}
if 'corrected_params' in correction:
corrected['params'] = {**corrected.get('params', {}), **correction['corrected_params']}
# Appliquer changement d'action
if 'action' in correction:
corrected['action'] = correction['action']
# Appliquer timing
if 'wait_before' in correction:
corrected['wait_before'] = correction['wait_before']
if 'wait_after' in correction:
corrected['wait_after'] = correction['wait_after']
logger.info(f"Applied coaching correction: {list(correction.keys())}")
return corrected
def _record_coaching_feedback(
    self,
    action_info: Dict[str, Any],
    response: CoachingResponse,
    result: Optional[ExecutionResult],
    success: bool,
    manual: bool = False
) -> None:
    """
    Record COACHING feedback for learning.

    Feeds two best-effort sinks (failures are logged, never raised):
    - TrainingDataCollector: raw action, plus the correction when present
    - FeedbackProcessor: statistics, only when the action failed

    Args:
        action_info: The suggested action
        response: The user's response
        result: Execution result (when the action was executed)
        success: Whether the action succeeded
        manual: Whether the user executed the action manually
    """
    # No execution context -> nothing to attach the feedback to.
    if not self.context:
        return
    # Assemble the correction payload
    correction_data = {
        'action_type': action_info.get('action', 'unknown'),
        'element_type': action_info.get('target', {}).get('type', 'unknown'),
        'decision': response.decision.value,
        'success': success,
        'manual_execution': manual,
        'feedback': response.feedback,
        'workflow_id': self.context.workflow_id,
        'node_id': self.context.current_node_id,
        'step': self.context.steps_executed
    }
    # When the user supplied a correction, attach before/after details
    if response.correction:
        correction_data['correction_type'] = 'user_correction'
        correction_data['original_target'] = action_info.get('target')
        correction_data['original_params'] = action_info.get('params')
        correction_data['corrected_target'] = response.correction.get('target') or response.correction.get('corrected_target')
        correction_data['corrected_params'] = response.correction.get('params') or response.correction.get('corrected_params')
    # Record through the TrainingDataCollector
    if self._training_collector:
        try:
            # Make sure a collection session is active
            if not self._training_collector.current_session:
                self._training_collector.start_session(
                    session_id=self.context.execution_id,
                    workflow_id=self.context.workflow_id
                )
            # Record the action itself
            self._training_collector.record_action({
                'action': action_info.get('action'),
                'target': action_info.get('target'),
                'decision': response.decision.value,
                'success': success
            })
            # Record the correction when present (auto-propagates to the
            # Correction Packs); an explicit REJECT also counts as one.
            if response.correction or response.decision == CoachingDecision.REJECT:
                self._training_collector.record_correction(correction_data)
        except Exception as e:
            logger.warning(f"Error recording coaching feedback: {e}")
    # Record through the FeedbackProcessor for statistics (failures only)
    if self._feedback_processor and not success:
        try:
            feedback_type = FeedbackType.INCORRECT if response.decision == CoachingDecision.REJECT else FeedbackType.PARTIAL
            self._feedback_processor.process_feedback(
                workflow_id=self.context.workflow_id,
                execution_id=self.context.execution_id,
                feedback_type=feedback_type,
                confidence=self.context.last_match_confidence,
                corrections=response.correction,
                context={
                    'action': action_info.get('action'),
                    'element_type': action_info.get('target', {}).get('type'),
                    'failure_reason': response.feedback or 'user_rejected'
                }
            )
        except Exception as e:
            logger.warning(f"Error processing coaching feedback: {e}")
    logger.info(
        f"Coaching feedback recorded: decision={response.decision.value}, "
        f"success={success}, has_correction={response.correction is not None}"
    )
def get_coaching_stats(self) -> Dict[str, Any]:
    """Return COACHING statistics for the current session, plus derived rates."""
    stats = dict(self._coaching_stats)
    made = stats['suggestions_made']
    if made > 0:
        stats['acceptance_rate'] = stats['accepted'] / made
        stats['correction_rate'] = stats['corrected'] / made
    else:
        # Avoid a division by zero before any suggestion was made
        stats['acceptance_rate'] = 0.0
        stats['correction_rate'] = 0.0
    return stats
def _determine_execution_mode(self, workflow: Workflow) -> ExecutionMode:
    """Map the workflow's learning_state onto an execution mode."""
    mode_by_state = {
        "OBSERVATION": ExecutionMode.OBSERVATION,
        "COACHING": ExecutionMode.COACHING,
        "AUTO_CANDIDATE": ExecutionMode.SUPERVISED,
        "AUTO_CONFIRMED": ExecutionMode.AUTOMATIC,
    }
    # Unknown states fall back to the safest mode (observe only).
    return mode_by_state.get(workflow.learning_state, ExecutionMode.OBSERVATION)
def _is_workflow_complete(self) -> bool:
"""Vérifier si le workflow est terminé."""
if not self.context or not self.context.current_node_id:
return False
workflow = self.pipeline.load_workflow(self.context.workflow_id)
if not workflow:
return True
# Vérifier si on est sur un node terminal
return self.context.current_node_id in workflow.end_nodes
def _handle_no_match(self) -> None:
    """Pause execution when no workflow node matches the current screen."""
    logger.warning("No match found - pausing execution")
    self.state = ExecutionState.PAUSED
    self._notify_state_change(ExecutionState.PAUSED)
    # Surface the condition through the error callback when registered
    if self._on_error:
        self._on_error("no_match", Exception("No matching node found"))
def _notify_state_change(self, new_state: ExecutionState) -> None:
    """Invoke the state-change callback; its errors are logged, not raised."""
    callback = self._on_state_change
    if not callback:
        return
    try:
        callback(new_state)
    except Exception as e:
        logger.error(f"State change callback error: {e}")
# =========================================================================
# Callbacks et événements
# =========================================================================
def on_step_complete(self, callback: Callable[[StepResult], None]) -> None:
    """Register a callback invoked after every completed step."""
    self._on_step_complete = callback
def on_state_change(self, callback: Callable[[ExecutionState], None]) -> None:
    """Register a callback invoked on every execution-state transition."""
    self._on_state_change = callback
def on_error(self, callback: Callable[[str, Exception], None]) -> None:
    """Register a callback invoked when an execution error occurs."""
    self._on_error = callback
# =========================================================================
# Getters
# =========================================================================
def get_state(self) -> ExecutionState:
    """Return the loop's current lifecycle state."""
    return self.state
def get_context(self) -> Optional[ExecutionContext]:
    """Return the active execution context (None when idle)."""
    return self.context
def get_progress(self) -> Dict[str, Any]:
    """Return a progress snapshot of the running execution."""
    ctx = self.context
    if not ctx:
        return {"status": "idle"}
    elapsed = (datetime.now() - ctx.started_at).total_seconds()
    return {
        "status": self.state.value,
        "workflow_id": ctx.workflow_id,
        "execution_id": ctx.execution_id,
        "mode": ctx.mode.value,
        "current_node": ctx.current_node_id,
        "steps_executed": ctx.steps_executed,
        "steps_succeeded": ctx.steps_succeeded,
        "steps_failed": ctx.steps_failed,
        "last_confidence": ctx.last_match_confidence,
        "duration_seconds": elapsed
    }
def cleanup(self) -> None:
    """Remove the loop's temporary directory (best effort, never raises)."""
    import shutil
    try:
        if self._temp_dir.exists():
            shutil.rmtree(self._temp_dir)
    except Exception as e:
        logger.warning(f"Cleanup failed: {e}")
# =========================================================================
# Capture continue
# =========================================================================
def _on_frame_captured(self, frame: CaptureFrame) -> None:
    """Continuous-capture callback: record the frame in the bounded queue."""
    with self._frame_queue_lock:
        self._last_frame = frame
        self._frame_queue.append(frame)
        # Cap the backlog at the 10 most recent frames
        while len(self._frame_queue) > 10:
            del self._frame_queue[0]
    logger.debug(f"Frame captured: {frame.frame_id} (changed={frame.changed_from_previous})")
def _get_latest_frame(self) -> Optional[CaptureFrame]:
    """Return the most recently captured frame (None before first capture)."""
    with self._frame_queue_lock:
        frame = self._last_frame
    return frame
def get_capture_stats(self) -> Dict[str, Any]:
    """Expose the screen capturer's statistics as a plain dict."""
    s = self.screen_capturer.get_stats()
    return {
        "total_captures": s.total_captures,
        "captures_per_second": s.captures_per_second,
        "unchanged_skipped": s.unchanged_frames_skipped,
        "avg_capture_time_ms": s.average_capture_time_ms,
        "buffer_size": s.buffer_size,
        "memory_mb": s.memory_usage_mb
    }
def set_continuous_capture(self, enabled: bool) -> None:
    """Toggle continuous screen capture on or off."""
    self._use_continuous_capture = enabled
    state_txt = 'enabled' if enabled else 'disabled'
    logger.info(f"Continuous capture {state_txt}")
# =========================================================================
# Target Resolution avancée
# =========================================================================
def resolve_target_advanced(
    self,
    target_spec: Any,
    screen_state: ScreenState
) -> Optional[ResolvedTarget]:
    """
    Resolve an action target using the advanced resolver.

    Args:
        target_spec: Target specification
        screen_state: Current screen state

    Returns:
        ResolvedTarget, or None when resolution fails
    """
    return self.target_resolver.resolve_target(target_spec, screen_state)
def get_resolver_stats(self) -> Dict[str, Any]:
    """Return the target resolver's statistics."""
    return self.target_resolver.get_stats()
# =========================================================================
# GPU Resource Management
# =========================================================================
def _update_gpu_mode(self, mode: ExecutionMode) -> None:
    """
    Update the GPU Resource Manager to match the execution mode.

    Mapping: AUTOMATIC -> AUTOPILOT, OBSERVATION/COACHING -> RECORDING,
    anything else -> IDLE. Failures are logged, never raised.

    Args:
        mode: Current execution mode
    """
    if not self._gpu_manager or not GPU_MANAGER_AVAILABLE:
        return
    try:
        import asyncio
        # Map ExecutionLoop mode to GPU mode
        if mode == ExecutionMode.AUTOMATIC:
            gpu_mode = GPUExecutionMode.AUTOPILOT
        elif mode in (ExecutionMode.OBSERVATION, ExecutionMode.COACHING):
            gpu_mode = GPUExecutionMode.RECORDING
        else:
            gpu_mode = GPUExecutionMode.IDLE
        # asyncio.run() creates a fresh event loop and ALWAYS closes it,
        # even when the coroutine raises. The previous
        # new_event_loop()/set_event_loop()/run_until_complete()/close()
        # sequence leaked the loop (and left it installed as the thread's
        # current loop) whenever set_execution_mode() raised.
        asyncio.run(self._gpu_manager.set_execution_mode(gpu_mode))
        logger.info(f"GPU mode updated to {gpu_mode.value}")
    except Exception as e:
        logger.warning(f"Failed to update GPU mode: {e}")
# =========================================================================
# Continuous Learning Integration
# =========================================================================
def _update_prototype_on_success(
self,
node_id: str,
embedding: Any
) -> None:
"""
Mettre à jour le prototype après une exécution réussie.
Args:
node_id: ID du node
embedding: Embedding de l'état actuel
"""
if not self._continuous_learner or not CONTINUOUS_LEARNER_AVAILABLE:
return
try:
import numpy as np
if hasattr(embedding, 'get_vector'):
vector = embedding.get_vector()
elif isinstance(embedding, np.ndarray):
vector = embedding
else:
return
self._continuous_learner.update_prototype(
node_id=node_id,
new_embedding=vector,
execution_success=True
)
logger.debug(f"Prototype updated for node {node_id}")
except Exception as e:
logger.warning(f"Failed to update prototype: {e}")
def _check_drift(
    self,
    node_id: str,
    confidence: float
) -> Optional[DriftStatus]:
    """
    Check for UI drift after a match.

    Args:
        node_id: Id of the matched node
        confidence: Confidence of the match

    Returns:
        The learner's DriftStatus, or None when the learner is
        unavailable or the check fails
    """
    if not self._continuous_learner or not CONTINUOUS_LEARNER_AVAILABLE:
        return None
    try:
        # Maintain a sliding window of the 10 most recent confidences
        history = self._confidence_history.setdefault(node_id, [])
        history.append(confidence)
        del history[:-10]
        drift_status = self._continuous_learner.detect_drift(
            node_id=node_id,
            recent_confidences=history
        )
        if drift_status.is_drifting:
            logger.warning(
                f"Drift detected for node {node_id}: "
                f"severity={drift_status.drift_severity:.2f}, "
                f"action={drift_status.recommended_action}"
            )
            # Surface the drift through the error callback when available
            if self._on_error:
                self._on_error(
                    "drift_detected",
                    Exception(f"UI drift detected for node {node_id}")
                )
        return drift_status
    except Exception as e:
        logger.warning(f"Failed to check drift: {e}")
    return None
def get_learning_stats(self) -> Dict[str, Any]:
    """Summarize per-node confidence history for continuous learning."""
    import numpy as np
    per_node: Dict[str, Any] = {}
    for node_id, confidences in self._confidence_history.items():
        if not confidences:
            continue
        per_node[node_id] = {
            "count": len(confidences),
            "mean": float(np.mean(confidences)),
            "min": float(min(confidences)),
            "max": float(max(confidences))
        }
    return {
        "continuous_learner_available": CONTINUOUS_LEARNER_AVAILABLE,
        "confidence_history": per_node
    }
# =============================================================================
# Factory function
# =============================================================================
def create_execution_loop(
    pipeline: "WorkflowPipeline",
    capture_interval_ms: int = 500
) -> ExecutionLoop:
    """
    Build an ExecutionLoop with the default configuration.

    Args:
        pipeline: The WorkflowPipeline to drive
        capture_interval_ms: Screen-capture interval in milliseconds

    Returns:
        A configured ExecutionLoop
    """
    return ExecutionLoop(pipeline=pipeline, capture_interval_ms=capture_interval_ms)