feat(grounding): pipeline centralisé + serveur UI-TARS transformers + nettoyage code mort

Architecture grounding complète :
- core/grounding/server.py : serveur FastAPI (port 8200) avec UI-TARS-1.5-7B en 4-bit NF4
  Process séparé avec son propre contexte CUDA (résout le crash Flask/CUDA)
- core/grounding/pipeline.py : orchestrateur cascade template→OCR→UI-TARS→static
- core/grounding/template_matcher.py : TemplateMatcher centralisé (remplace 5 copies)
- core/grounding/ui_tars_grounder.py : client HTTP vers le serveur de grounding
- core/grounding/target.py : GroundingTarget + GroundingResult

ORA modifié :
- _act_click() : capture unique de l'écran envoyée au serveur de grounding
- Pre-check VLM skippé pour ui_tars (redondant, et Ollama n'a plus de VRAM)
- verify_level='none' par défaut (vérification titre OCR prévue en Phase 2)
- Détection réponses négatives UI-TARS ("I don't see it" → fallback OCR)

Nettoyage :
- 9 fichiers morts archivés dans _archive/ (~6300 lignes supprimées)
- 21 tests ajoutés pour TemplateMatcher

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-25 17:48:18 +02:00
parent 16ff396dbf
commit 9da589c8c2
20 changed files with 1862 additions and 15 deletions

View File

@@ -1,877 +0,0 @@
"""
Workflow Simulation Report - Fiche #16++
Système de simulation complète de workflows pour tester la chaîne complète :
Node Matching (FAISS) → Target Resolution → Post-conditions → Transition
Utilise des "scenario packs" avec frames séquentielles pour simuler des workflows
réalistes et générer des rapports de performance détaillés.
Auteur : Dom, Alice Kiro - 22 décembre 2025
"""
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union
import numpy as np
from datetime import datetime
from ..models.screen_state import ScreenState
from ..models.ui_element import UIElement
from ..models.workflow_graph import Workflow, WorkflowNode, WorkflowEdge, TargetSpec, PostConditions, PostConditionCheck
from ..graph.node_matcher import NodeMatcher
from ..embedding.state_embedding_builder import StateEmbeddingBuilder
from ..execution.target_resolver import TargetResolver
logger = logging.getLogger(__name__)
@dataclass
class ScenarioFrame:
    """A single captured frame (one step) within a workflow scenario."""
    frame_id: str  # unique id, formatted "<scenario_id>_step_NNN"
    step_number: int  # ordinal position of this step in the scenario
    screen_state: ScreenState  # full perceived screen state for this step
    expected_node_id: Optional[str] = None  # workflow node expected to match this frame
    expected_action: Optional[Dict[str, Any]] = None  # expected action (dict with a "target" spec)
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form extra step info
@dataclass
class ScenarioPack:
    """A complete scenario pack: an ordered series of frames replaying one workflow."""
    scenario_id: str
    name: str
    description: str
    workflow_id: str  # workflow under test
    frames: List[ScenarioFrame]
    expected_path: List[str]  # expected sequence of node_ids
    metadata: Dict[str, Any] = field(default_factory=dict)
    @classmethod
    def load_from_directory(cls, scenario_dir: Path) -> 'ScenarioPack':
        """Build a ScenarioPack from a directory holding scenario.json plus step_NNN.json files.

        Steps whose JSON file is missing are skipped with a warning; a missing
        scenario.json raises FileNotFoundError.
        """
        manifest_path = scenario_dir / "scenario.json"
        if not manifest_path.exists():
            raise FileNotFoundError(f"scenario.json not found in {scenario_dir}")
        with open(manifest_path, 'r', encoding='utf-8') as fh:
            manifest = json.load(fh)
        loaded_frames = []
        for spec in manifest.get("steps", []):
            step_path = scenario_dir / f"step_{spec['step_number']:03d}.json"
            if not step_path.exists():
                logger.warning(f"Step file not found: {step_path}")
                continue
            with open(step_path, 'r', encoding='utf-8') as fh:
                payload = json.load(fh)
            # Rebuild the ScreenState from its JSON form
            state = ScreenState.from_dict(payload["screen_state"])
            loaded_frames.append(ScenarioFrame(
                frame_id=f"{manifest['scenario_id']}_step_{spec['step_number']:03d}",
                step_number=spec["step_number"],
                screen_state=state,
                expected_node_id=spec.get("expected_node_id"),
                expected_action=spec.get("expected_action"),
                metadata=spec.get("metadata", {})
            ))
        return cls(
            scenario_id=manifest["scenario_id"],
            name=manifest["name"],
            description=manifest["description"],
            workflow_id=manifest["workflow_id"],
            frames=loaded_frames,
            expected_path=manifest.get("expected_path", []),
            metadata=manifest.get("metadata", {})
        )
@dataclass
class NodeMatchingResult:
    """Outcome of matching one frame's screen state against workflow nodes."""
    frame_id: str  # frame this result belongs to
    expected_node_id: Optional[str]  # node the scenario declared as expected (may be None)
    matched_node_id: Optional[str]  # node actually matched, or None on failure
    confidence: float  # matcher confidence score
    success: bool  # True when a node was matched (NOT compared against expected_node_id)
    strategy_used: str  # e.g. "faiss_search", "none", "error"
    error_message: Optional[str] = None  # failure detail, if any
    alternatives: List[Tuple[str, float]] = field(default_factory=list)  # (node_id, confidence)
@dataclass
class TargetResolutionResult:
    """Outcome of resolving an action's target on a frame."""
    frame_id: str  # frame this result belongs to
    target_spec: Optional[TargetSpec]  # spec that was resolved (None when no action / on error)
    resolved_element_id: Optional[str]  # id of the UI element found, or None
    expected_element_id: Optional[str]  # element id the scenario expected, if declared
    confidence: float  # resolver confidence score
    success: bool  # True when resolution succeeded (or when there was no action to resolve)
    strategy_used: str  # e.g. "no_action", "failed", "error", or the resolver's strategy
    resolution_time_ms: float  # wall-clock resolution time
    error_message: Optional[str] = None  # failure detail, if any
    alternatives: List[Dict[str, Any]] = field(default_factory=list)  # alternative candidate elements
@dataclass
class PostConditionResult:
    """Outcome of verifying an edge's post-conditions on a frame."""
    frame_id: str  # frame this result belongs to
    post_conditions: Optional[PostConditions]  # conditions that were checked (None when absent)
    checks_passed: int  # number of individual checks that passed
    checks_total: int  # number of individual checks evaluated
    success: bool  # True when all checks passed (or there were none)
    timeout_occurred: bool  # reserved: simulation never sets this True
    verification_time_ms: float  # wall-clock verification time
    failed_checks: List[str] = field(default_factory=list)  # "kind: value" of each failed check
    error_message: Optional[str] = None  # failure detail, if any
@dataclass
class TransitionResult:
    """Outcome of a (simulated) transition between two consecutive frames."""
    from_frame_id: str  # frame the transition starts from
    to_frame_id: str  # frame the transition leads to
    expected_transition: bool  # True when the two frames expect different, non-None nodes
    actual_transition: bool  # simulated outcome (currently mirrors expected_transition)
    success: bool  # expected == actual
    transition_confidence: float  # 1.0 on success, 0.0 otherwise
    error_message: Optional[str] = None  # failure detail, if any
@dataclass
class WorkflowStepResult:
    """Aggregated result of one simulated workflow step (all four phases)."""
    frame_id: str  # frame this step corresponds to
    step_number: int  # ordinal position of the step
    node_matching: NodeMatchingResult  # phase 1 — always present
    target_resolution: Optional[TargetResolutionResult]  # phase 2 — None when skipped
    post_conditions: Optional[PostConditionResult]  # phase 3 — None when skipped
    transition: Optional[TransitionResult]  # phase 4 — None on the last frame
    overall_success: bool  # True when every executed phase succeeded
    step_duration_ms: float  # wall-clock duration of the whole step
    @property
    def success_components(self) -> Dict[str, bool]:
        """Per-phase success flags; a skipped (None) phase counts as success."""
        return {
            "node_matching": self.node_matching.success,
            "target_resolution": self.target_resolution.success if self.target_resolution else True,
            "post_conditions": self.post_conditions.success if self.post_conditions else True,
            "transition": self.transition.success if self.transition else True
        }
@dataclass
class WorkflowSimulationReport:
    """Complete report for one workflow simulation run.

    Aggregates per-step results, component-level accuracy metrics, timing,
    error analysis and recommendations. Can serialize itself to a JSON-ready
    dict / file and render a human-readable Markdown summary.
    """
    scenario_id: str
    workflow_id: str
    timestamp: datetime
    total_steps: int
    successful_steps: int
    # Quoted annotation: behavior-identical for dataclasses, but removes the
    # hard load-order dependency on WorkflowStepResult being defined first.
    step_results: List["WorkflowStepResult"]
    # Global metrics (each a ratio in [0, 1])
    node_matching_accuracy: float
    target_resolution_accuracy: float
    post_condition_success_rate: float
    transition_accuracy: float
    # Performance
    total_simulation_time_ms: float
    avg_step_time_ms: float
    # Error analysis
    error_breakdown: Dict[str, int]
    failure_points: List[str]
    # Recommendations
    recommendations: List[str]

    @property
    def overall_success_rate(self) -> float:
        """Fraction of fully-successful steps (0.0 when there are no steps)."""
        return self.successful_steps / max(1, self.total_steps)

    @staticmethod
    def _step_to_dict(result: "WorkflowStepResult") -> Dict[str, Any]:
        """Serialize one step result; skipped (None) phases serialize as None.

        Note: the original inner per-field conditionals were redundant — each
        phase sub-dict is only built when that phase result exists.
        """
        target = result.target_resolution
        post = result.post_conditions
        trans = result.transition
        return {
            "frame_id": result.frame_id,
            "step_number": result.step_number,
            "overall_success": result.overall_success,
            "step_duration_ms": result.step_duration_ms,
            "success_components": result.success_components,
            "node_matching": {
                "expected_node_id": result.node_matching.expected_node_id,
                "matched_node_id": result.node_matching.matched_node_id,
                "confidence": result.node_matching.confidence,
                "success": result.node_matching.success,
                "strategy_used": result.node_matching.strategy_used,
                "error_message": result.node_matching.error_message
            },
            "target_resolution": {
                "resolved_element_id": target.resolved_element_id,
                "confidence": target.confidence,
                "success": target.success,
                "strategy_used": target.strategy_used,
                "resolution_time_ms": target.resolution_time_ms
            } if target else None,
            "post_conditions": {
                "checks_passed": post.checks_passed,
                "checks_total": post.checks_total,
                "success": post.success,
                "verification_time_ms": post.verification_time_ms
            } if post else None,
            "transition": {
                "expected_transition": trans.expected_transition,
                "actual_transition": trans.actual_transition,
                "success": trans.success,
                "transition_confidence": trans.transition_confidence
            } if trans else None
        }

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the full report to a JSON-compatible dictionary."""
        return {
            "scenario_id": self.scenario_id,
            "workflow_id": self.workflow_id,
            "timestamp": self.timestamp.isoformat(),
            "total_steps": self.total_steps,
            "successful_steps": self.successful_steps,
            "step_results": [self._step_to_dict(result) for result in self.step_results],
            "metrics": {
                "node_matching_accuracy": self.node_matching_accuracy,
                "target_resolution_accuracy": self.target_resolution_accuracy,
                "post_condition_success_rate": self.post_condition_success_rate,
                "transition_accuracy": self.transition_accuracy,
                "overall_success_rate": self.overall_success_rate
            },
            "performance": {
                "total_simulation_time_ms": self.total_simulation_time_ms,
                "avg_step_time_ms": self.avg_step_time_ms
            },
            "analysis": {
                "error_breakdown": self.error_breakdown,
                "failure_points": self.failure_points,
                "recommendations": self.recommendations
            }
        }

    def save_to_file(self, filepath: Path) -> None:
        """Save the report as JSON, creating parent directories as needed."""
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)

    @staticmethod
    def _phase_status(phase: Optional[Any]) -> str:
        """Markdown status marker for an optional phase result.

        Returns "N/A" when the phase was skipped, ✅/❌ otherwise. (The original
        markers were empty strings in both branches — the emoji had been lost —
        and a failed phase was indistinguishable from a skipped one.)
        """
        if phase is None:
            return "N/A"
        return "✅" if phase.success else "❌"

    def generate_markdown_report(self) -> str:
        """Render a human-readable Markdown summary of the report."""
        md_lines = [
            f"# Workflow Simulation Report",
            f"",
            f"**Scenario:** {self.scenario_id}",
            f"**Workflow:** {self.workflow_id}",
            f"**Date:** {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
            f"",
            f"## Summary",
            f"",
            f"- **Total Steps:** {self.total_steps}",
            f"- **Successful Steps:** {self.successful_steps}",
            f"- **Overall Success Rate:** {self.overall_success_rate:.1%}",
            f"- **Total Simulation Time:** {self.total_simulation_time_ms:.0f}ms",
            f"- **Average Step Time:** {self.avg_step_time_ms:.0f}ms",
            f"",
            f"## Component Accuracy",
            f"",
            f"| Component | Accuracy |",
            f"|-----------|----------|",
            f"| Node Matching | {self.node_matching_accuracy:.1%} |",
            f"| Target Resolution | {self.target_resolution_accuracy:.1%} |",
            f"| Post-conditions | {self.post_condition_success_rate:.1%} |",
            f"| Transitions | {self.transition_accuracy:.1%} |",
            f"",
            f"## Error Breakdown",
            f""
        ]
        if self.error_breakdown:
            for error_type, count in self.error_breakdown.items():
                md_lines.append(f"- **{error_type}:** {count}")
        else:
            md_lines.append("- No errors detected")
        md_lines.extend([
            f"",
            f"## Failure Points",
            f""
        ])
        if self.failure_points:
            for failure in self.failure_points:
                md_lines.append(f"- {failure}")
        else:
            md_lines.append("- No critical failure points identified")
        md_lines.extend([
            f"",
            f"## Recommendations",
            f""
        ])
        if self.recommendations:
            for rec in self.recommendations:
                md_lines.append(f"- {rec}")
        else:
            md_lines.append("- No specific recommendations at this time")
        md_lines.extend([
            f"",
            f"## Detailed Step Results",
            f"",
            f"| Step | Node Match | Target Res | Post-Cond | Transition | Duration |",
            f"|------|------------|------------|-----------|------------|----------|"
        ])
        for result in self.step_results:
            node_status = "✅" if result.node_matching.success else "❌"
            target_status = self._phase_status(result.target_resolution)
            post_status = self._phase_status(result.post_conditions)
            trans_status = self._phase_status(result.transition)
            md_lines.append(
                f"| {result.step_number} | {node_status} | {target_status} | {post_status} | {trans_status} | {result.step_duration_ms:.0f}ms |"
            )
        return "\n".join(md_lines)
class WorkflowSimulator:
    """
    Full workflow simulator.

    Replays the frames of a ScenarioPack against a Workflow, exercising the
    complete chain: Node Matching -> Target Resolution -> Post-conditions ->
    Transition, and aggregates everything into a WorkflowSimulationReport.
    """
    def __init__(
        self,
        node_matcher: Optional[NodeMatcher] = None,
        target_resolver: Optional[TargetResolver] = None,
        state_embedding_builder: Optional[StateEmbeddingBuilder] = None
    ):
        """
        Initialize the simulator.

        Args:
            node_matcher: Node matcher (a default one is created when None)
            target_resolver: Target resolver (a default one is created when None)
            state_embedding_builder: Embedding builder (a default one is created when None)
        """
        self.node_matcher = node_matcher or NodeMatcher()
        self.target_resolver = target_resolver or TargetResolver()
        self.state_embedding_builder = state_embedding_builder or StateEmbeddingBuilder()
        logger.info("WorkflowSimulator initialized")

    def simulate_workflow(
        self,
        scenario_pack: ScenarioPack,
        workflow: Workflow,
        output_dir: Optional[Path] = None
    ) -> WorkflowSimulationReport:
        """
        Simulate a complete workflow with a scenario pack.

        Args:
            scenario_pack: Scenario pack with sequential frames
            workflow: Workflow under test
            output_dir: Output directory for the reports (optional)
        Returns:
            Complete simulation report
        """
        start_time = time.time()
        step_results = []
        logger.info(f"Starting workflow simulation: {scenario_pack.scenario_id}")
        logger.info(f"Workflow: {workflow.workflow_id}, Steps: {len(scenario_pack.frames)}")
        # Simulate each step
        for i, frame in enumerate(scenario_pack.frames):
            step_start = time.time()
            # 1. Node matching
            node_matching_result = self._simulate_node_matching(frame, workflow)
            # 2. Target resolution (only when a node matched and an action is expected)
            target_resolution_result = None
            if node_matching_result.success and frame.expected_action:
                target_resolution_result = self._simulate_target_resolution(frame, workflow, node_matching_result.matched_node_id)
            # 3. Post-conditions (only when the action was resolved)
            post_condition_result = None
            if target_resolution_result and target_resolution_result.success:
                post_condition_result = self._simulate_post_conditions(frame, workflow, node_matching_result.matched_node_id)
            # 4. Transition (skipped on the last frame)
            transition_result = None
            if i < len(scenario_pack.frames) - 1:
                next_frame = scenario_pack.frames[i + 1]
                transition_result = self._simulate_transition(frame, next_frame, workflow)
            # A step succeeds only when every executed phase succeeded
            overall_success = (
                node_matching_result.success and
                (target_resolution_result is None or target_resolution_result.success) and
                (post_condition_result is None or post_condition_result.success) and
                (transition_result is None or transition_result.success)
            )
            step_duration = (time.time() - step_start) * 1000
            step_result = WorkflowStepResult(
                frame_id=frame.frame_id,
                step_number=frame.step_number,
                node_matching=node_matching_result,
                target_resolution=target_resolution_result,
                post_conditions=post_condition_result,
                transition=transition_result,
                overall_success=overall_success,
                step_duration_ms=step_duration
            )
            step_results.append(step_result)
            # Fix: the original marker was an empty string in BOTH branches
            # (lost emoji), which made this log line useless.
            logger.debug(f"Step {frame.step_number}: {'OK' if overall_success else 'FAIL'} ({step_duration:.0f}ms)")
        # Compute global metrics
        total_time = (time.time() - start_time) * 1000
        report = self._generate_report(scenario_pack, workflow, step_results, total_time)
        # Persist if an output directory was given
        if output_dir:
            self._save_reports(report, output_dir)
        logger.info(f"Simulation completed: {report.overall_success_rate:.1%} success rate")
        return report

    def _simulate_node_matching(self, frame: ScenarioFrame, workflow: Workflow) -> NodeMatchingResult:
        """Run node matching for one frame against the workflow's nodes.

        NOTE: `success` only means *some* node matched; it is never compared
        to frame.expected_node_id here — TODO(review): confirm whether
        accuracy metrics should compare against the expected node.
        """
        try:
            # (The original also built a state embedding here, but its result
            # was never used — removed as dead code.)
            candidate_nodes = workflow.nodes
            match_result = self.node_matcher.match(frame.screen_state, candidate_nodes)
            if match_result:
                matched_node, confidence = match_result
                success = True
                matched_node_id = matched_node.node_id
                strategy_used = "faiss_search"  # or another strategy depending on NodeMatcher
                error_message = None
            else:
                success = False
                matched_node_id = None
                confidence = 0.0
                strategy_used = "none"
                error_message = "No matching node found"
            return NodeMatchingResult(
                frame_id=frame.frame_id,
                expected_node_id=frame.expected_node_id,
                matched_node_id=matched_node_id,
                confidence=confidence,
                success=success,
                strategy_used=strategy_used,
                error_message=error_message
            )
        except Exception as e:
            logger.error(f"Node matching failed for frame {frame.frame_id}: {e}")
            return NodeMatchingResult(
                frame_id=frame.frame_id,
                expected_node_id=frame.expected_node_id,
                matched_node_id=None,
                confidence=0.0,
                success=False,
                strategy_used="error",
                error_message=str(e)
            )

    def _simulate_target_resolution(
        self,
        frame: ScenarioFrame,
        workflow: Workflow,
        matched_node_id: str
    ) -> TargetResolutionResult:
        """Resolve the expected action's target on the frame's screen state.

        A frame with no action (or no "target" in the action) counts as success.
        """
        try:
            start_time = time.time()
            # Fetch the expected action
            expected_action = frame.expected_action
            if not expected_action or "target" not in expected_action:
                return TargetResolutionResult(
                    frame_id=frame.frame_id,
                    target_spec=None,
                    resolved_element_id=None,
                    expected_element_id=None,
                    confidence=0.0,
                    success=True,  # no action to resolve = success
                    strategy_used="no_action",
                    resolution_time_ms=0.0
                )
            # Build the TargetSpec from the expected action
            target_spec = TargetSpec.from_dict(expected_action["target"])
            # Resolve the target
            resolved_target = self.target_resolver.resolve_target(
                target_spec,
                frame.screen_state,
                context={}
            )
            resolution_time = (time.time() - start_time) * 1000
            if resolved_target:
                return TargetResolutionResult(
                    frame_id=frame.frame_id,
                    target_spec=target_spec,
                    resolved_element_id=resolved_target.element.element_id,
                    expected_element_id=expected_action.get("expected_element_id"),
                    confidence=resolved_target.confidence,
                    success=True,
                    strategy_used=resolved_target.strategy_used,
                    resolution_time_ms=resolution_time
                )
            else:
                return TargetResolutionResult(
                    frame_id=frame.frame_id,
                    target_spec=target_spec,
                    resolved_element_id=None,
                    expected_element_id=expected_action.get("expected_element_id"),
                    confidence=0.0,
                    success=False,
                    strategy_used="failed",
                    resolution_time_ms=resolution_time,
                    error_message="Target resolution failed"
                )
        except Exception as e:
            logger.error(f"Target resolution failed for frame {frame.frame_id}: {e}")
            return TargetResolutionResult(
                frame_id=frame.frame_id,
                target_spec=None,
                resolved_element_id=None,
                expected_element_id=None,
                confidence=0.0,
                success=False,
                strategy_used="error",
                resolution_time_ms=0.0,
                error_message=str(e)
            )

    def _simulate_post_conditions(
        self,
        frame: ScenarioFrame,
        workflow: Workflow,
        matched_node_id: str
    ) -> PostConditionResult:
        """Verify the post-conditions attached to the matched node's first outgoing edge.

        No outgoing edge or no declared success checks counts as success.
        """
        try:
            start_time = time.time()
            # Find the corresponding edge to fetch its post-conditions
            outgoing_edges = workflow.get_outgoing_edges(matched_node_id)
            if not outgoing_edges:
                return PostConditionResult(
                    frame_id=frame.frame_id,
                    post_conditions=None,
                    checks_passed=0,
                    checks_total=0,
                    success=True,  # no post-conditions = success
                    timeout_occurred=False,
                    verification_time_ms=0.0
                )
            # Take the first edge (simplification)
            edge = outgoing_edges[0]
            post_conditions = edge.post_conditions
            if not post_conditions or not post_conditions.success:
                return PostConditionResult(
                    frame_id=frame.frame_id,
                    post_conditions=post_conditions,
                    checks_passed=0,
                    checks_total=0,
                    success=True,
                    timeout_occurred=False,
                    verification_time_ms=0.0
                )
            # Evaluate each declared success check (post_conditions.success is a list)
            checks_total = len(post_conditions.success)
            checks_passed = 0
            failed_checks = []
            for check in post_conditions.success:
                if self._verify_post_condition_check(check, frame.screen_state):
                    checks_passed += 1
                else:
                    failed_checks.append(f"{check.kind}: {check.value}")
            verification_time = (time.time() - start_time) * 1000
            success = checks_passed == checks_total
            return PostConditionResult(
                frame_id=frame.frame_id,
                post_conditions=post_conditions,
                checks_passed=checks_passed,
                checks_total=checks_total,
                success=success,
                timeout_occurred=False,
                verification_time_ms=verification_time,
                failed_checks=failed_checks
            )
        except Exception as e:
            logger.error(f"Post-condition verification failed for frame {frame.frame_id}: {e}")
            return PostConditionResult(
                frame_id=frame.frame_id,
                post_conditions=None,
                checks_passed=0,
                checks_total=0,
                success=False,
                timeout_occurred=False,
                verification_time_ms=0.0,
                error_message=str(e)
            )

    def _verify_post_condition_check(self, check: PostConditionCheck, screen_state: ScreenState) -> bool:
        """Evaluate a single post-condition check against the screen state.

        Supported kinds: text_present, text_absent, element_present,
        window_title_contains. Unknown kinds (and any exception) yield False.
        """
        try:
            if check.kind == "text_present":
                # Check that the text is visible somewhere on screen
                detected_texts = getattr(screen_state.perception, 'detected_text', []) if hasattr(screen_state, 'perception') else []
                return any(check.value in text for text in detected_texts)
            elif check.kind == "text_absent":
                # Check that the text is nowhere on screen
                detected_texts = getattr(screen_state.perception, 'detected_text', []) if hasattr(screen_state, 'perception') else []
                return not any(check.value in text for text in detected_texts)
            elif check.kind == "element_present":
                # Check that a target element can be resolved
                if not check.target:
                    return False
                resolved_target = self.target_resolver.resolve_target(check.target, screen_state, context={})
                return resolved_target is not None
            elif check.kind == "window_title_contains":
                # Check the window title
                window_title = getattr(screen_state.window, 'window_title', '') if hasattr(screen_state, 'window') else ''
                return check.value in window_title
            else:
                logger.warning(f"Unknown post-condition check kind: {check.kind}")
                return False
        except Exception as e:
            logger.error(f"Post-condition check failed: {e}")
            return False

    def _simulate_transition(
        self,
        current_frame: ScenarioFrame,
        next_frame: ScenarioFrame,
        workflow: Workflow
    ) -> TransitionResult:
        """Simulate the transition to the next frame.

        Currently assumes the actual transition equals the expected one (so
        success is always True unless an exception occurs) — kept as-is.
        """
        try:
            # A transition is expected when both frames declare different nodes
            expected_transition = (
                current_frame.expected_node_id != next_frame.expected_node_id and
                current_frame.expected_node_id is not None and
                next_frame.expected_node_id is not None
            )
            # Simulated outcome: assume the transition happens when nodes differ
            actual_transition = expected_transition
            success = expected_transition == actual_transition
            transition_confidence = 1.0 if success else 0.0
            return TransitionResult(
                from_frame_id=current_frame.frame_id,
                to_frame_id=next_frame.frame_id,
                expected_transition=expected_transition,
                actual_transition=actual_transition,
                success=success,
                transition_confidence=transition_confidence
            )
        except Exception as e:
            logger.error(f"Transition simulation failed: {e}")
            return TransitionResult(
                from_frame_id=current_frame.frame_id,
                to_frame_id=next_frame.frame_id,
                expected_transition=False,
                actual_transition=False,
                success=False,
                transition_confidence=0.0,
                error_message=str(e)
            )

    def _generate_report(
        self,
        scenario_pack: ScenarioPack,
        workflow: Workflow,
        step_results: List[WorkflowStepResult],
        total_time_ms: float
    ) -> WorkflowSimulationReport:
        """Aggregate step results into the final WorkflowSimulationReport."""
        total_steps = len(step_results)
        successful_steps = sum(1 for result in step_results if result.overall_success)
        # Per-component metrics (a skipped phase counts as success)
        node_matching_successes = sum(1 for result in step_results if result.node_matching.success)
        target_resolution_successes = sum(1 for result in step_results
                                        if result.target_resolution is None or result.target_resolution.success)
        post_condition_successes = sum(1 for result in step_results
                                     if result.post_conditions is None or result.post_conditions.success)
        transition_successes = sum(1 for result in step_results
                                 if result.transition is None or result.transition.success)
        node_matching_accuracy = node_matching_successes / max(1, total_steps)
        target_resolution_accuracy = target_resolution_successes / max(1, total_steps)
        post_condition_success_rate = post_condition_successes / max(1, total_steps)
        transition_accuracy = transition_successes / max(1, total_steps)
        # Error analysis
        error_breakdown = {}
        failure_points = []
        for result in step_results:
            if not result.overall_success:
                failure_points.append(f"Step {result.step_number}: {result.frame_id}")
            if not result.node_matching.success:
                error_breakdown["node_matching_failures"] = error_breakdown.get("node_matching_failures", 0) + 1
            if result.target_resolution and not result.target_resolution.success:
                error_breakdown["target_resolution_failures"] = error_breakdown.get("target_resolution_failures", 0) + 1
            if result.post_conditions and not result.post_conditions.success:
                error_breakdown["post_condition_failures"] = error_breakdown.get("post_condition_failures", 0) + 1
            if result.transition and not result.transition.success:
                error_breakdown["transition_failures"] = error_breakdown.get("transition_failures", 0) + 1
        # Recommendations (threshold: 90% per component)
        recommendations = []
        if node_matching_accuracy < 0.9:
            recommendations.append("Consider improving node matching accuracy by updating embedding prototypes")
        if target_resolution_accuracy < 0.9:
            recommendations.append("Review target resolution strategies and fallback mechanisms")
        if post_condition_success_rate < 0.9:
            recommendations.append("Verify post-condition definitions and timeout settings")
        if transition_accuracy < 0.9:
            recommendations.append("Check workflow edge definitions and transition logic")
        avg_step_time = total_time_ms / max(1, total_steps)
        return WorkflowSimulationReport(
            scenario_id=scenario_pack.scenario_id,
            workflow_id=workflow.workflow_id,
            timestamp=datetime.now(),
            total_steps=total_steps,
            successful_steps=successful_steps,
            step_results=step_results,
            node_matching_accuracy=node_matching_accuracy,
            target_resolution_accuracy=target_resolution_accuracy,
            post_condition_success_rate=post_condition_success_rate,
            transition_accuracy=transition_accuracy,
            total_simulation_time_ms=total_time_ms,
            avg_step_time_ms=avg_step_time,
            error_breakdown=error_breakdown,
            failure_points=failure_points,
            recommendations=recommendations
        )

    def _save_reports(self, report: WorkflowSimulationReport, output_dir: Path) -> None:
        """Write both the JSON and Markdown reports under output_dir."""
        output_dir.mkdir(parents=True, exist_ok=True)
        # JSON report
        json_path = output_dir / f"workflow_simulation_{report.scenario_id}_{report.timestamp.strftime('%Y%m%d_%H%M%S')}.json"
        report.save_to_file(json_path)
        # Markdown report
        md_path = output_dir / f"workflow_simulation_{report.scenario_id}_{report.timestamp.strftime('%Y%m%d_%H%M%S')}.md"
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(report.generate_markdown_report())
        logger.info(f"Reports saved to {output_dir}")
# ============================================================================
# Fonctions utilitaires
# ============================================================================
def load_scenario_pack(scenario_dir: Union[str, Path]) -> ScenarioPack:
    """Load a scenario pack from a directory path (given as str or Path)."""
    pack_dir = Path(scenario_dir)
    return ScenarioPack.load_from_directory(pack_dir)
def simulate_workflow_from_files(
    scenario_dir: Union[str, Path],
    workflow_file: Union[str, Path],
    output_dir: Optional[Union[str, Path]] = None
) -> WorkflowSimulationReport:
    """
    Run a workflow simulation from on-disk files.

    Args:
        scenario_dir: Directory of the scenario pack
        workflow_file: JSON file of the workflow
        output_dir: Output directory (optional)
    Returns:
        Simulation report
    """
    # Load the scenario pack, then the workflow under test
    pack = load_scenario_pack(scenario_dir)
    workflow = Workflow.load_from_file(Path(workflow_file))
    # Build a simulator with default components and run the simulation
    simulator = WorkflowSimulator()
    reports_dir = Path(output_dir) if output_dir else None
    return simulator.simulate_workflow(pack, workflow, reports_dir)
if __name__ == "__main__":
    # Basic smoke run against the example scenario, if present
    logging.basicConfig(level=logging.INFO)
    scenario_dir = Path("tests/scenarios/login_flow")
    workflow_file = Path("data/workflows/login_workflow.json")
    output_dir = Path("data/simulation_reports")
    if not (scenario_dir.exists() and workflow_file.exists()):
        print("Example files not found - create test scenarios first")
    else:
        report = simulate_workflow_from_files(scenario_dir, workflow_file, output_dir)
        print(f"Simulation completed: {report.overall_success_rate:.1%} success rate")