""" Tests unitaires pour la remontée des champs vision-aware (C1) vers analytics. Couvre : - StepMetrics.to_dict / from_dict avec les nouveaux champs - AnalyticsExecutionIntegration.on_step_result passe bien les champs - Persistance SQLite (schema + migration) des colonnes C1 """ from __future__ import annotations import sqlite3 import tempfile from datetime import datetime from pathlib import Path from unittest.mock import MagicMock, patch import pytest from core.analytics.collection.metrics_collector import StepMetrics # ----------------------------------------------------------------------------- # StepMetrics : sérialisation des champs C1 # ----------------------------------------------------------------------------- def _make_step_metrics(**overrides) -> StepMetrics: base = dict( step_id="s1", execution_id="exec1", workflow_id="wf1", node_id="n1", action_type="click", target_element="", started_at=datetime(2026, 4, 13, 10, 0, 0), completed_at=datetime(2026, 4, 13, 10, 0, 1), duration_ms=1000.0, status="completed", confidence_score=0.9, retry_count=0, error_details=None, ) base.update(overrides) return StepMetrics(**base) class TestStepMetricsVisionFields: def test_default_vision_fields(self): m = _make_step_metrics() assert m.ocr_ms == 0.0 assert m.ui_ms == 0.0 assert m.analyze_ms == 0.0 assert m.total_ms == 0.0 assert m.cache_hit is False assert m.degraded is False def test_to_dict_includes_vision_fields(self): m = _make_step_metrics( ocr_ms=120.5, ui_ms=45.0, analyze_ms=200.0, total_ms=1050.0, cache_hit=True, degraded=True, ) d = m.to_dict() assert d["ocr_ms"] == 120.5 assert d["ui_ms"] == 45.0 assert d["analyze_ms"] == 200.0 assert d["total_ms"] == 1050.0 assert d["cache_hit"] is True assert d["degraded"] is True def test_from_dict_roundtrip(self): original = _make_step_metrics( ocr_ms=10.0, ui_ms=20.0, analyze_ms=30.0, total_ms=100.0, cache_hit=True, degraded=False, ) restored = StepMetrics.from_dict(original.to_dict()) assert restored.ocr_ms == 10.0 
assert restored.ui_ms == 20.0 assert restored.analyze_ms == 30.0 assert restored.total_ms == 100.0 assert restored.cache_hit is True assert restored.degraded is False def test_from_dict_missing_vision_fields_defaults_to_zero(self): """Rétrocompatibilité : un dict sans champs C1 doit produire 0/False.""" restored = StepMetrics.from_dict({ 'step_id': 's1', 'execution_id': 'e1', 'workflow_id': 'w1', 'node_id': 'n1', 'action_type': 'click', 'target_element': '', 'started_at': datetime.now().isoformat(), 'completed_at': datetime.now().isoformat(), 'duration_ms': 100.0, 'status': 'completed', 'confidence_score': 0.5, }) assert restored.ocr_ms == 0.0 assert restored.cache_hit is False assert restored.degraded is False # ----------------------------------------------------------------------------- # AnalyticsExecutionIntegration.on_step_result # ----------------------------------------------------------------------------- class _FakeStepResult: """Stand-in minimal pour core.execution.execution_loop.StepResult.""" def __init__(self, **kw): self.success = kw.get("success", True) self.node_id = kw.get("node_id", "n1") self.edge_id = kw.get("edge_id", None) self.action_result = kw.get("action_result", None) self.match_confidence = kw.get("match_confidence", 0.9) self.duration_ms = kw.get("duration_ms", 100.0) self.message = kw.get("message", "") self.ocr_ms = kw.get("ocr_ms", 0.0) self.ui_ms = kw.get("ui_ms", 0.0) self.analyze_ms = kw.get("analyze_ms", 0.0) self.total_ms = kw.get("total_ms", 0.0) self.cache_hit = kw.get("cache_hit", False) self.degraded = kw.get("degraded", False) class TestAnalyticsOnStepResult: def test_on_step_result_passes_vision_fields(self): from core.analytics.integration.execution_integration import ( AnalyticsExecutionIntegration, ) # Analytics system mocké fake_system = MagicMock() integration = AnalyticsExecutionIntegration(fake_system) step = _FakeStepResult( node_id="node_click", success=True, match_confidence=0.87, duration_ms=1234.0, 
ocr_ms=111.0, ui_ms=222.0, analyze_ms=333.0, total_ms=1234.0, cache_hit=True, degraded=False, ) integration.on_step_result( execution_id="exec1", workflow_id="wf1", step_result=step, ) # Vérifie qu'un StepMetrics avec les bons champs a été enregistré record_calls = fake_system.metrics_collector.record_step.call_args_list assert len(record_calls) == 1 recorded: StepMetrics = record_calls[0].args[0] assert isinstance(recorded, StepMetrics) assert recorded.node_id == "node_click" assert recorded.workflow_id == "wf1" assert recorded.execution_id == "exec1" assert recorded.confidence_score == 0.87 assert recorded.duration_ms == 1234.0 assert recorded.ocr_ms == 111.0 assert recorded.ui_ms == 222.0 assert recorded.analyze_ms == 333.0 assert recorded.total_ms == 1234.0 assert recorded.cache_hit is True assert recorded.degraded is False assert recorded.status == "completed" def test_on_step_result_failed_step(self): from core.analytics.integration.execution_integration import ( AnalyticsExecutionIntegration, ) fake_system = MagicMock() integration = AnalyticsExecutionIntegration(fake_system) step = _FakeStepResult( success=False, message="Click failed", degraded=True, ) integration.on_step_result("e1", "w1", step) recorded: StepMetrics = fake_system.metrics_collector.record_step.call_args.args[0] assert recorded.status == "failed" assert recorded.error_details == "Click failed" assert recorded.degraded is True def test_on_step_result_disabled_integration_is_noop(self): from core.analytics.integration.execution_integration import ( AnalyticsExecutionIntegration, ) integration = AnalyticsExecutionIntegration(None) # désactivé assert integration.enabled is False step = _FakeStepResult() # Ne doit rien faire ni lever d'exception integration.on_step_result("e1", "w1", step) # ----------------------------------------------------------------------------- # AnalyticsExecutionIntegration.on_execution_complete (Lot A — avril 2026) # 
# -----------------------------------------------------------------------------


class TestAnalyticsOnExecutionComplete:
    """Normalized contract: duration_ms (in ms) + status (str), no magic."""

    def _make_integration(self):
        from core.analytics.integration.execution_integration import (
            AnalyticsExecutionIntegration,
        )

        fake_system = MagicMock()
        # No active execution: the integration must take the fallback path
        # "synthetic ExecutionMetrics pushed into _buffer".
        collector = fake_system.metrics_collector
        collector._active_executions = {}
        collector._lock = MagicMock()
        collector._lock.__enter__ = MagicMock(return_value=None)
        collector._lock.__exit__ = MagicMock(return_value=None)
        collector._buffer = []
        return AnalyticsExecutionIntegration(fake_system), fake_system

    def test_fallback_builds_execution_metrics_with_correct_fields(self):
        """Without a prior record_execution_start, a synthetic
        ExecutionMetrics is built with the correct field names."""
        from core.analytics.collection.metrics_collector import ExecutionMetrics

        integration, fake_system = self._make_integration()

        integration.on_execution_complete(
            execution_id="exec1",
            workflow_id="wf1",
            duration_ms=1500.0,
            status="completed",
            steps_total=3,
            steps_completed=3,
            steps_failed=0,
        )

        # An ExecutionMetrics must have been pushed into the buffer.
        buffered = fake_system.metrics_collector._buffer
        assert len(buffered) == 1
        metric: ExecutionMetrics = buffered[0]
        assert isinstance(metric, ExecutionMetrics)
        assert metric.execution_id == "exec1"
        assert metric.workflow_id == "wf1"
        assert metric.duration_ms == 1500.0
        assert metric.status == "completed"
        assert metric.steps_total == 3
        assert metric.steps_completed == 3
        assert metric.steps_failed == 0
        # started_at / completed_at must be mutually consistent.
        span_ms = (
            metric.completed_at - metric.started_at
        ).total_seconds() * 1000
        assert abs(span_ms - 1500.0) < 1.0

    def test_uses_record_execution_complete_if_active(self):
        """If the execution was opened via on_execution_start, delegate to
        record_execution_complete (nominal path)."""
        integration, fake_system = self._make_integration()

        # Simulate an active execution.
        fake_system.metrics_collector._active_executions = {"exec1": object()}

        integration.on_execution_complete(
            execution_id="exec1",
            workflow_id="wf1",
            duration_ms=800.0,
            status="failed",
            steps_total=2,
            steps_completed=1,
            steps_failed=1,
            error_message="timeout",
        )

        call = fake_system.metrics_collector.record_execution_complete.call_args
        assert call is not None
        expected_kwargs = {
            "execution_id": "exec1",
            "status": "failed",
            "steps_total": 2,
            "steps_completed": 1,
            "steps_failed": 1,
            "error_message": "timeout",
        }
        for key, value in expected_kwargs.items():
            assert call.kwargs[key] == value

    def test_steps_total_derived_when_not_provided(self):
        """steps_total is derived by summation when absent; no silent error."""
        integration, fake_system = self._make_integration()

        integration.on_execution_complete(
            execution_id="exec1",
            workflow_id="wf1",
            duration_ms=500.0,
            status="completed",
            steps_completed=2,
            steps_failed=1,
        )

        metric = fake_system.metrics_collector._buffer[0]
        assert metric.steps_total == 3  # 2 + 1

    def test_disabled_integration_is_noop(self):
        from core.analytics.integration.execution_integration import (
            AnalyticsExecutionIntegration,
        )

        integration = AnalyticsExecutionIntegration(None)
        assert integration.enabled is False
        # Must neither do anything nor raise.
        integration.on_execution_complete(
            execution_id="exec1",
            workflow_id="wf1",
            duration_ms=100.0,
            status="completed",
        )

    def test_realtime_complete_called(self):
        """Realtime tracking is closed with the right status."""
        integration, fake_system = self._make_integration()

        integration.on_execution_complete(
            execution_id="exec1",
            workflow_id="wf1",
            duration_ms=100.0,
            status="stopped",
        )

        fake_system.realtime_analytics.complete_execution.assert_called_once_with(
            execution_id="exec1",
            status="stopped",
        )


#
# -----------------------------------------------------------------------------
# AnalyticsExecutionIntegration.on_recovery_attempt (Lot A — April 2026)
# -----------------------------------------------------------------------------


class TestAnalyticsOnRecoveryAttempt:
    """Normalized contract: StepMetrics built with the real field names."""

    def test_success_recovery_builds_valid_step_metrics(self):
        from core.analytics.collection.metrics_collector import StepMetrics
        from core.analytics.integration.execution_integration import (
            AnalyticsExecutionIntegration,
        )

        analytics = MagicMock()
        integration = AnalyticsExecutionIntegration(analytics)

        integration.on_recovery_attempt(
            execution_id="exec1",
            workflow_id="wf1",
            node_id="node_click",
            strategy="retry_with_delay",
            success=True,
            duration_ms=250.0,
        )

        call = analytics.metrics_collector.record_step.call_args
        assert call is not None
        metric: StepMetrics = call.args[0]
        assert isinstance(metric, StepMetrics)
        assert metric.execution_id == "exec1"
        assert metric.workflow_id == "wf1"
        assert metric.node_id == "node_click_recovery"
        assert metric.action_type == "recovery_retry_with_delay"
        assert metric.duration_ms == 250.0
        assert metric.status == "completed"
        assert metric.error_details is None
        # Mandatory dataclass fields.
        assert metric.step_id  # non-empty
        assert metric.target_element == ""
        assert metric.confidence_score == 0.0

    def test_failed_recovery_sets_status_and_error_details(self):
        from core.analytics.collection.metrics_collector import StepMetrics
        from core.analytics.integration.execution_integration import (
            AnalyticsExecutionIntegration,
        )

        analytics = MagicMock()
        integration = AnalyticsExecutionIntegration(analytics)

        integration.on_recovery_attempt(
            execution_id="e1",
            workflow_id="w1",
            node_id="n1",
            strategy="fallback_to_parent",
            success=False,
            duration_ms=80.0,
        )

        metric: StepMetrics = (
            analytics.metrics_collector.record_step.call_args.args[0]
        )
        assert metric.status == "failed"
        assert metric.error_details == "Recovery failed: fallback_to_parent"
        assert metric.duration_ms == 80.0

    def test_disabled_integration_is_noop(self):
        from core.analytics.integration.execution_integration import (
            AnalyticsExecutionIntegration,
        )

        integration = AnalyticsExecutionIntegration(None)
        # Must neither do anything nor raise.
        integration.on_recovery_attempt(
            execution_id="e1",
            workflow_id="w1",
            node_id="n1",
            strategy="x",
            success=True,
            duration_ms=10.0,
        )


# -----------------------------------------------------------------------------
# SQLite persistence: schema + migration
# -----------------------------------------------------------------------------

class TestTimeSeriesStoreSchema:
    """step_metrics table must carry both legacy and C1 columns."""

    def test_new_store_has_vision_columns(self, tmp_path):
        from core.analytics.storage.timeseries_store import TimeSeriesStore

        store = TimeSeriesStore(tmp_path)

        with sqlite3.connect(str(store.db_path)) as conn:
            columns = {
                row[1]
                for row in conn.execute("PRAGMA table_info(step_metrics)")
            }

        # Legacy columns.
        assert "duration_ms" in columns
        assert "confidence_score" in columns
        # C1 columns.
        for c1_column in (
            "ocr_ms", "ui_ms", "analyze_ms", "total_ms", "cache_hit", "degraded",
        ):
            assert c1_column in columns

    def test_migration_adds_missing_columns(self, tmp_path):
        """Pre-existing database without the C1 columns — the migration must
        add them."""
        from core.analytics.storage.timeseries_store import TimeSeriesStore

        # Hand-build a "legacy" database lacking the new columns.
        storage_dir = tmp_path / "legacy"
        storage_dir.mkdir()
        legacy_db = storage_dir / "timeseries.db"
        with sqlite3.connect(str(legacy_db)) as conn:
            conn.executescript("""
                CREATE TABLE step_metrics (
                    step_id TEXT PRIMARY KEY,
                    execution_id TEXT NOT NULL,
                    workflow_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    action_type TEXT NOT NULL,
                    target_element TEXT,
                    started_at TIMESTAMP NOT NULL,
                    completed_at TIMESTAMP NOT NULL,
                    duration_ms REAL NOT NULL,
                    status TEXT NOT NULL,
                    confidence_score REAL,
                    retry_count INTEGER DEFAULT 0,
                    error_details TEXT
                );
            """)
            conn.commit()

        # Instantiating TimeSeriesStore must trigger the migration.
        _ = TimeSeriesStore(storage_dir)

        with sqlite3.connect(str(legacy_db)) as conn:
            columns = {
                row[1]
                for row in conn.execute("PRAGMA table_info(step_metrics)")
            }
        assert "ocr_ms" in columns
        assert "cache_hit" in columns
        assert "degraded" in columns

    def test_write_and_read_vision_metrics(self, tmp_path):
        from core.analytics.storage.timeseries_store import TimeSeriesStore

        store = TimeSeriesStore(tmp_path)
        metric = _make_step_metrics(
            ocr_ms=50.0,
            ui_ms=60.0,
            analyze_ms=110.0,
            total_ms=500.0,
            cache_hit=True,
            degraded=True,
        )
        store.write_metrics([metric])

        with sqlite3.connect(str(store.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM step_metrics WHERE step_id = ?",
                (metric.step_id,),
            ).fetchone()

        assert row is not None
        assert row["ocr_ms"] == 50.0
        assert row["ui_ms"] == 60.0
        assert row["analyze_ms"] == 110.0
        assert row["total_ms"] == 500.0
        # SQLite stores booleans as INTEGER.
        assert row["cache_hit"] == 1
        assert row["degraded"] == 1