From af4ffa189a8629358f39ddbc82b9b6e6fca6d6bb Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 15 Apr 2026 09:06:19 +0200 Subject: [PATCH] feat(analytics): normalise API + contrat explicite get_next_action (Lot A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Contrat get_next_action() — suppression du None ambigu : {"status": "selected", "edge": ..., ...} {"status": "terminal"} {"status": "blocked", "reason": "no_valid_edge" | ...} ExecutionLoop dispatche proprement : blocked -> PAUSED + _pause_requested, terminal -> succès légitime. Rétrocompat défensive (None legacy -> blocked). Analytics API normalisée (kwargs-only) : on_execution_complete(duration_ms, status, steps_total|completed|failed) on_step_complete(duration_ms, ...) on_recovery_attempt(duration_ms, ...) Découverte critique : les anciens appels utilisaient des méthodes et champs inexistants (ExecutionMetrics.duration, metrics_collector.record_execution). Le code n'avait jamais tourné au runtime — zéro analytics remontée. L'exception était avalée par le try/except englobant. 58 tests (18 analytics + 11 contrat + 20 ExecutionLoop + 12 edge_scorer non-régression). Migration complète, pas de pont legacy. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../analytics/collection/metrics_collector.py | 29 +- .../integration/execution_integration.py | 405 ++++++++++---- core/analytics/storage/timeseries_store.py | 54 +- core/pipeline/workflow_pipeline.py | 434 ++++++++++++--- core/pipeline/workflow_pipeline_enhanced.py | 38 +- .../test_workflow_pipeline_enhanced.py | 37 +- tests/unit/test_analytics_vision_metrics.py | 520 ++++++++++++++++++ .../test_workflow_pipeline_get_next_action.py | 264 +++++++++ .../backend/services/execution_integration.py | 25 +- 9 files changed, 1573 insertions(+), 233 deletions(-) create mode 100644 tests/unit/test_analytics_vision_metrics.py create mode 100644 tests/unit/test_workflow_pipeline_get_next_action.py diff --git a/core/analytics/collection/metrics_collector.py b/core/analytics/collection/metrics_collector.py index b2dd5f845..9b73a59cb 100644 --- a/core/analytics/collection/metrics_collector.py +++ b/core/analytics/collection/metrics_collector.py @@ -76,7 +76,16 @@ class StepMetrics: confidence_score: float retry_count: int = 0 error_details: Optional[str] = None - + # C1 — Instrumentation vision-aware (ExecutionLoop) + # Ces champs proviennent de `StepResult` (core/execution/execution_loop.py). + # Tous optionnels avec valeurs par défaut pour rétrocompatibilité. 
+ ocr_ms: float = 0.0 # Temps OCR sur ce step + ui_ms: float = 0.0 # Temps détection UI sur ce step + analyze_ms: float = 0.0 # Temps analyse ScreenState (OCR + UI + reste) + total_ms: float = 0.0 # Temps total du step (alias duration_ms) + cache_hit: bool = False # True si ScreenState vient du cache perceptuel + degraded: bool = False # True si mode dégradé (timeout analyse) + def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for storage.""" return { @@ -92,9 +101,15 @@ class StepMetrics: 'status': self.status, 'confidence_score': self.confidence_score, 'retry_count': self.retry_count, - 'error_details': self.error_details + 'error_details': self.error_details, + 'ocr_ms': self.ocr_ms, + 'ui_ms': self.ui_ms, + 'analyze_ms': self.analyze_ms, + 'total_ms': self.total_ms, + 'cache_hit': self.cache_hit, + 'degraded': self.degraded, } - + @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'StepMetrics': """Create from dictionary.""" @@ -111,7 +126,13 @@ class StepMetrics: status=data['status'], confidence_score=data['confidence_score'], retry_count=data.get('retry_count', 0), - error_details=data.get('error_details') + error_details=data.get('error_details'), + ocr_ms=float(data.get('ocr_ms') or 0.0), + ui_ms=float(data.get('ui_ms') or 0.0), + analyze_ms=float(data.get('analyze_ms') or 0.0), + total_ms=float(data.get('total_ms') or 0.0), + cache_hit=bool(data.get('cache_hit') or False), + degraded=bool(data.get('degraded') or False), ) diff --git a/core/analytics/integration/execution_integration.py b/core/analytics/integration/execution_integration.py index 3e564b80d..04232ceb9 100644 --- a/core/analytics/integration/execution_integration.py +++ b/core/analytics/integration/execution_integration.py @@ -1,8 +1,8 @@ """Integration of analytics with ExecutionLoop.""" import logging -from typing import Optional -from datetime import datetime +from typing import Any, Optional +from datetime import datetime, timedelta import uuid from ..analytics_system 
import get_analytics_system @@ -14,17 +14,35 @@ logger = logging.getLogger(__name__) class AnalyticsExecutionIntegration: """Integrate analytics collection with workflow execution.""" - def __init__(self, enabled: bool = True): + def __init__(self, analytics_system: Any = True, enabled: Optional[bool] = None): """ Initialize analytics integration. - + + Accepte deux formes d'appel pour la rétrocompatibilité : + - ``AnalyticsExecutionIntegration(enabled=True)`` → auto-load du système + - ``AnalyticsExecutionIntegration(analytics_system_instance)`` → + utilise l'instance fournie (utilisé par ExecutionLoop) + Args: - enabled: Whether analytics collection is enabled + analytics_system: Instance d'AnalyticsSystem pré-construite, ou + True/False pour activer/désactiver (legacy). + enabled: Legacy — si défini, prime sur analytics_system. """ - self.enabled = enabled - self.analytics = None - - if enabled: + # Détection de la forme d'appel + if enabled is not None: + # Appel legacy explicite: AnalyticsExecutionIntegration(enabled=...) + self.enabled = bool(enabled) + self.analytics = None + elif isinstance(analytics_system, bool): + # Appel legacy: AnalyticsExecutionIntegration(True/False) + self.enabled = analytics_system + self.analytics = None + else: + # Nouvelle forme: instance injectée + self.enabled = analytics_system is not None + self.analytics = analytics_system + + if self.enabled and self.analytics is None: try: self.analytics = get_analytics_system() logger.info("Analytics integration enabled") @@ -36,37 +54,50 @@ class AnalyticsExecutionIntegration: self, workflow_id: str, execution_id: Optional[str] = None, - total_steps: int = 0 + total_steps: int = 0, + mode: Optional[str] = None, ) -> str: """ - Called when workflow execution starts. - + Appelé au démarrage d'une exécution de workflow. 
+ Args: - workflow_id: Workflow identifier - execution_id: Execution identifier (generated if None) - total_steps: Total number of steps - + workflow_id: Identifiant du workflow + execution_id: Identifiant d'exécution (généré si None) + total_steps: Nombre total d'étapes prévues + mode: Mode d'exécution (OBSERVATION / COACHING / SUPERVISED / + AUTOMATIC). Propagé en contexte pour MetricsCollector. + Returns: - Execution ID + Identifiant d'exécution (celui fourni ou nouvellement généré). """ if not self.enabled or not self.analytics: return execution_id or str(uuid.uuid4()) - + if execution_id is None: execution_id = str(uuid.uuid4()) - + try: - # Start real-time tracking + # Démarrage du tracking temps réel self.analytics.realtime_analytics.track_execution( execution_id=execution_id, workflow_id=workflow_id, - total_steps=total_steps + total_steps=total_steps, ) - + + # Ouverture de l'ExecutionMetrics côté collector (état "running"). + # Cela permet à `on_execution_complete` d'appeler + # `record_execution_complete` qui clôture proprement. 
+ context = {"mode": mode} if mode else {} + self.analytics.metrics_collector.record_execution_start( + execution_id=execution_id, + workflow_id=workflow_id, + context=context, + ) + logger.debug(f"Started tracking execution: {execution_id}") except Exception as e: logger.error(f"Error starting execution tracking: {e}") - + return execution_id def on_step_start( @@ -101,110 +132,249 @@ class AnalyticsExecutionIntegration: execution_id: str, workflow_id: str, node_id: str, - action_type: str, - started_at: datetime, - completed_at: datetime, - duration: float, + *, + duration_ms: float, success: bool, - error_message: Optional[str] = None + action_type: str = "", + started_at: Optional[datetime] = None, + completed_at: Optional[datetime] = None, + error_message: Optional[str] = None, + confidence: float = 0.0, + target_element: str = "", + retry_count: int = 0, + ocr_ms: float = 0.0, + ui_ms: float = 0.0, + analyze_ms: float = 0.0, + total_ms: float = 0.0, + cache_hit: bool = False, + degraded: bool = False, + step_id: Optional[str] = None, ) -> None: """ - Called when a step completes. - + Appelé à la fin d'un step. + + Contrat normalisé (Lot A — avril 2026) : ``duration_ms`` est + obligatoire et en millisecondes. Plus de rétrocompat silencieuse + sur ``duration`` en secondes. 
+ Args: - execution_id: Execution identifier - workflow_id: Workflow identifier - node_id: Node identifier - action_type: Type of action - started_at: Start timestamp - completed_at: Completion timestamp - duration: Duration in seconds - success: Whether step succeeded - error_message: Error message if failed + execution_id: Identifiant d'exécution + workflow_id: Identifiant du workflow + node_id: Identifiant du node + duration_ms: Durée du step en millisecondes (obligatoire) + success: Vrai si le step a réussi + action_type: Type d'action (``click``, ``type``, …) + started_at: Timestamp de début (déduit de duration_ms si None) + completed_at: Timestamp de fin (``now()`` si None) + error_message: Message d'erreur si ``success=False`` + confidence: Score de matching [0, 1] + target_element: Élément ciblé (optionnel) + retry_count: Nombre de retries + ocr_ms: Temps OCR (C1) + ui_ms: Temps détection UI (C1) + analyze_ms: Temps analyse ScreenState (C1) + total_ms: Temps total du step (C1, alias duration_ms) + cache_hit: ScreenState depuis cache perceptuel (C1) + degraded: Mode dégradé activé (C1) + step_id: ID unique du step (généré si None) """ if not self.enabled or not self.analytics: return - + try: - # Record step metrics + duration_ms_final = float(duration_ms) + + # Normaliser les timestamps + if completed_at is None: + completed_at = datetime.now() + if started_at is None: + started_at = completed_at - timedelta(milliseconds=duration_ms_final) + step_metrics = StepMetrics( + step_id=step_id or f"{execution_id}:{node_id}:{completed_at.isoformat()}", execution_id=execution_id, workflow_id=workflow_id, node_id=node_id, - action_type=action_type, + action_type=action_type or "unknown", + target_element=target_element, started_at=started_at, completed_at=completed_at, - duration=duration, - success=success, - error_message=error_message + duration_ms=duration_ms_final, + status="completed" if success else "failed", + confidence_score=float(confidence), + 
retry_count=retry_count, + error_details=error_message, + # C1 — vision-aware + ocr_ms=float(ocr_ms or 0.0), + ui_ms=float(ui_ms or 0.0), + analyze_ms=float(analyze_ms or 0.0), + total_ms=float(total_ms or duration_ms_final), + cache_hit=bool(cache_hit), + degraded=bool(degraded), ) - + self.analytics.metrics_collector.record_step(step_metrics) - - # Update real-time tracking - self.analytics.realtime_analytics.record_step_complete( - execution_id=execution_id, - success=success + + # Tracking temps réel + try: + self.analytics.realtime_analytics.record_step_complete( + execution_id=execution_id, + success=success, + ) + except Exception as rt_err: + logger.debug(f"Realtime tracking skipped: {rt_err}") + + logger.debug( + f"Recorded step: {node_id} " + f"({'success' if success else 'failed'}, " + f"analyze_ms={analyze_ms:.0f}, cache_hit={cache_hit}, " + f"degraded={degraded})" ) - - logger.debug(f"Recorded step: {node_id} ({'success' if success else 'failed'})") except Exception as e: logger.error(f"Error recording step completion: {e}") + + def on_step_result( + self, + execution_id: str, + workflow_id: str, + step_result: Any, + ) -> None: + """ + Raccourci C1 — enregistre un `StepResult` complet. + + Évite aux appelants d'extraire manuellement les champs vision-aware. + Utilisé par ExecutionLoop pour pousser StepResult au système analytics. 
+ + Args: + execution_id: Identifiant d'exécution + workflow_id: Identifiant de workflow + step_result: Instance de `core.execution.execution_loop.StepResult` + """ + if not self.enabled or not self.analytics: + return + + action_type = "unknown" + try: + if getattr(step_result, "action_result", None) is not None: + ar = step_result.action_result + # ExecutionResult.action est optionnel selon la branche + action_type = ( + getattr(ar, "action_type", None) + or getattr(ar, "action", None) + or "unknown" + ) + except Exception: + action_type = "unknown" + + self.on_step_complete( + execution_id=execution_id, + workflow_id=workflow_id, + node_id=getattr(step_result, "node_id", "unknown"), + action_type=str(action_type), + success=bool(getattr(step_result, "success", False)), + error_message=None + if getattr(step_result, "success", False) + else getattr(step_result, "message", None), + duration_ms=float(getattr(step_result, "duration_ms", 0.0) or 0.0), + confidence=float(getattr(step_result, "match_confidence", 0.0) or 0.0), + ocr_ms=float(getattr(step_result, "ocr_ms", 0.0) or 0.0), + ui_ms=float(getattr(step_result, "ui_ms", 0.0) or 0.0), + analyze_ms=float(getattr(step_result, "analyze_ms", 0.0) or 0.0), + total_ms=float(getattr(step_result, "total_ms", 0.0) or 0.0), + cache_hit=bool(getattr(step_result, "cache_hit", False)), + degraded=bool(getattr(step_result, "degraded", False)), + ) def on_execution_complete( self, execution_id: str, workflow_id: str, - started_at: datetime, - completed_at: datetime, - duration: float, + *, + duration_ms: float, status: str, - error_message: Optional[str] = None, + steps_total: Optional[int] = None, steps_completed: int = 0, - steps_failed: int = 0 + steps_failed: int = 0, + error_message: Optional[str] = None, ) -> None: """ - Called when workflow execution completes. - + Appelé à la fin d'une exécution de workflow. + + Contrat normalisé (Lot A — avril 2026) : + - ``duration_ms`` en millisecondes, toujours. 
Plus de rétrocompat + silencieuse sur ``duration`` en secondes. + - ``status`` est une chaîne libre (``"completed"``, ``"failed"``, + ``"stopped"``, ``"timeout"``, …). L'appelant décide. + - ``steps_total`` / ``steps_completed`` / ``steps_failed`` : noms + alignés sur le dataclass ``ExecutionMetrics``. Si ``steps_total`` + n'est pas fourni, on le déduit par somme. + Args: - execution_id: Execution identifier - workflow_id: Workflow identifier - started_at: Start timestamp - completed_at: Completion timestamp - duration: Duration in seconds - status: Final status (success, failed, timeout) - error_message: Error message if failed - steps_completed: Number of steps completed - steps_failed: Number of steps failed + execution_id: Identifiant d'exécution + workflow_id: Identifiant du workflow + duration_ms: Durée totale en millisecondes + status: Statut final (``"completed"`` / ``"failed"`` / ``"stopped"``) + steps_total: Nombre total de steps exécutés (tous statuts confondus) + steps_completed: Nombre de steps réussis + steps_failed: Nombre de steps en échec + error_message: Message d'erreur si ``status != "completed"`` """ if not self.enabled or not self.analytics: return - + + # steps_total dérivé si non fourni explicitement + if steps_total is None: + steps_total = int(steps_completed) + int(steps_failed) + try: - # Record execution metrics - execution_metrics = ExecutionMetrics( - execution_id=execution_id, - workflow_id=workflow_id, - started_at=started_at, - completed_at=completed_at, - duration=duration, - status=status, - error_message=error_message, - steps_completed=steps_completed, - steps_failed=steps_failed - ) - - self.analytics.metrics_collector.record_execution(execution_metrics) - - # Flush to ensure persistence - self.analytics.metrics_collector.flush() - - # Complete real-time tracking + collector = self.analytics.metrics_collector + + # record_execution_complete clôture proprement un ExecutionMetrics + # ouvert par record_execution_start (chemin 
nominal via + # on_execution_start). Si l'état n'est pas présent (tests, legacy), + # on pousse un ExecutionMetrics synthétique directement. + completed_at = datetime.now() + started_at = completed_at - timedelta(milliseconds=float(duration_ms)) + + active = getattr(collector, "_active_executions", None) + if active is not None and execution_id in active: + collector.record_execution_complete( + execution_id=execution_id, + status=status, + steps_total=int(steps_total), + steps_completed=int(steps_completed), + steps_failed=int(steps_failed), + error_message=error_message, + ) + else: + # Fallback explicite : on construit directement un ExecutionMetrics + # aligné sur le dataclass (duration_ms, status, steps_*). + execution_metrics = ExecutionMetrics( + execution_id=execution_id, + workflow_id=workflow_id, + started_at=started_at, + completed_at=completed_at, + duration_ms=float(duration_ms), + status=status, + steps_total=int(steps_total), + steps_completed=int(steps_completed), + steps_failed=int(steps_failed), + error_message=error_message, + ) + # Le collector n'expose pas record_execution(...) : on pousse + # dans le buffer protégé par lock pour rester cohérent. + with collector._lock: + collector._buffer.append(execution_metrics) + + # Flush pour garantir la persistance immédiate + collector.flush() + + # Clôture du tracking temps réel self.analytics.realtime_analytics.complete_execution( execution_id=execution_id, - status=status + status=status, ) - + logger.info(f"Recorded execution: {execution_id} ({status})") except Exception as e: logger.error(f"Error recording execution completion: {e}") @@ -216,39 +386,54 @@ class AnalyticsExecutionIntegration: node_id: str, strategy: str, success: bool, - duration: float + duration_ms: float, ) -> None: """ - Called when self-healing attempts recovery. - + Appelé quand le self-healing tente une récupération. 
+ + Contrat normalisé (Lot A — avril 2026) : ``duration_ms`` en + millisecondes, cohérent avec ``on_execution_complete`` et + ``on_step_complete``. Le StepMetrics construit respecte strictement + le dataclass (``status``, ``duration_ms``, ``error_details``, + ``confidence_score``, ``target_element``, ``step_id``). + Args: - execution_id: Execution identifier - workflow_id: Workflow identifier - node_id: Node identifier - strategy: Recovery strategy used - success: Whether recovery succeeded - duration: Recovery duration + execution_id: Identifiant d'exécution + workflow_id: Identifiant du workflow + node_id: Node où la récupération est tentée + strategy: Stratégie de récupération employée + success: Vrai si la récupération a réussi + duration_ms: Durée de la tentative en millisecondes """ if not self.enabled or not self.analytics: return - + try: - # Record as a special step metric + now = datetime.now() + started_at = now - timedelta(milliseconds=float(duration_ms)) + recovery_metrics = StepMetrics( + step_id=f"{execution_id}:{node_id}:recovery:{now.isoformat()}", execution_id=execution_id, workflow_id=workflow_id, node_id=f"{node_id}_recovery", action_type=f"recovery_{strategy}", - started_at=datetime.now(), - completed_at=datetime.now(), - duration=duration, - success=success, - error_message=None if success else f"Recovery failed: {strategy}" + target_element="", + started_at=started_at, + completed_at=now, + duration_ms=float(duration_ms), + status="completed" if success else "failed", + confidence_score=0.0, + retry_count=0, + error_details=None if success else f"Recovery failed: {strategy}", ) - + self.analytics.metrics_collector.record_step(recovery_metrics) - - logger.debug(f"Recorded recovery: {strategy} ({'success' if success else 'failed'})") + + logger.debug( + f"Recorded recovery: {strategy} " + f"({'success' if success else 'failed'})" + ) except Exception as e: logger.error(f"Error recording recovery attempt: {e}") diff --git 
a/core/analytics/storage/timeseries_store.py b/core/analytics/storage/timeseries_store.py index c731c1065..bb8dcf59c 100644 --- a/core/analytics/storage/timeseries_store.py +++ b/core/analytics/storage/timeseries_store.py @@ -42,6 +42,8 @@ class TimeSeriesStore: ON execution_metrics(started_at); -- Step metrics table + -- Les colonnes ocr_ms, ui_ms, analyze_ms, total_ms, cache_hit, degraded + -- proviennent de l'instrumentation vision-aware (C1) de ExecutionLoop. CREATE TABLE IF NOT EXISTS step_metrics ( step_id TEXT PRIMARY KEY, execution_id TEXT NOT NULL, @@ -56,6 +58,12 @@ class TimeSeriesStore: confidence_score REAL, retry_count INTEGER DEFAULT 0, error_details TEXT, + ocr_ms REAL DEFAULT 0.0, + ui_ms REAL DEFAULT 0.0, + analyze_ms REAL DEFAULT 0.0, + total_ms REAL DEFAULT 0.0, + cache_hit INTEGER DEFAULT 0, + degraded INTEGER DEFAULT 0, FOREIGN KEY (execution_id) REFERENCES execution_metrics(execution_id) ); @@ -101,11 +109,40 @@ class TimeSeriesStore: logger.info(f"TimeSeriesStore initialized at {self.db_path}") + # Colonnes ajoutées ultérieurement — appliquées via ALTER TABLE si absentes. 
+ # (C1 — instrumentation vision-aware, avril 2026) + _STEP_METRICS_MIGRATIONS = [ + ("ocr_ms", "REAL DEFAULT 0.0"), + ("ui_ms", "REAL DEFAULT 0.0"), + ("analyze_ms", "REAL DEFAULT 0.0"), + ("total_ms", "REAL DEFAULT 0.0"), + ("cache_hit", "INTEGER DEFAULT 0"), + ("degraded", "INTEGER DEFAULT 0"), + ] + def _init_database(self) -> None: - """Initialize database schema.""" + """Initialize database schema and apply lightweight migrations.""" with self._get_connection() as conn: conn.executescript(self.SCHEMA) + self._migrate_step_metrics(conn) conn.commit() + + def _migrate_step_metrics(self, conn: sqlite3.Connection) -> None: + """Ajoute les colonnes C1 sur une base `step_metrics` pré-existante.""" + cursor = conn.execute("PRAGMA table_info(step_metrics)") + existing = {row[1] for row in cursor.fetchall()} + for column, ddl in self._STEP_METRICS_MIGRATIONS: + if column not in existing: + try: + conn.execute( + f"ALTER TABLE step_metrics ADD COLUMN {column} {ddl}" + ) + logger.info( + f"Migration step_metrics: ajout colonne {column}" + ) + except sqlite3.OperationalError as e: + # Collision bénigne (colonne déjà ajoutée par un autre process) + logger.debug(f"Migration colonne {column} ignorée: {e}") @contextmanager def _get_connection(self): @@ -164,13 +201,14 @@ class TimeSeriesStore: )) def _write_step_metric(self, conn: sqlite3.Connection, metric: StepMetrics) -> None: - """Write step metric.""" + """Write step metric (inclut les champs vision-aware C1).""" conn.execute(""" INSERT OR REPLACE INTO step_metrics (step_id, execution_id, workflow_id, node_id, action_type, target_element, started_at, completed_at, duration_ms, status, confidence_score, - retry_count, error_details) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + retry_count, error_details, + ocr_ms, ui_ms, analyze_ms, total_ms, cache_hit, degraded) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( metric.step_id, metric.execution_id, @@ -184,7 +222,13 @@ class TimeSeriesStore: metric.status, metric.confidence_score, metric.retry_count, - metric.error_details + metric.error_details, + getattr(metric, 'ocr_ms', 0.0), + getattr(metric, 'ui_ms', 0.0), + getattr(metric, 'analyze_ms', 0.0), + getattr(metric, 'total_ms', 0.0), + 1 if getattr(metric, 'cache_hit', False) else 0, + 1 if getattr(metric, 'degraded', False) else 0, )) def _write_resource_metric(self, conn: sqlite3.Connection, metric: ResourceMetrics) -> None: diff --git a/core/pipeline/workflow_pipeline.py b/core/pipeline/workflow_pipeline.py index ff49061b6..0010d4164 100644 --- a/core/pipeline/workflow_pipeline.py +++ b/core/pipeline/workflow_pipeline.py @@ -354,66 +354,306 @@ class WorkflowPipeline: # ========================================================================= # Mode MATCHING : Reconnaissance de l'état actuel # ========================================================================= - + + def match_current_state_from_state( + self, + screen_state: ScreenState, + workflow_id: Optional[str] = None, + *, + min_similarity: float = 0.5, + ) -> Optional[Dict[str, Any]]: + """ + Matcher un ``ScreenState`` enrichi contre les nodes d'un workflow. + + Lot E — premier vrai matching context-aware. Cette méthode consomme + directement le ``ScreenState`` déjà construit par ``ExecutionLoop`` + (avec ``window_title``, ``detected_text`` et ``ui_elements`` + renseignés par le ``ScreenAnalyzer``) au lieu de reconstruire un + stub vide avec ``window_title="Unknown"``. + + Stratégie : + 1. Si le ``HierarchicalMatcher`` est disponible ET que le workflow + cible est chargeable, on privilégie le matching multi-niveau + (fenêtre → région → élément) qui exploite pleinement les + ``ui_elements`` et le ``window_title``. + 2. Sinon on retombe sur le matching par embedding via FAISS + (même logique que l'ancien ``match_current_state``, mais avec + le ``ScreenState`` fourni, pas un stub). 
+ + Args: + screen_state: ``ScreenState`` complet (ui_elements + detected_text + + window_info) construit en amont par l'``ExecutionLoop``. + workflow_id: ID du workflow cible (tous si None). + min_similarity: seuil minimum de confidence pour considérer un + match valide. Conserve la sémantique historique (0.5 pour + le hiérarchique, 0.85 pour le FAISS fallback). + + Returns: + Dict avec ``node_id``, ``workflow_id``, ``confidence`` (+ détails + du matching hiérarchique si applicable), ou ``None`` si aucun + match ne dépasse le seuil. + """ + logger.debug( + "Matching ScreenState (app=%s, title=%s, ui_elements=%d, " + "detected_text=%d)", + screen_state.window.app_name, + screen_state.window.window_title, + len(screen_state.ui_elements), + len(screen_state.perception.detected_text), + ) + + # --- Stratégie 1 : matching hiérarchique si workflow disponible --- + if workflow_id: + workflow = self.load_workflow(workflow_id) + if workflow is not None and getattr(workflow, "nodes", None): + try: + hier_result = self._match_hierarchical_from_state( + screen_state=screen_state, + workflow=workflow, + workflow_id=workflow_id, + min_similarity=min_similarity, + ) + if hier_result is not None: + return hier_result + except Exception as exc: + # Ne jamais casser le matching sur une erreur du + # matcher hiérarchique : on retombe sur FAISS. + logger.debug( + f"Hierarchical matching failed, fallback FAISS: {exc}" + ) + + # --- Stratégie 2 : fallback embedding + FAISS --- + return self._match_via_faiss( + screen_state=screen_state, + workflow_id=workflow_id, + min_similarity=min_similarity, + ) + + def _match_hierarchical_from_state( + self, + screen_state: ScreenState, + workflow: Workflow, + workflow_id: str, + min_similarity: float, + ) -> Optional[Dict[str, Any]]: + """ + Déléguer le matching au ``HierarchicalMatcher`` en extrayant + ``window_info``, ``detected_elements`` et le screenshot à partir du + ``ScreenState`` fourni. 
Factorise la logique de ``match_hierarchical`` + sans re-ouvrir l'image si ce n'est pas nécessaire. + """ + # Reconstruire window_info à partir du ScreenState (pas "Unknown") + window_info = { + "title": screen_state.window.window_title, + "app_name": screen_state.window.app_name, + "window_title": screen_state.window.window_title, + } + detected_elements = list(screen_state.ui_elements) + + # Ouvrir le screenshot si nécessaire (le matcher peut en avoir besoin + # pour du matching au niveau région). Si le chemin n'existe pas, on + # passe None et laisse le matcher travailler avec window + elements. + screenshot = None + path = screen_state.raw.screenshot_path + if path: + try: + from PIL import Image + screenshot = Image.open(path) + except Exception as exc: + logger.debug(f"Screenshot unavailable for hierarchical match: {exc}") + + # Contexte temporel par workflow + if workflow_id not in self._temporal_context: + self._temporal_context[workflow_id] = TemporalContext() + temporal_context = self._temporal_context[workflow_id] + + result: MatchResult = self.hierarchical_matcher.match( + screenshot=screenshot, + workflow=workflow, + window_info=window_info, + detected_elements=detected_elements, + temporal_context=temporal_context, + ) + + if result.confidence < min_similarity: + logger.debug( + f"Hierarchical match below threshold: {result.confidence:.3f} " + f"(min={min_similarity})" + ) + return None + + # Mémoriser le match pour le boost temporel suivant + temporal_context.add_match(result.node_id, result.confidence) + + return { + "node_id": result.node_id, + "workflow_id": workflow_id, + "confidence": result.confidence, + "window_confidence": result.window_confidence, + "region_confidence": result.region_confidence, + "element_confidence": result.element_confidence, + "temporal_boost": result.temporal_boost, + "matched_variant": result.matched_variant, + "alternatives": [ + {"node_id": alt.node_id, "confidence": alt.confidence} + for alt in result.alternatives + 
], + "match_time_ms": result.match_time_ms, + "match_type": "hierarchical", + } + + def _match_via_faiss( + self, + screen_state: ScreenState, + workflow_id: Optional[str], + min_similarity: float, + ) -> Optional[Dict[str, Any]]: + """ + Fallback embedding + recherche FAISS. On réutilise le ``ScreenState`` + fourni (donc ses ``ui_elements`` et son ``window_title`` réels) + au lieu d'en recréer un stub. + """ + # Le seuil FAISS historique était 0.85. On l'honore comme plancher : + # un ``min_similarity`` plus permissif ne l'abaisse pas, seul un + # ``min_similarity`` plus strict (> 0.85) relève le seuil effectif. + threshold = max(min_similarity, 0.85) + + state_embedding = self.embedding_builder.build(screen_state) + query_vector = state_embedding.get_vector() + + results = self.faiss_manager.search(query_vector, k=5) + if not results: + logger.debug("No match found in FAISS") + return None + + for result in results: + metadata = result.get("metadata", {}) + result_workflow_id = metadata.get("workflow_id") + + if workflow_id and result_workflow_id != workflow_id: + continue + + similarity = result.get("similarity", 0) + if similarity >= threshold: + return { + "node_id": metadata.get("node_id"), + "workflow_id": result_workflow_id, + "confidence": similarity, + "state_embedding_id": state_embedding.embedding_id, + "match_type": "faiss", + } + + logger.debug( + f"Best FAISS match below threshold: " + f"{results[0].get('similarity', 0):.3f} (min={threshold})" + ) + return None + def match_current_state( self, screenshot_path: str, workflow_id: Optional[str] = None, - window_title: Optional[str] = None + window_title: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """ - Identifier dans quel node se trouve l'écran actuel. - + Identifier dans quel node se trouve l'écran actuel (API legacy). 
+ + Lot E — cette méthode est désormais un **wrapper** de rétrocompat : + elle construit un ``ScreenState`` enrichi via ``ScreenAnalyzer`` + (au lieu d'un stub avec ``window_title="Unknown"``) puis délègue + à ``match_current_state_from_state``. Garantit la compat pour les + callers externes qui ne manipulent que le chemin du screenshot. + Args: - screenshot_path: Chemin vers le screenshot actuel - workflow_id: ID du workflow à matcher (tous si None) - window_title: Titre de fenêtre pour contexte - + screenshot_path: Chemin vers le screenshot actuel. + workflow_id: ID du workflow à matcher (tous si None). + window_title: Titre de fenêtre pour contexte (utilisé comme + hint si le ScreenAnalyzer n'est pas disponible). + Returns: - Dict avec node_id, workflow_id, confidence, ou None si pas de match + Dict avec ``node_id``, ``workflow_id``, ``confidence``, ou + ``None`` si pas de match. """ logger.debug(f"Matching screenshot: {screenshot_path}") - - # Créer un ScreenState temporaire + + # Construire un ScreenState enrichi via le ScreenAnalyzer partagé. + screen_state = self._build_screen_state_for_matching( + screenshot_path=screenshot_path, + workflow_id=workflow_id, + window_title=window_title, + ) + + return self.match_current_state_from_state( + screen_state=screen_state, + workflow_id=workflow_id, + ) + + def _build_screen_state_for_matching( + self, + screenshot_path: str, + workflow_id: Optional[str], + window_title: Optional[str], + ) -> ScreenState: + """ + Construire un ``ScreenState`` pour l'API legacy ``match_current_state``. + + Tente d'utiliser le ``ScreenAnalyzer`` partagé ; en cas d'échec, + retombe sur un stub minimaliste (équivalent fonctionnel de l'ancien + comportement, mais clairement isolé ici). 
+ """ from core.models.screen_state import ( WindowContext, RawLevel, PerceptionLevel, ContextLevel, EmbeddingRef ) - - screenshot_path = Path(screenshot_path) - + + path = Path(screenshot_path) + + # Tentative 1 : ScreenAnalyzer partagé (résultat enrichi) + try: + from core.pipeline import get_screen_analyzer + analyzer = get_screen_analyzer() + if analyzer is not None: + window_info = None + if window_title: + window_info = {"title": window_title, "app_name": "unknown"} + return analyzer.analyze( + str(path), + window_info=window_info, + ) + except Exception as exc: + logger.debug( + f"ScreenAnalyzer unavailable in match_current_state wrapper: {exc}" + ) + + # Tentative 2 : stub minimal (comportement legacy d'urgence) window = WindowContext( app_name="unknown", window_title=window_title or "Unknown", screen_resolution=[1920, 1080], - workspace="main" + workspace="main", ) - raw = RawLevel( - screenshot_path=str(screenshot_path), + screenshot_path=str(path), capture_method="manual", - file_size_bytes=screenshot_path.stat().st_size if screenshot_path.exists() else 0 + file_size_bytes=path.stat().st_size if path.exists() else 0, ) - perception = PerceptionLevel( embedding=EmbeddingRef( provider="openclip_ViT-B-32", vector_id="temp", - dimensions=512 + dimensions=512, ), detected_text=[], text_detection_method="pending", - confidence_avg=0.0 + confidence_avg=0.0, ) - context = ContextLevel( current_workflow_candidate=workflow_id, workflow_step=None, user_id="matcher", tags=[], - business_variables={} + business_variables={}, ) - - current_state = ScreenState( + return ScreenState( screen_state_id=f"match_{datetime.now().strftime('%Y%m%d_%H%M%S')}", timestamp=datetime.now(), session_id="matching", @@ -421,39 +661,8 @@ class WorkflowPipeline: raw=raw, perception=perception, context=context, - ui_elements=[] + ui_elements=[], ) - - # Calculer embedding - state_embedding = self.embedding_builder.build(current_state) - query_vector = state_embedding.get_vector() - - # 
Rechercher dans FAISS - results = self.faiss_manager.search(query_vector, k=5) - - if not results: - logger.debug("No match found in FAISS") - return None - - # Filtrer par workflow si spécifié - for result in results: - metadata = result.get("metadata", {}) - result_workflow_id = metadata.get("workflow_id") - - if workflow_id and result_workflow_id != workflow_id: - continue - - similarity = result.get("similarity", 0) - if similarity >= 0.85: # Seuil de matching - return { - "node_id": metadata.get("node_id"), - "workflow_id": result_workflow_id, - "confidence": similarity, - "state_embedding_id": state_embedding.embedding_id - } - - logger.debug(f"Best match below threshold: {results[0].get('similarity', 0):.3f}") - return None def match_hierarchical( self, @@ -548,17 +757,56 @@ class WorkflowPipeline: def get_next_action( self, workflow_id: str, - current_node_id: str - ) -> Optional[Dict[str, Any]]: + current_node_id: str, + screen_state: Optional[ScreenState] = None, + strategy: str = "best", + source_similarity: float = 1.0, + ) -> Dict[str, Any]: """ Obtenir la prochaine action à exécuter. - + + Contrat normalisé (Lot A — avril 2026) : retourne **toujours** un + dict avec une clé ``status`` non-ambiguë. Le ``None`` ambigu qui + confondait "workflow terminé" et "aucun edge valide" a été + supprimé : l'appelant (ExecutionLoop) peut désormais distinguer + ces cas pour déclencher une pause supervisée plutôt qu'une fin + de workflow faux-positive. + + Sélection d'edge (C3) : + - Filtre dur sur ``pre_conditions`` (EdgeConstraints) + - Ranking par score composite (success_rate, target_match, recency) + - Tiebreak : success_rate le plus haut + Args: workflow_id: ID du workflow current_node_id: ID du node actuel - + screen_state: État courant, requis pour évaluer les + ``pre_conditions`` et le match ``target_spec``. Si None, + fallback sur la logique sans filtre de contraintes. 
+ strategy: ``"best"`` (défaut, scoring complet) ou ``"first"`` + (mode legacy, premier edge sans tri) + source_similarity: confiance du matching (``match_current_state``) + qui a identifié ``current_node_id``. Propagée à l'EdgeScorer + pour activer la précondition ``min_source_similarity`` des + edges. Défaut ``1.0`` pour compat avec les appelants qui + ne la fournissent pas encore (Lot B — avril 2026). + Returns: - Dict avec action, target_node, confidence, ou None + Dict avec l'une des formes suivantes : + + - ``{"status": "selected", "edge_id": str, "action": dict, + "target_node": str, "confidence": float, "score": float}`` + → edge sélectionné, l'ExecutionLoop doit l'exécuter. + + - ``{"status": "terminal"}`` → le node courant n'a pas + d'outgoing_edge (fin légitime de workflow). + + - ``{"status": "blocked", "reason": str}`` → il existe des + outgoing_edges mais aucun ne satisfait les conditions + (``reason="no_valid_edge"``), ou le workflow est introuvable + (``reason="workflow_not_found"``). L'ExecutionLoop doit + déclencher une pause supervisée et ne **jamais** traiter + ce cas comme un succès. 
""" workflow = self._workflows.get(workflow_id) if not workflow: @@ -569,23 +817,44 @@ class WorkflowPipeline: self._workflows[workflow_id] = workflow else: logger.error(f"Workflow not found: {workflow_id}") - return None - + return {"status": "blocked", "reason": "workflow_not_found"} + # Trouver les edges sortants du node actuel outgoing_edges = workflow.get_outgoing_edges(current_node_id) - + if not outgoing_edges: + # Aucun outgoing_edge = fin légitime du workflow logger.info(f"No outgoing edges from node {current_node_id}") - return None - - # Pour l'instant, prendre le premier edge (TODO: logique de sélection) - edge = outgoing_edges[0] - + return {"status": "terminal"} + + # Sélection robuste via EdgeScorer (C3) + from core.pipeline.edge_scorer import EdgeScorer + + scorer = EdgeScorer() + edge = scorer.select_best( + outgoing_edges, + screen_state=screen_state, + strategy=strategy, + source_similarity=source_similarity, + ) + + if edge is None: + # Il y avait des candidats mais aucun n'a passé les filtres. + # On NE retourne PAS "terminal" : l'ExecutionLoop doit traiter + # ce cas comme un blocage et demander de l'aide. + logger.warning( + f"No valid edge from {current_node_id} " + f"({len(outgoing_edges)} candidates rejected)" + ) + return {"status": "blocked", "reason": "no_valid_edge"} + return { + "status": "selected", "edge_id": edge.edge_id, "action": edge.action.to_dict(), "target_node": edge.to_node, - "confidence": edge.stats.success_rate if edge.stats else 1.0 + "confidence": edge.stats.success_rate if edge.stats else 1.0, + "score": edge.stats.success_rate if edge.stats else 1.0, } def should_execute_automatically(self, workflow_id: str) -> bool: @@ -759,10 +1028,11 @@ class WorkflowPipeline: current_node_id = match_result["node_id"] logger.info(f"Matched current state to node: {current_node_id} (confidence: {match_result['confidence']:.3f})") - # 2. Obtenir la prochaine action + # 2. 
Obtenir la prochaine action (contrat dict avec status explicite) action_info = self.get_next_action(workflow_id, current_node_id) - - if not action_info: + action_status = action_info.get("status") + + if action_status == "terminal": return { "execution_id": execution_id, "workflow_id": workflow_id, @@ -771,9 +1041,21 @@ class WorkflowPipeline: "message": "Workflow completed - no more actions", "current_node": current_node_id, "execution_time_ms": (datetime.now() - start_time).total_seconds() * 1000, - "correlation_id": execution_id + "correlation_id": execution_id, } - + + if action_status == "blocked": + return { + "execution_id": execution_id, + "workflow_id": workflow_id, + "success": False, + "step_type": "action_selection", + "error": f"No valid edge: {action_info.get('reason', 'unknown')}", + "current_node": current_node_id, + "execution_time_ms": (datetime.now() - start_time).total_seconds() * 1000, + "correlation_id": execution_id, + } + logger.info(f"Next action: {action_info['action']['type']} -> {action_info['target_node']}") # 3. Charger le workflow pour obtenir l'edge complet diff --git a/core/pipeline/workflow_pipeline_enhanced.py b/core/pipeline/workflow_pipeline_enhanced.py index 66aa54533..6b7e6c803 100644 --- a/core/pipeline/workflow_pipeline_enhanced.py +++ b/core/pipeline/workflow_pipeline_enhanced.py @@ -125,25 +125,47 @@ class WorkflowPipelineEnhanced: current_node_id = match_result["node_id"] logger.info(f"Matched current state to node: {current_node_id} (confidence: {match_result['confidence']:.3f})") - # 2. Obtenir la prochaine action + # 2. 
Obtenir la prochaine action (contrat dict avec status explicite) action_info = self.get_next_action(workflow_id, current_node_id) - - if not action_info: - # Workflow terminé + action_status = action_info.get("status") + + if action_status == "terminal": + # Workflow terminé (aucun outgoing_edge = fin légitime) performance_metrics.total_execution_time_ms = (datetime.now() - start_time).total_seconds() * 1000 - + result = WorkflowExecutionResult.workflow_complete( execution_id=execution_id, workflow_id=workflow_id, current_node=current_node_id, - performance_metrics=performance_metrics + performance_metrics=performance_metrics, ) result.correlation_id = correlation_id result.match_result = match_result - + logger.info(f"Workflow {workflow_id} completed at node {current_node_id}") return result - + + if action_status == "blocked": + # Des edges existent mais aucun ne passe les filtres : + # c'est un blocage, pas une fin de workflow. + performance_metrics.total_execution_time_ms = (datetime.now() - start_time).total_seconds() * 1000 + + result = WorkflowExecutionResult.error( + execution_id=execution_id, + workflow_id=workflow_id, + error_message=f"No valid edge: {action_info.get('reason', 'unknown')}", + step_type="action_selection", + current_node=current_node_id, + performance_metrics=performance_metrics, + ) + result.correlation_id = correlation_id + + logger.warning( + f"Workflow {workflow_id} blocked at node {current_node_id}: " + f"{action_info.get('reason')}" + ) + return result + logger.info(f"Next action: {action_info['action']['type']} -> {action_info['target_node']}") # 3. 
Charger le workflow pour obtenir l'edge complet diff --git a/tests/integration/test_workflow_pipeline_enhanced.py b/tests/integration/test_workflow_pipeline_enhanced.py index b9d96de3f..b238618e7 100644 --- a/tests/integration/test_workflow_pipeline_enhanced.py +++ b/tests/integration/test_workflow_pipeline_enhanced.py @@ -96,14 +96,16 @@ class TestWorkflowPipelineEnhanced: "confidence": 0.92 } - # Mock de l'action suivante + # Mock de l'action suivante (contrat dict normalisé Lot A) mock_workflow_pipeline.get_next_action.return_value = { + "status": "selected", "edge_id": "edge_1", "action": {"type": "click", "target": "button"}, "target_node": "node_2", - "confidence": 0.95 + "confidence": 0.95, + "score": 0.95, } - + # Mock du workflow mock_workflow = Mock(spec=Workflow) mock_edge = Mock(spec=WorkflowEdge) @@ -112,7 +114,7 @@ class TestWorkflowPipelineEnhanced: mock_edge.to_node = "node_2" mock_workflow.edges = [mock_edge] mock_workflow_pipeline.load_workflow.return_value = mock_workflow - + # Mock du résultat d'exécution mock_execution_result = Mock(spec=ExecutionResult) mock_execution_result.status = ExecutionStatus.SUCCESS @@ -121,24 +123,24 @@ class TestWorkflowPipelineEnhanced: mock_execution_result.target_resolved = None mock_execution_result.error = None mock_workflow_pipeline.action_executor.execute_edge.return_value = mock_execution_result - + # Créer l'instance enhanced enhanced = WorkflowPipelineEnhanced() - + # Lier les méthodes du pipeline mock enhanced.match_current_state = mock_workflow_pipeline.match_current_state enhanced.get_next_action = mock_workflow_pipeline.get_next_action enhanced.load_workflow = mock_workflow_pipeline.load_workflow enhanced.action_executor = mock_workflow_pipeline.action_executor enhanced.error_handler = mock_workflow_pipeline.error_handler - + # Act result = enhanced.execute_workflow_step_enhanced( workflow_id=workflow_id, current_state=mock_screen_state, context={"test_context": "value"} ) - + # Assert assert 
isinstance(result, WorkflowExecutionResult) assert result.success is True @@ -242,7 +244,8 @@ class TestWorkflowPipelineEnhanced: } # Mock de l'action suivante (pas d'action = workflow terminé) - mock_workflow_pipeline.get_next_action.return_value = None + # Contrat dict normalisé Lot A : status="terminal" pour fin légitime + mock_workflow_pipeline.get_next_action.return_value = {"status": "terminal"} # Créer l'instance enhanced enhanced = WorkflowPipelineEnhanced() @@ -347,14 +350,16 @@ class TestWorkflowPipelineEnhanced: "confidence": 0.92 } - # Mock de l'action suivante + # Mock de l'action suivante (contrat dict normalisé Lot A) mock_workflow_pipeline.get_next_action.return_value = { + "status": "selected", "edge_id": "edge_1", "action": {"type": "click", "target": "button"}, "target_node": "node_2", - "confidence": 0.95 + "confidence": 0.95, + "score": 0.95, } - + # Mock du workflow mock_workflow = Mock(spec=Workflow) mock_edge = Mock(spec=WorkflowEdge) @@ -363,7 +368,7 @@ class TestWorkflowPipelineEnhanced: mock_edge.to_node = "node_2" mock_workflow.edges = [mock_edge] mock_workflow_pipeline.load_workflow.return_value = mock_workflow - + # Mock du résultat d'exécution mock_execution_result = Mock(spec=ExecutionResult) mock_execution_result.status = ExecutionStatus.SUCCESS @@ -372,17 +377,17 @@ class TestWorkflowPipelineEnhanced: mock_execution_result.target_resolved = None mock_execution_result.error = None mock_workflow_pipeline.action_executor.execute_edge.return_value = mock_execution_result - + # Créer l'instance enhanced enhanced = WorkflowPipelineEnhanced() - + # Lier les méthodes du pipeline mock enhanced.match_current_state = mock_workflow_pipeline.match_current_state enhanced.get_next_action = mock_workflow_pipeline.get_next_action enhanced.load_workflow = mock_workflow_pipeline.load_workflow enhanced.action_executor = mock_workflow_pipeline.action_executor enhanced.error_handler = mock_workflow_pipeline.error_handler - + # Act result = 
enhanced.execute_workflow_step_enhanced( workflow_id=workflow_id, diff --git a/tests/unit/test_analytics_vision_metrics.py b/tests/unit/test_analytics_vision_metrics.py new file mode 100644 index 000000000..085ff8a87 --- /dev/null +++ b/tests/unit/test_analytics_vision_metrics.py @@ -0,0 +1,520 @@ +""" +Tests unitaires pour la remontée des champs vision-aware (C1) vers analytics. + +Couvre : + - StepMetrics.to_dict / from_dict avec les nouveaux champs + - AnalyticsExecutionIntegration.on_step_result passe bien les champs + - Persistance SQLite (schema + migration) des colonnes C1 +""" + +from __future__ import annotations + +import sqlite3 +import tempfile +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from core.analytics.collection.metrics_collector import StepMetrics + + +# ----------------------------------------------------------------------------- +# StepMetrics : sérialisation des champs C1 +# ----------------------------------------------------------------------------- + + +def _make_step_metrics(**overrides) -> StepMetrics: + base = dict( + step_id="s1", + execution_id="exec1", + workflow_id="wf1", + node_id="n1", + action_type="click", + target_element="", + started_at=datetime(2026, 4, 13, 10, 0, 0), + completed_at=datetime(2026, 4, 13, 10, 0, 1), + duration_ms=1000.0, + status="completed", + confidence_score=0.9, + retry_count=0, + error_details=None, + ) + base.update(overrides) + return StepMetrics(**base) + + +class TestStepMetricsVisionFields: + def test_default_vision_fields(self): + m = _make_step_metrics() + assert m.ocr_ms == 0.0 + assert m.ui_ms == 0.0 + assert m.analyze_ms == 0.0 + assert m.total_ms == 0.0 + assert m.cache_hit is False + assert m.degraded is False + + def test_to_dict_includes_vision_fields(self): + m = _make_step_metrics( + ocr_ms=120.5, + ui_ms=45.0, + analyze_ms=200.0, + total_ms=1050.0, + cache_hit=True, + degraded=True, + ) + d = m.to_dict() + assert 
d["ocr_ms"] == 120.5 + assert d["ui_ms"] == 45.0 + assert d["analyze_ms"] == 200.0 + assert d["total_ms"] == 1050.0 + assert d["cache_hit"] is True + assert d["degraded"] is True + + def test_from_dict_roundtrip(self): + original = _make_step_metrics( + ocr_ms=10.0, ui_ms=20.0, analyze_ms=30.0, + total_ms=100.0, cache_hit=True, degraded=False, + ) + restored = StepMetrics.from_dict(original.to_dict()) + assert restored.ocr_ms == 10.0 + assert restored.ui_ms == 20.0 + assert restored.analyze_ms == 30.0 + assert restored.total_ms == 100.0 + assert restored.cache_hit is True + assert restored.degraded is False + + def test_from_dict_missing_vision_fields_defaults_to_zero(self): + """Rétrocompatibilité : un dict sans champs C1 doit produire 0/False.""" + restored = StepMetrics.from_dict({ + 'step_id': 's1', + 'execution_id': 'e1', + 'workflow_id': 'w1', + 'node_id': 'n1', + 'action_type': 'click', + 'target_element': '', + 'started_at': datetime.now().isoformat(), + 'completed_at': datetime.now().isoformat(), + 'duration_ms': 100.0, + 'status': 'completed', + 'confidence_score': 0.5, + }) + assert restored.ocr_ms == 0.0 + assert restored.cache_hit is False + assert restored.degraded is False + + +# ----------------------------------------------------------------------------- +# AnalyticsExecutionIntegration.on_step_result +# ----------------------------------------------------------------------------- + + +class _FakeStepResult: + """Stand-in minimal pour core.execution.execution_loop.StepResult.""" + def __init__(self, **kw): + self.success = kw.get("success", True) + self.node_id = kw.get("node_id", "n1") + self.edge_id = kw.get("edge_id", None) + self.action_result = kw.get("action_result", None) + self.match_confidence = kw.get("match_confidence", 0.9) + self.duration_ms = kw.get("duration_ms", 100.0) + self.message = kw.get("message", "") + self.ocr_ms = kw.get("ocr_ms", 0.0) + self.ui_ms = kw.get("ui_ms", 0.0) + self.analyze_ms = kw.get("analyze_ms", 0.0) + 
self.total_ms = kw.get("total_ms", 0.0) + self.cache_hit = kw.get("cache_hit", False) + self.degraded = kw.get("degraded", False) + + +class TestAnalyticsOnStepResult: + def test_on_step_result_passes_vision_fields(self): + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + # Analytics system mocké + fake_system = MagicMock() + integration = AnalyticsExecutionIntegration(fake_system) + + step = _FakeStepResult( + node_id="node_click", + success=True, + match_confidence=0.87, + duration_ms=1234.0, + ocr_ms=111.0, + ui_ms=222.0, + analyze_ms=333.0, + total_ms=1234.0, + cache_hit=True, + degraded=False, + ) + + integration.on_step_result( + execution_id="exec1", + workflow_id="wf1", + step_result=step, + ) + + # Vérifie qu'un StepMetrics avec les bons champs a été enregistré + record_calls = fake_system.metrics_collector.record_step.call_args_list + assert len(record_calls) == 1 + recorded: StepMetrics = record_calls[0].args[0] + assert isinstance(recorded, StepMetrics) + assert recorded.node_id == "node_click" + assert recorded.workflow_id == "wf1" + assert recorded.execution_id == "exec1" + assert recorded.confidence_score == 0.87 + assert recorded.duration_ms == 1234.0 + assert recorded.ocr_ms == 111.0 + assert recorded.ui_ms == 222.0 + assert recorded.analyze_ms == 333.0 + assert recorded.total_ms == 1234.0 + assert recorded.cache_hit is True + assert recorded.degraded is False + assert recorded.status == "completed" + + def test_on_step_result_failed_step(self): + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + fake_system = MagicMock() + integration = AnalyticsExecutionIntegration(fake_system) + + step = _FakeStepResult( + success=False, + message="Click failed", + degraded=True, + ) + + integration.on_step_result("e1", "w1", step) + + recorded: StepMetrics = fake_system.metrics_collector.record_step.call_args.args[0] + assert recorded.status == "failed" 
+ assert recorded.error_details == "Click failed" + assert recorded.degraded is True + + def test_on_step_result_disabled_integration_is_noop(self): + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + integration = AnalyticsExecutionIntegration(None) # désactivé + assert integration.enabled is False + + step = _FakeStepResult() + # Ne doit rien faire ni lever d'exception + integration.on_step_result("e1", "w1", step) + + +# ----------------------------------------------------------------------------- +# AnalyticsExecutionIntegration.on_execution_complete (Lot A — avril 2026) +# ----------------------------------------------------------------------------- + + +class TestAnalyticsOnExecutionComplete: + """Contrat normalisé : duration_ms (ms) + status (str), pas de magie.""" + + def _make_integration(self): + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + fake_system = MagicMock() + # Pas d'execution active : l'intégration doit emprunter le fallback + # "ExecutionMetrics synthétique pushé dans _buffer". 
+ fake_system.metrics_collector._active_executions = {} + fake_system.metrics_collector._lock = MagicMock() + fake_system.metrics_collector._lock.__enter__ = MagicMock( + return_value=None + ) + fake_system.metrics_collector._lock.__exit__ = MagicMock( + return_value=None + ) + fake_system.metrics_collector._buffer = [] + return AnalyticsExecutionIntegration(fake_system), fake_system + + def test_fallback_builds_execution_metrics_with_correct_fields(self): + """Sans record_execution_start préalable, on construit un + ExecutionMetrics synthétique avec les bons noms de champs.""" + from core.analytics.collection.metrics_collector import ExecutionMetrics + + integration, fake_system = self._make_integration() + + integration.on_execution_complete( + execution_id="exec1", + workflow_id="wf1", + duration_ms=1500.0, + status="completed", + steps_total=3, + steps_completed=3, + steps_failed=0, + ) + + # Un ExecutionMetrics a été pushé dans le buffer + buffer = fake_system.metrics_collector._buffer + assert len(buffer) == 1 + metric: ExecutionMetrics = buffer[0] + assert isinstance(metric, ExecutionMetrics) + assert metric.execution_id == "exec1" + assert metric.workflow_id == "wf1" + assert metric.duration_ms == 1500.0 + assert metric.status == "completed" + assert metric.steps_total == 3 + assert metric.steps_completed == 3 + assert metric.steps_failed == 0 + # started_at / completed_at sont cohérents + delta_ms = ( + metric.completed_at - metric.started_at + ).total_seconds() * 1000 + assert abs(delta_ms - 1500.0) < 1.0 + + def test_uses_record_execution_complete_if_active(self): + """Si l'execution a été ouverte via on_execution_start, on délègue + à record_execution_complete (chemin nominal).""" + integration, fake_system = self._make_integration() + # Simuler une execution active + fake_system.metrics_collector._active_executions = {"exec1": object()} + + integration.on_execution_complete( + execution_id="exec1", + workflow_id="wf1", + duration_ms=800.0, + 
status="failed", + steps_total=2, + steps_completed=1, + steps_failed=1, + error_message="timeout", + ) + + call = fake_system.metrics_collector.record_execution_complete.call_args + assert call is not None + kwargs = call.kwargs + assert kwargs["execution_id"] == "exec1" + assert kwargs["status"] == "failed" + assert kwargs["steps_total"] == 2 + assert kwargs["steps_completed"] == 1 + assert kwargs["steps_failed"] == 1 + assert kwargs["error_message"] == "timeout" + + def test_steps_total_derived_when_not_provided(self): + """steps_total déduit par somme si absent, pas d'erreur silencieuse.""" + integration, fake_system = self._make_integration() + + integration.on_execution_complete( + execution_id="exec1", + workflow_id="wf1", + duration_ms=500.0, + status="completed", + steps_completed=2, + steps_failed=1, + ) + + metric = fake_system.metrics_collector._buffer[0] + assert metric.steps_total == 3 # 2 + 1 + + def test_disabled_integration_is_noop(self): + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + integration = AnalyticsExecutionIntegration(None) + assert integration.enabled is False + + # Ne doit rien faire ni lever d'exception + integration.on_execution_complete( + execution_id="exec1", + workflow_id="wf1", + duration_ms=100.0, + status="completed", + ) + + def test_realtime_complete_called(self): + """Le tracking temps réel est clos avec le bon status.""" + integration, fake_system = self._make_integration() + + integration.on_execution_complete( + execution_id="exec1", + workflow_id="wf1", + duration_ms=100.0, + status="stopped", + ) + + fake_system.realtime_analytics.complete_execution.assert_called_once_with( + execution_id="exec1", + status="stopped", + ) + + +# ----------------------------------------------------------------------------- +# AnalyticsExecutionIntegration.on_recovery_attempt (Lot A — avril 2026) +# ----------------------------------------------------------------------------- + + 
+class TestAnalyticsOnRecoveryAttempt: + """Contrat normalisé : StepMetrics construit avec les vrais champs.""" + + def test_success_recovery_builds_valid_step_metrics(self): + from core.analytics.collection.metrics_collector import StepMetrics + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + fake_system = MagicMock() + integration = AnalyticsExecutionIntegration(fake_system) + + integration.on_recovery_attempt( + execution_id="exec1", + workflow_id="wf1", + node_id="node_click", + strategy="retry_with_delay", + success=True, + duration_ms=250.0, + ) + + call = fake_system.metrics_collector.record_step.call_args + assert call is not None + recorded: StepMetrics = call.args[0] + assert isinstance(recorded, StepMetrics) + assert recorded.execution_id == "exec1" + assert recorded.workflow_id == "wf1" + assert recorded.node_id == "node_click_recovery" + assert recorded.action_type == "recovery_retry_with_delay" + assert recorded.duration_ms == 250.0 + assert recorded.status == "completed" + assert recorded.error_details is None + # Champs obligatoires du dataclass + assert recorded.step_id # non vide + assert recorded.target_element == "" + assert recorded.confidence_score == 0.0 + + def test_failed_recovery_sets_status_and_error_details(self): + from core.analytics.collection.metrics_collector import StepMetrics + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + fake_system = MagicMock() + integration = AnalyticsExecutionIntegration(fake_system) + + integration.on_recovery_attempt( + execution_id="e1", + workflow_id="w1", + node_id="n1", + strategy="fallback_to_parent", + success=False, + duration_ms=80.0, + ) + + recorded: StepMetrics = ( + fake_system.metrics_collector.record_step.call_args.args[0] + ) + assert recorded.status == "failed" + assert recorded.error_details == "Recovery failed: fallback_to_parent" + assert recorded.duration_ms == 80.0 + + def 
test_disabled_integration_is_noop(self): + from core.analytics.integration.execution_integration import ( + AnalyticsExecutionIntegration, + ) + + integration = AnalyticsExecutionIntegration(None) + integration.on_recovery_attempt( + execution_id="e1", + workflow_id="w1", + node_id="n1", + strategy="x", + success=True, + duration_ms=10.0, + ) + + +# ----------------------------------------------------------------------------- +# Persistance SQLite : schema + migration +# ----------------------------------------------------------------------------- + + +class TestTimeSeriesStoreSchema: + def test_new_store_has_vision_columns(self, tmp_path): + from core.analytics.storage.timeseries_store import TimeSeriesStore + + store = TimeSeriesStore(tmp_path) + with sqlite3.connect(str(store.db_path)) as conn: + cols = {row[1] for row in conn.execute( + "PRAGMA table_info(step_metrics)" + )} + # Colonnes legacy + assert "duration_ms" in cols + assert "confidence_score" in cols + # Colonnes C1 + assert "ocr_ms" in cols + assert "ui_ms" in cols + assert "analyze_ms" in cols + assert "total_ms" in cols + assert "cache_hit" in cols + assert "degraded" in cols + + def test_migration_adds_missing_columns(self, tmp_path): + """Base pré-existante sans les colonnes C1 — la migration doit les ajouter.""" + from core.analytics.storage.timeseries_store import TimeSeriesStore + + # Créer une base "legacy" manuellement, sans les nouvelles colonnes + storage_dir = tmp_path / "legacy" + storage_dir.mkdir() + legacy_db = storage_dir / "timeseries.db" + with sqlite3.connect(str(legacy_db)) as conn: + conn.executescript(""" + CREATE TABLE step_metrics ( + step_id TEXT PRIMARY KEY, + execution_id TEXT NOT NULL, + workflow_id TEXT NOT NULL, + node_id TEXT NOT NULL, + action_type TEXT NOT NULL, + target_element TEXT, + started_at TIMESTAMP NOT NULL, + completed_at TIMESTAMP NOT NULL, + duration_ms REAL NOT NULL, + status TEXT NOT NULL, + confidence_score REAL, + retry_count INTEGER DEFAULT 0, + 
error_details TEXT + ); + """) + conn.commit() + + # Instancier TimeSeriesStore → doit migrer + _ = TimeSeriesStore(storage_dir) + + with sqlite3.connect(str(legacy_db)) as conn: + cols = {row[1] for row in conn.execute( + "PRAGMA table_info(step_metrics)" + )} + assert "ocr_ms" in cols + assert "cache_hit" in cols + assert "degraded" in cols + + def test_write_and_read_vision_metrics(self, tmp_path): + from core.analytics.storage.timeseries_store import TimeSeriesStore + + store = TimeSeriesStore(tmp_path) + metric = _make_step_metrics( + ocr_ms=50.0, ui_ms=60.0, analyze_ms=110.0, + total_ms=500.0, cache_hit=True, degraded=True, + ) + store.write_metrics([metric]) + + with sqlite3.connect(str(store.db_path)) as conn: + conn.row_factory = sqlite3.Row + row = conn.execute( + "SELECT * FROM step_metrics WHERE step_id = ?", (metric.step_id,) + ).fetchone() + assert row is not None + assert row["ocr_ms"] == 50.0 + assert row["ui_ms"] == 60.0 + assert row["analyze_ms"] == 110.0 + assert row["total_ms"] == 500.0 + # SQLite stocke les bool comme INTEGER + assert row["cache_hit"] == 1 + assert row["degraded"] == 1 diff --git a/tests/unit/test_workflow_pipeline_get_next_action.py b/tests/unit/test_workflow_pipeline_get_next_action.py new file mode 100644 index 000000000..5cefd1ee4 --- /dev/null +++ b/tests/unit/test_workflow_pipeline_get_next_action.py @@ -0,0 +1,264 @@ +""" +Tests de la sélection robuste d'edge dans WorkflowPipeline.get_next_action (C3). 
+ +Vérifie que la nouvelle API utilise EdgeScorer et expose le contrat dict +normalisé (Lot A — avril 2026) : + - status="selected" → edge choisi + - status="terminal" → aucun outgoing_edge (fin légitime) + - status="blocked" → candidats rejetés (NE DOIT PAS être traité comme fin) +""" + +from __future__ import annotations + +from datetime import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from core.models.screen_state import ( + ContextLevel, + EmbeddingRef, + PerceptionLevel, + RawLevel, + ScreenState, + WindowContext, +) +from core.models.workflow_graph import ( + Action, + EdgeConstraints, + EdgeStats, + PostConditions, + TargetSpec, + Workflow, + WorkflowEdge, + WorkflowNode, +) + + +def _edge( + edge_id: str, + required_window_title: str = "", + success_rate: float = 0.5, + execution_count: int = 10, + min_source_similarity: float = 0.80, +) -> WorkflowEdge: + stats = EdgeStats() + if execution_count > 0: + stats.execution_count = execution_count + stats.success_count = int(round(success_rate * execution_count)) + stats.failure_count = execution_count - stats.success_count + + return WorkflowEdge( + edge_id=edge_id, + from_node="n1", + to_node="n2", + action=Action(type="mouse_click", target=TargetSpec()), + constraints=EdgeConstraints( + required_window_title=required_window_title, + min_source_similarity=min_source_similarity, + ), + post_conditions=PostConditions(), + stats=stats, + ) + + +def _state(window_title: str = "AppA") -> ScreenState: + return ScreenState( + screen_state_id="s", + timestamp=datetime.now(), + session_id="sess", + window=WindowContext( + app_name="app", window_title=window_title, screen_resolution=[1920, 1080] + ), + raw=RawLevel(screenshot_path="", capture_method="t", file_size_bytes=0), + perception=PerceptionLevel( + embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512), + detected_text=[], + text_detection_method="none", + confidence_avg=0.0, + ), + context=ContextLevel(), + 
ui_elements=[], + ) + + +@pytest.fixture +def pipeline_with_workflow(tmp_path): + """Pipeline minimal avec un workflow en mémoire (Workflow mocké). + + On évite la construction d'un vrai Workflow (ScreenTemplate trop lourd) + en utilisant un MagicMock configuré pour les méthodes utilisées par + `get_next_action` : `get_outgoing_edges`. + """ + from core.pipeline.workflow_pipeline import WorkflowPipeline + + # Stub pour éviter les lourds imports (mocks sur composants GPU) + with patch.multiple( + "core.pipeline.workflow_pipeline", + UIDetector=MagicMock(), + CLIPEmbedder=MagicMock(), + StateEmbeddingBuilder=MagicMock(), + FusionEngine=MagicMock(), + FAISSManager=MagicMock(), + GraphBuilder=MagicMock(), + NodeMatcher=MagicMock(), + HierarchicalMatcher=MagicMock(), + LearningManager=MagicMock(), + ActionExecutor=MagicMock(), + TargetResolver=MagicMock(), + ErrorHandler=MagicMock(), + ): + pipeline = WorkflowPipeline(data_dir=str(tmp_path), use_gpu=False) + + workflow = MagicMock(spec=Workflow) + workflow.workflow_id = "wf1" + workflow.edges = [] + workflow.get_outgoing_edges = lambda node_id: [ + e for e in workflow.edges if e.from_node == node_id + ] + pipeline._workflows["wf1"] = workflow + return pipeline, workflow + + +class TestGetNextActionC3: + + def test_picks_highest_success_rate(self, pipeline_with_workflow): + pipeline, wf = pipeline_with_workflow + wf.edges = [ + _edge("low", success_rate=0.1, execution_count=20), + _edge("high", success_rate=0.9, execution_count=20), + ] + result = pipeline.get_next_action("wf1", "n1", screen_state=_state()) + assert result["status"] == "selected" + assert result["edge_id"] == "high" + + def test_filters_out_invalid_preconditions(self, pipeline_with_workflow): + pipeline, wf = pipeline_with_workflow + wf.edges = [ + _edge("bad", required_window_title="NopeApp", success_rate=0.99, execution_count=20), + _edge("ok", success_rate=0.50, execution_count=20), + ] + result = pipeline.get_next_action( + "wf1", "n1", 
screen_state=_state(window_title="AppA") + ) + assert result["status"] == "selected" + assert result["edge_id"] == "ok" + + def test_blocked_when_no_valid_edge(self, pipeline_with_workflow): + """Des candidats existent mais aucun ne passe les contraintes. + + Lot A — cas critique : on NE DOIT PAS retourner "terminal" ici. Un + blocage doit remonter explicitement pour déclencher pause supervisée. + """ + pipeline, wf = pipeline_with_workflow + wf.edges = [ + _edge("e1", required_window_title="AppB"), + _edge("e2", required_window_title="AppC"), + ] + result = pipeline.get_next_action( + "wf1", "n1", screen_state=_state(window_title="AppA") + ) + assert result["status"] == "blocked" + assert result["reason"] == "no_valid_edge" + + def test_strategy_first_keeps_legacy_behavior(self, pipeline_with_workflow): + pipeline, wf = pipeline_with_workflow + wf.edges = [ + _edge("e1", success_rate=0.1, execution_count=20), + _edge("e2", success_rate=0.9, execution_count=20), + ] + result = pipeline.get_next_action( + "wf1", "n1", screen_state=_state(), strategy="first" + ) + # Mode legacy : premier edge sans tri + assert result["status"] == "selected" + assert result["edge_id"] == "e1" + + def test_no_screen_state_still_works(self, pipeline_with_workflow): + """Sans ScreenState, le scorer ne peut pas filtrer mais peut ranker.""" + pipeline, wf = pipeline_with_workflow + wf.edges = [ + _edge("e1", success_rate=0.1, execution_count=20), + _edge("e2", success_rate=0.9, execution_count=20), + ] + result = pipeline.get_next_action("wf1", "n1", screen_state=None) + assert result["status"] == "selected" + # Le ranking par success_rate fonctionne toujours + assert result["edge_id"] == "e2" + + def test_no_outgoing_edges_is_terminal(self, pipeline_with_workflow): + """Aucun outgoing_edge = fin légitime du workflow (status="terminal").""" + pipeline, wf = pipeline_with_workflow + wf.edges = [] + result = pipeline.get_next_action("wf1", "n1", screen_state=_state()) + assert 
result["status"] == "terminal" + + def test_blocked_distinct_from_terminal(self, pipeline_with_workflow): + """Régression Lot A : blocked != terminal. + + Le bug historique confondait ces deux cas. Un workflow bloqué + apparaissait comme "terminé avec succès" côté ExecutionLoop. + """ + pipeline, wf = pipeline_with_workflow + + # Cas terminal : pas d'outgoing + wf.edges = [] + terminal = pipeline.get_next_action("wf1", "n1", screen_state=_state()) + + # Cas bloqué : outgoing présent mais rejetés + wf.edges = [_edge("bad", required_window_title="NopeApp")] + blocked = pipeline.get_next_action("wf1", "n1", screen_state=_state(window_title="AppA")) + + assert terminal["status"] == "terminal" + assert blocked["status"] == "blocked" + # L'appelant doit pouvoir les distinguer sans ambiguïté + assert terminal["status"] != blocked["status"] + + def test_workflow_not_found_is_blocked(self, pipeline_with_workflow): + """Workflow inexistant = blocked avec reason explicite (pas silencieux).""" + pipeline, _wf = pipeline_with_workflow + result = pipeline.get_next_action( + "wf_inexistant", "n1", screen_state=_state() + ) + assert result["status"] == "blocked" + assert result["reason"] == "workflow_not_found" + + +class TestGetNextActionSourceSimilarity: + """Lot B — propagation de source_similarity jusqu'à EdgeScorer.""" + + def test_high_similarity_passes_min_source_similarity( + self, pipeline_with_workflow + ): + """source_similarity élevée → edge accepté.""" + pipeline, wf = pipeline_with_workflow + wf.edges = [_edge("e1", min_source_similarity=0.80)] + result = pipeline.get_next_action( + "wf1", "n1", screen_state=_state(), source_similarity=0.95 + ) + assert result["status"] == "selected" + assert result["edge_id"] == "e1" + + def test_low_similarity_blocks_edge(self, pipeline_with_workflow): + """source_similarity < min_source_similarity → edge rejeté → blocked. + + C'est la preuve que la précondition min_source_similarity est + redevenue effective (Lot B). 
Avant ce lot, l'EdgeScorer recevait + toujours 1.0 hardcodé et ne rejetait jamais l'edge pour ce motif. + """ + pipeline, wf = pipeline_with_workflow + wf.edges = [_edge("e1", min_source_similarity=0.80)] + result = pipeline.get_next_action( + "wf1", "n1", screen_state=_state(), source_similarity=0.40 + ) + assert result["status"] == "blocked" + assert result["reason"] == "no_valid_edge" + + def test_default_source_similarity_is_one(self, pipeline_with_workflow): + """Sans source_similarity fourni → défaut 1.0 → pas de rejet pour + ce motif (compat avec les call sites qui ne l'ont pas encore).""" + pipeline, wf = pipeline_with_workflow + # min_source_similarity très strict, mais défaut appelant = 1.0 + wf.edges = [_edge("e1", min_source_similarity=0.99)] + result = pipeline.get_next_action("wf1", "n1", screen_state=_state()) + assert result["status"] == "selected" diff --git a/visual_workflow_builder/backend/services/execution_integration.py b/visual_workflow_builder/backend/services/execution_integration.py index 68b1d3fcb..22209b631 100644 --- a/visual_workflow_builder/backend/services/execution_integration.py +++ b/visual_workflow_builder/backend/services/execution_integration.py @@ -227,12 +227,10 @@ class VisualWorkflowExecutor: self.analytics_integration.on_execution_complete( execution_id=execution_id, workflow_id=workflow_graph.workflow_id, - started_at=result.start_time, - completed_at=result.end_time, - duration=result._calculate_duration() / 1000.0, # en secondes - status='success', + duration_ms=float(result._calculate_duration() or 0.0), + status='completed', steps_completed=len(workflow_graph.nodes), - steps_failed=0 + steps_failed=0, ) # Collecter les métriques Analytics pour l'UI @@ -265,13 +263,11 @@ class VisualWorkflowExecutor: self.analytics_integration.on_execution_complete( execution_id=execution_id, workflow_id=visual_workflow.workflow_id, - started_at=result.start_time, - completed_at=result.end_time, - duration=result._calculate_duration() 
/ 1000.0 if result._calculate_duration() else 0, + duration_ms=float(result._calculate_duration() or 0.0), status='failed', error_message=str(e), steps_completed=0, - steps_failed=1 + steps_failed=1, ) # Enregistrer l'échec dans le système d'apprentissage @@ -312,7 +308,8 @@ class VisualWorkflowExecutor: if result.success: self._log(execution_id, 'info', f'Workflow exécuté avec succès') - # Notifier Analytics pour chaque étape + # Notifier Analytics pour chaque étape (contrat normalisé Lot A : + # duration_ms en millisecondes, plus de "duration" en secondes) for i, step_result in enumerate(result.step_results): if self.analytics_integration: self.analytics_integration.on_step_complete( @@ -322,8 +319,8 @@ class VisualWorkflowExecutor: action_type=step_result.action_type, started_at=step_result.start_time, completed_at=step_result.end_time, - duration=step_result.duration_seconds, - success=step_result.success + duration_ms=float(step_result.duration_seconds or 0.0) * 1000.0, + success=step_result.success, ) # Notifier la progression @@ -383,8 +380,8 @@ class VisualWorkflowExecutor: action_type=getattr(node, 'action_type', 'unknown'), started_at=step_start_time, completed_at=step_end_time, - duration=step_duration, - success=True + duration_ms=float(step_duration or 0.0) * 1000.0, + success=True, ) progress = (i + 1) / total_nodes * 100