"""
Replay Simulation Report - Fiche #16
"Dry-run" test system for evaluating target-resolution rules without any real
UI interaction. Loads test cases from tests/dataset/**/ and generates
performance reports with risk scores.
Author: Dom, Alice Kiro - 22 December 2025
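Typical CLI invocation (illustrative sketch; assumes the rpa_vision_v3 package is importable):
    python -m rpa_vision_v3.core.evaluation.replay_simulation --dataset "form_*" --out-md replay_report.md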
"""
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
from ..models.screen_state import ScreenState
from ..models.ui_element import UIElement
from ..models.workflow_graph import TargetSpec
from ..execution.target_resolver import TargetResolver
logger = logging.getLogger(__name__)
@dataclass
class TestCase:
"""Cas de test pour replay simulation"""
case_id: str
dataset_path: Path
screen_state: ScreenState
target_spec: TargetSpec
expected_element_id: str
expected_confidence: float
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RiskMetrics:
"""Métriques de risque pour une résolution"""
ambiguity_score: float # 0.0 = non ambigu, 1.0 = très ambigu
confidence_score: float # Confiance du resolver
margin_top1_top2: float # Marge entre top1 et top2
element_count: int # Nombre d'éléments candidats
resolution_time_ms: float # Temps de résolution
@property
def overall_risk(self) -> float:
"""Score de risque global (0.0 = faible risque, 1.0 = risque élevé)"""
# Pondération des facteurs de risque
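# Worked example (illustrative values): ambiguity=0.2, confidence=0.9, margin=0.5,
# time=50ms -> 0.4*0.2 + 0.3*0.1 + 0.2*0.5 + 0.1*0.05 = 0.215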
risk = (
0.4 * self.ambiguity_score + # Ambiguity is the main risk factor
0.3 * (1.0 - self.confidence_score) + # Low confidence = risk
0.2 * (1.0 - min(self.margin_top1_top2, 1.0)) + # Small margin = risk
0.1 * min(self.resolution_time_ms / 1000.0, 1.0) # Slow resolution = risk
)
return min(max(risk, 0.0), 1.0)
@dataclass
class SimulationResult:
"""Résultat d'une simulation de cas de test"""
case_id: str
success: bool
resolved_element_id: Optional[str]
expected_element_id: str
risk_metrics: RiskMetrics
strategy_used: str
error_message: Optional[str] = None
alternatives: List[Dict[str, Any]] = field(default_factory=list)
@property
def is_correct(self) -> bool:
"""Vérifie si la résolution est correcte"""
return self.success and self.resolved_element_id == self.expected_element_id
@dataclass
class ReplayReport:
"""Rapport complet de replay simulation"""
timestamp: datetime
total_cases: int
successful_cases: int
correct_cases: int
failed_cases: int
results: List[SimulationResult]
performance_stats: Dict[str, float]
risk_analysis: Dict[str, Any]
@property
def success_rate(self) -> float:
"""Taux de succès (résolution trouvée)"""
return self.successful_cases / max(1, self.total_cases)
@property
def accuracy_rate(self) -> float:
"""Taux de précision (résolution correcte)"""
return self.correct_cases / max(1, self.total_cases)
@property
def average_risk(self) -> float:
"""Score de risque moyen"""
if not self.results:
return 0.0
risks = [r.risk_metrics.overall_risk for r in self.results if r.success]
return sum(risks) / max(1, len(risks))
class ReplaySimulation:
"""
Replay simulator for headless testing of target-resolution rules.
Features:
- Loads test datasets from tests/dataset/**/
- Evaluates with the real TargetResolver and the rules from fiches #8-#14
- Computes risk scores (ambiguity, confidence, margin)
- Generates JSON and Markdown reports
- 100% headless, ideal for fast iteration (see the usage sketch below)
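Example (minimal usage sketch; the dataset pattern and output paths are illustrative):
    sim = ReplaySimulation()
    cases = sim.load_test_cases("form_*", max_cases=50)
    report = sim.run_simulation(cases)
    sim.export_json_report(report, Path("replay_report.json"))
    sim.export_markdown_report(report, Path("replay_report.md"))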
"""
def __init__(
self,
target_resolver: Optional[TargetResolver] = None,
dataset_root: Optional[Path] = None
):
"""
Initialize the simulator.
Args:
target_resolver: Resolver to use (a default one is created if None)
dataset_root: Dataset root directory (tests/dataset by default)
"""
self.target_resolver = target_resolver or TargetResolver()
self.dataset_root = dataset_root or Path("tests/dataset")
# Performance stats
self.stats = {
"cases_loaded": 0,
"cases_processed": 0,
"total_load_time_ms": 0.0,
"total_resolution_time_ms": 0.0
}
logger.info(f"ReplaySimulation initialized with dataset root: {self.dataset_root}")
def load_test_cases(
self,
dataset_pattern: str = "**",
max_cases: Optional[int] = None
) -> List[TestCase]:
"""
Load test cases from the dataset.
Expected format per directory:
- screen_state.json: serialized ScreenState
- target_spec.json: serialized TargetSpec
- expected.json: {"element_id": "...", "confidence": 0.95}
Args:
dataset_pattern: Search pattern (e.g. "form_*", "**")
max_cases: Maximum number of cases (None = all)
Returns:
List of loaded test cases
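Example layout (illustrative directory and element names; the directory name becomes the case_id):
    tests/dataset/form_login_01/
        screen_state.json
        target_spec.json
        expected.json      # {"element_id": "btn_submit", "confidence": 0.95}
        metadata.json      # optional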
"""
start_time = time.perf_counter()
test_cases = []
# Find all directories matching the pattern
search_path = self.dataset_root / dataset_pattern
case_dirs = []
if search_path.is_dir():
case_dirs = [search_path]
else:
# Fall back to a glob search
case_dirs = list(self.dataset_root.glob(dataset_pattern))
case_dirs = [d for d in case_dirs if d.is_dir()]
logger.info(f"Found {len(case_dirs)} potential test case directories")
for case_dir in case_dirs:
if max_cases and len(test_cases) >= max_cases:
break
try:
test_case = self._load_single_test_case(case_dir)
if test_case:
test_cases.append(test_case)
self.stats["cases_loaded"] += 1
except Exception as e:
logger.warning(f"Failed to load test case from {case_dir}: {e}")
load_time = (time.perf_counter() - start_time) * 1000
self.stats["total_load_time_ms"] += load_time
logger.info(f"Loaded {len(test_cases)} test cases in {load_time:.1f}ms")
return test_cases
def _load_single_test_case(self, case_dir: Path) -> Optional[TestCase]:
"""
Load a single test case from a directory.
Args:
case_dir: Directory containing the test case files
Returns:
Loaded TestCase, or None on error
"""
required_files = ["screen_state.json", "target_spec.json", "expected.json"]
# Check that all required files exist
for filename in required_files:
if not (case_dir / filename).exists():
logger.debug(f"Missing required file {filename} in {case_dir}")
return None
try:
# Load screen_state
with open(case_dir / "screen_state.json", 'r', encoding='utf-8') as f:
screen_state_data = json.load(f)
screen_state = ScreenState.from_json(screen_state_data)
# Load target_spec
with open(case_dir / "target_spec.json", 'r', encoding='utf-8') as f:
target_spec_data = json.load(f)
target_spec = TargetSpec.from_dict(target_spec_data)
# Load expected
with open(case_dir / "expected.json", 'r', encoding='utf-8') as f:
expected_data = json.load(f)
# Optional metadata
metadata = {}
metadata_file = case_dir / "metadata.json"
if metadata_file.exists():
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
return TestCase(
case_id=case_dir.name,
dataset_path=case_dir,
screen_state=screen_state,
target_spec=target_spec,
expected_element_id=expected_data["element_id"],
expected_confidence=expected_data.get("confidence", 0.95),
metadata=metadata
)
except Exception as e:
logger.error(f"Error loading test case from {case_dir}: {e}")
return None
def run_simulation(
self,
test_cases: List[TestCase],
include_alternatives: bool = True
) -> ReplayReport:
"""
Run the simulation over a list of test cases.
Args:
test_cases: Test cases to evaluate
include_alternatives: Include alternatives in the results
Returns:
Complete simulation report
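Example (sketch, assuming `sim` and `cases` come from load_test_cases above):
    report = sim.run_simulation(cases)
    print(f"{report.accuracy_rate:.1%} correct, avg risk {report.average_risk:.3f}")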
"""
start_time = time.perf_counter()
results = []
logger.info(f"Starting replay simulation on {len(test_cases)} test cases")
for i, test_case in enumerate(test_cases):
if i % 10 == 0:
logger.info(f"Processing test case {i+1}/{len(test_cases)}")
try:
result = self._simulate_single_case(test_case, include_alternatives)
results.append(result)
self.stats["cases_processed"] += 1
except Exception as e:
logger.error(f"Error simulating case {test_case.case_id}: {e}")
# Build an error result
error_result = SimulationResult(
case_id=test_case.case_id,
success=False,
resolved_element_id=None,
expected_element_id=test_case.expected_element_id,
risk_metrics=RiskMetrics(
ambiguity_score=1.0,
confidence_score=0.0,
margin_top1_top2=0.0,
element_count=0,
resolution_time_ms=0.0
),
strategy_used="ERROR",
error_message=str(e)
)
results.append(error_result)
# Compute global statistics
total_time = (time.perf_counter() - start_time) * 1000
successful_cases = sum(1 for r in results if r.success)
correct_cases = sum(1 for r in results if r.is_correct)
failed_cases = len(results) - successful_cases
# Performance statistics
resolution_times = [r.risk_metrics.resolution_time_ms for r in results if r.success]
performance_stats = {
"total_simulation_time_ms": total_time,
"avg_resolution_time_ms": sum(resolution_times) / max(1, len(resolution_times)),
"min_resolution_time_ms": min(resolution_times) if resolution_times else 0.0,
"max_resolution_time_ms": max(resolution_times) if resolution_times else 0.0,
"cases_per_second": len(test_cases) / max(0.001, total_time / 1000)
}
# Risk analysis
risk_scores = [r.risk_metrics.overall_risk for r in results if r.success]
risk_analysis = {
"average_risk": sum(risk_scores) / max(1, len(risk_scores)),
"high_risk_cases": sum(1 for r in risk_scores if r > 0.7),
"medium_risk_cases": sum(1 for r in risk_scores if 0.3 <= r <= 0.7),
"low_risk_cases": sum(1 for r in risk_scores if r < 0.3),
"risk_distribution": self._calculate_risk_distribution(risk_scores)
}
report = ReplayReport(
timestamp=datetime.now(),
total_cases=len(test_cases),
successful_cases=successful_cases,
correct_cases=correct_cases,
failed_cases=failed_cases,
results=results,
performance_stats=performance_stats,
risk_analysis=risk_analysis
)
logger.info(f"Simulation completed: {successful_cases}/{len(test_cases)} successful, "
f"{correct_cases}/{len(test_cases)} correct, avg risk: {report.average_risk:.3f}")
return report
def _simulate_single_case(
self,
test_case: TestCase,
include_alternatives: bool
) -> SimulationResult:
"""
Simulate a single test case.
Args:
test_case: Test case to evaluate
include_alternatives: Include alternatives
Returns:
Simulation result for this case
"""
start_time = time.perf_counter()
try:
# Resolve the target with the real TargetResolver
resolved_target = self.target_resolver.resolve_target(
target_spec=test_case.target_spec,
screen_state=test_case.screen_state
)
resolution_time = (time.perf_counter() - start_time) * 1000
self.stats["total_resolution_time_ms"] += resolution_time
if resolved_target is None:
# Resolution failed
return SimulationResult(
case_id=test_case.case_id,
success=False,
resolved_element_id=None,
expected_element_id=test_case.expected_element_id,
risk_metrics=RiskMetrics(
ambiguity_score=1.0,
confidence_score=0.0,
margin_top1_top2=0.0,
element_count=len(test_case.screen_state.ui_elements),
resolution_time_ms=resolution_time
),
strategy_used="FAILED"
)
# Compute risk metrics
risk_metrics = self._calculate_risk_metrics(
resolved_target,
test_case.screen_state.ui_elements,
resolution_time
)
# Collect alternatives if requested
alternatives = []
if include_alternatives and resolved_target.alternatives:
alternatives = [
{
"element_id": alt.element.element_id,
"confidence": alt.confidence,
"strategy": alt.strategy_used
}
for alt in resolved_target.alternatives[:3] # Top 3
]
return SimulationResult(
case_id=test_case.case_id,
success=True,
resolved_element_id=resolved_target.element.element_id,
expected_element_id=test_case.expected_element_id,
risk_metrics=risk_metrics,
strategy_used=resolved_target.strategy_used,
alternatives=alternatives
)
except Exception as e:
resolution_time = (time.perf_counter() - start_time) * 1000
return SimulationResult(
case_id=test_case.case_id,
success=False,
resolved_element_id=None,
expected_element_id=test_case.expected_element_id,
risk_metrics=RiskMetrics(
ambiguity_score=1.0,
confidence_score=0.0,
margin_top1_top2=0.0,
element_count=0,
resolution_time_ms=resolution_time
),
strategy_used="ERROR",
error_message=str(e)
)
def _calculate_risk_metrics(
self,
resolved_target,
ui_elements: List[UIElement],
resolution_time_ms: float
) -> RiskMetrics:
"""
Compute risk metrics for a resolution.
Args:
resolved_target: Resolution result
ui_elements: All available UI elements
resolution_time_ms: Resolution time
Returns:
Computed risk metrics
"""
# Ambiguity score based on the number of similar elements
similar_elements = self._count_similar_elements(
resolved_target.element,
ui_elements
)
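# Example: 3 elements sharing the target's role/type -> ambiguity 0.3; 10 or more -> capped at 1.0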
ambiguity_score = min(similar_elements / 10.0, 1.0) # Normalize against a cap of 10 elements
# Resolver confidence score
confidence_score = resolved_target.confidence
# Margin between top-1 and top-2
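# Example: top-1 confidence 0.92 vs. best alternative 0.80 -> margin 0.12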
margin_top1_top2 = 0.0
if resolved_target.alternatives and len(resolved_target.alternatives) > 0:
top2_confidence = resolved_target.alternatives[0].confidence
margin_top1_top2 = max(0.0, confidence_score - top2_confidence)
else:
margin_top1_top2 = confidence_score # No alternative found = maximum margin
return RiskMetrics(
ambiguity_score=ambiguity_score,
confidence_score=confidence_score,
margin_top1_top2=margin_top1_top2,
element_count=len(ui_elements),
resolution_time_ms=resolution_time_ms
)
def _count_similar_elements(
self,
target_element: UIElement,
ui_elements: List[UIElement]
) -> int:
"""
Count elements similar to the target (same role/type).
Args:
target_element: Resolved target element
ui_elements: All UI elements
Returns:
Number of similar elements
"""
target_role = (getattr(target_element, 'role', '') or '').lower()
target_type = (getattr(target_element, 'type', '') or '').lower()
similar_count = 0
for elem in ui_elements:
if elem.element_id == target_element.element_id:
continue # Skip the target element itself
elem_role = (getattr(elem, 'role', '') or '').lower()
elem_type = (getattr(elem, 'type', '') or '').lower()
if elem_role == target_role or elem_type == target_type:
similar_count += 1
return similar_count
def _calculate_risk_distribution(self, risk_scores: List[float]) -> Dict[str, int]:
"""
Compute the distribution of risk scores by bucket.
Args:
risk_scores: List of risk scores
Returns:
Distribution by bucket
"""
if not risk_scores:
return {}
distribution = {
"0.0-0.1": 0,
"0.1-0.2": 0,
"0.2-0.3": 0,
"0.3-0.4": 0,
"0.4-0.5": 0,
"0.5-0.6": 0,
"0.6-0.7": 0,
"0.7-0.8": 0,
"0.8-0.9": 0,
"0.9-1.0": 0
}
for score in risk_scores:
if score < 0.1:
distribution["0.0-0.1"] += 1
elif score < 0.2:
distribution["0.1-0.2"] += 1
elif score < 0.3:
distribution["0.2-0.3"] += 1
elif score < 0.4:
distribution["0.3-0.4"] += 1
elif score < 0.5:
distribution["0.4-0.5"] += 1
elif score < 0.6:
distribution["0.5-0.6"] += 1
elif score < 0.7:
distribution["0.6-0.7"] += 1
elif score < 0.8:
distribution["0.7-0.8"] += 1
elif score < 0.9:
distribution["0.8-0.9"] += 1
else:
distribution["0.9-1.0"] += 1
return distribution
def export_json_report(
self,
report: ReplayReport,
output_path: Path
) -> None:
"""
Export the report as machine-friendly JSON.
Args:
report: Report to export
output_path: Output path
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
# Serialize the report
report_data = {
"metadata": {
"timestamp": report.timestamp.isoformat(),
"total_cases": report.total_cases,
"successful_cases": report.successful_cases,
"correct_cases": report.correct_cases,
"failed_cases": report.failed_cases,
"success_rate": report.success_rate,
"accuracy_rate": report.accuracy_rate,
"average_risk": report.average_risk
},
"performance_stats": report.performance_stats,
"risk_analysis": report.risk_analysis,
"results": [
{
"case_id": r.case_id,
"success": r.success,
"is_correct": r.is_correct,
"resolved_element_id": r.resolved_element_id,
"expected_element_id": r.expected_element_id,
"strategy_used": r.strategy_used,
"error_message": r.error_message,
"risk_metrics": {
"ambiguity_score": r.risk_metrics.ambiguity_score,
"confidence_score": r.risk_metrics.confidence_score,
"margin_top1_top2": r.risk_metrics.margin_top1_top2,
"element_count": r.risk_metrics.element_count,
"resolution_time_ms": r.risk_metrics.resolution_time_ms,
"overall_risk": r.risk_metrics.overall_risk
},
"alternatives": r.alternatives
}
for r in report.results
]
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report_data, f, indent=2, ensure_ascii=False)
logger.info(f"JSON report exported to {output_path}")
def export_markdown_report(
self,
report: ReplayReport,
output_path: Path
) -> None:
"""
Export the report as human-friendly Markdown.
Args:
report: Report to export
output_path: Output path
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
# Generate the Markdown content
md_content = self._generate_markdown_content(report)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(md_content)
logger.info(f"Markdown report exported to {output_path}")
def _generate_markdown_content(self, report: ReplayReport) -> str:
"""
Generate the Markdown content of the report.
Args:
report: Report to convert
Returns:
Formatted Markdown content
"""
md_lines = [
"# Replay Simulation Report",
"",
f"**Généré le :** {report.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
f"**Auteur :** Dom, Alice Kiro",
"",
"## Résumé Exécutif",
"",
f"- **Cas de test traités :** {report.total_cases}",
f"- **Résolutions réussies :** {report.successful_cases} ({report.success_rate:.1%})",
f"- **Résolutions correctes :** {report.correct_cases} ({report.accuracy_rate:.1%})",
f"- **Échecs :** {report.failed_cases}",
f"- **Score de risque moyen :** {report.average_risk:.3f}",
"",
"## Performance",
"",
f"- **Temps total :** {report.performance_stats['total_simulation_time_ms']:.1f}ms",
f"- **Temps moyen par résolution :** {report.performance_stats['avg_resolution_time_ms']:.1f}ms",
f"- **Débit :** {report.performance_stats['cases_per_second']:.1f} cas/seconde",
f"- **Temps min/max :** {report.performance_stats['min_resolution_time_ms']:.1f}ms / {report.performance_stats['max_resolution_time_ms']:.1f}ms",
"",
"## Analyse des Risques",
"",
f"- **Cas à risque élevé (>0.7) :** {report.risk_analysis['high_risk_cases']}",
f"- **Cas à risque moyen (0.3-0.7) :** {report.risk_analysis['medium_risk_cases']}",
f"- **Cas à faible risque (<0.3) :** {report.risk_analysis['low_risk_cases']}",
"",
"### Distribution des Risques",
"",
"| Tranche | Nombre de cas |",
"|---------|---------------|"
]
# Append the risk distribution
for tranche, count in report.risk_analysis['risk_distribution'].items():
md_lines.append(f"| {tranche} | {count} |")
md_lines.extend([
"",
"## Détails par Stratégie",
"",
"| Stratégie | Cas | Succès | Précision |",
"|-----------|-----|--------|-----------|"
])
# Break down results by strategy
strategy_stats = {}
for result in report.results:
strategy = result.strategy_used
if strategy not in strategy_stats:
strategy_stats[strategy] = {"total": 0, "success": 0, "correct": 0}
strategy_stats[strategy]["total"] += 1
if result.success:
strategy_stats[strategy]["success"] += 1
if result.is_correct:
strategy_stats[strategy]["correct"] += 1
for strategy, stats in strategy_stats.items():
success_rate = stats["success"] / max(1, stats["total"])
accuracy_rate = stats["correct"] / max(1, stats["total"])
md_lines.append(f"| {strategy} | {stats['total']} | {success_rate:.1%} | {accuracy_rate:.1%} |")
md_lines.extend([
"",
"## Cas Problématiques (Risque > 0.7)",
""
])
# List high-risk cases
high_risk_cases = [r for r in report.results if r.success and r.risk_metrics.overall_risk > 0.7]
high_risk_cases.sort(key=lambda x: x.risk_metrics.overall_risk, reverse=True)
if high_risk_cases:
md_lines.extend([
"| Cas | Risque | Confiance | Ambiguïté | Marge | Temps |",
"|-----|--------|-----------|-----------|-------|-------|"
])
for case in high_risk_cases[:10]: # Top 10
md_lines.append(
f"| {case.case_id} | {case.risk_metrics.overall_risk:.3f} | "
f"{case.risk_metrics.confidence_score:.3f} | "
f"{case.risk_metrics.ambiguity_score:.3f} | "
f"{case.risk_metrics.margin_top1_top2:.3f} | "
f"{case.risk_metrics.resolution_time_ms:.1f}ms |"
)
else:
md_lines.append("*Aucun cas à risque élevé détecté.*")
md_lines.extend([
"",
"## Échecs de Résolution",
""
])
# List resolution failures
failed_cases = [r for r in report.results if not r.success]
if failed_cases:
md_lines.extend([
"| Cas | Erreur |",
"|-----|--------|"
])
for case in failed_cases[:10]: # Top 10
error_msg = case.error_message or "Aucune résolution trouvée"
md_lines.append(f"| {case.case_id} | {error_msg} |")
else:
md_lines.append("*Aucun échec de résolution.*")
md_lines.extend([
"",
"## Recommandations",
"",
self._generate_recommendations(report),
"",
"---",
f"*Rapport généré par RPA Vision V3 - Replay Simulation Engine*"
])
return "\n".join(md_lines)
def _generate_recommendations(self, report: ReplayReport) -> str:
"""
Generate recommendations based on the report analysis.
Args:
report: Analyzed report
Returns:
Recommendations formatted as Markdown
"""
recommendations = []
# Success-rate analysis
if report.success_rate < 0.8:
recommendations.append(
"⚠️ **Taux de succès faible** : Considérer l'amélioration des stratégies de fallback"
)
# Accuracy analysis
if report.accuracy_rate < 0.9:
recommendations.append(
"⚠️ **Précision insuffisante** : Revoir les critères de scoring et les seuils de confiance"
)
# Risk analysis
if report.average_risk > 0.5:
recommendations.append(
"⚠️ **Risque élevé** : Améliorer la désambiguïsation et les marges de confiance"
)
# Performance analysis
avg_time = report.performance_stats['avg_resolution_time_ms']
if avg_time > 100:
recommendations.append(
f"⚠️ **Performance** : Temps de résolution élevé ({avg_time:.1f}ms), optimiser les algorithmes"
)
# Per-strategy analysis
strategy_stats = {}
for result in report.results:
strategy = result.strategy_used
if strategy not in strategy_stats:
strategy_stats[strategy] = {"total": 0, "correct": 0}
strategy_stats[strategy]["total"] += 1
if result.is_correct:
strategy_stats[strategy]["correct"] += 1
for strategy, stats in strategy_stats.items():
accuracy = stats["correct"] / max(1, stats["total"])
if accuracy < 0.8 and stats["total"] > 5:
recommendations.append(
f"⚠️ **Stratégie {strategy}** : Précision faible ({accuracy:.1%}), revoir l'implémentation"
)
if not recommendations:
recommendations.append("✅ **Excellent** : Toutes les métriques sont dans les objectifs")
return "\n".join(f"- {rec}" for rec in recommendations)
def create_replay_simulation_cli():
"""
Create a CLI entry point for the replay simulation.
Returns:
Configured CLI function
"""
import argparse
def cli_main():
parser = argparse.ArgumentParser(
description="Replay Simulation Report - Test headless des règles de résolution"
)
parser.add_argument(
"--dataset",
type=str,
default="**",
help="Pattern de dataset à charger (ex: 'form_*', '**')"
)
parser.add_argument(
"--max-cases",
type=int,
help="Nombre maximum de cas à traiter"
)
parser.add_argument(
"--out-json",
type=str,
default="replay_report.json",
help="Fichier de sortie JSON"
)
parser.add_argument(
"--out-md",
type=str,
default="replay_report.md",
help="Fichier de sortie Markdown"
)
parser.add_argument(
"--dataset-root",
type=str,
default="tests/dataset",
help="Racine des datasets de test"
)
parser.add_argument(
"--verbose",
action="store_true",
help="Mode verbose"
)
args = parser.parse_args()
# Logging configuration
level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')
# Create the simulator
simulator = ReplaySimulation(dataset_root=Path(args.dataset_root))
# Load the test cases
print(f"Chargement des cas de test depuis {args.dataset_root} (pattern: {args.dataset})")
test_cases = simulator.load_test_cases(args.dataset, args.max_cases)
if not test_cases:
print("❌ Aucun cas de test trouvé")
return 1
print(f"{len(test_cases)} cas de test chargés")
# Run the simulation
print("🚀 Démarrage de la simulation...")
report = simulator.run_simulation(test_cases)
# Export the reports
json_path = Path(args.out_json)
md_path = Path(args.out_md)
simulator.export_json_report(report, json_path)
simulator.export_markdown_report(report, md_path)
# Print the summary
print("\n" + "="*60)
print("📊 RÉSUMÉ DE SIMULATION")
print("="*60)
print(f"Cas traités : {report.total_cases}")
print(f"Succès : {report.successful_cases} ({report.success_rate:.1%})")
print(f"Précision : {report.correct_cases} ({report.accuracy_rate:.1%})")
print(f"Risque moyen : {report.average_risk:.3f}")
print(f"Temps total : {report.performance_stats['total_simulation_time_ms']:.1f}ms")
print(f"Débit : {report.performance_stats['cases_per_second']:.1f} cas/sec")
print("\n📄 Rapports générés :")
print(f" - JSON : {json_path}")
print(f" - Markdown : {md_path}")
return 0
return cli_main
if __name__ == "__main__":
cli_main = create_replay_simulation_cli()
raise SystemExit(cli_main())