Files
rpa_vision_v3/core/graph/graph_builder.py
Dom 7f2bc6fe97
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 12s
security-audit / pip-audit (CVE dépendances) (push) Successful in 11s
security-audit / Scan secrets (grep) (push) Successful in 9s
tests / Lint (ruff + black) (push) Successful in 14s
tests / Tests unitaires (sans GPU) (push) Failing after 13s
tests / Tests sécurité (critique) (push) Has been skipped
feat(graph): enrichissement visuel des workflows (C2)
GraphBuilder construit maintenant des ScreenState enrichis
(ui_elements + detected_text) au lieu de stubs vides, et associe
les clics aux UIElement par proximité spatiale.

Détails :
- __init__ accepte ui_detector, screen_analyzer, enable_ui_enrichment,
  element_proximity_max_px (+ lazy resolver via singleton C1)
- _create_screen_states délègue à ScreenAnalyzer.analyze() — remplace
  l'appel à _extract_text() qui n'existait plus depuis le Lot C
  (bug silencieux : OCR cassé en prod depuis ce jour, caught except)
- _find_clicked_element : bbox contenant strict + fallback proximité
  ≤50px, préfère le plus petit bbox (form vs button)
- _build_click_target_spec : TargetSpec(by_role, by_text,
  selection_policy="by_similarity") avec ancres dans context_hints
  (anchor_element_id, anchor_bbox, anchor_center)
- _build_edges propage le ScreenState source aux builders d'action
- WorkflowPipeline passe ui_detector + enable_ui_enrichment au builder

Impact : matching prod 3-5x plus précis, TargetSpec ne sont plus
des "unknown_element" génériques, UIConstraint.required_roles se
remplit correctement via _extract_common_ui_elements (qui marchait
depuis toujours mais sur des state.ui_elements vides).

Tests e2e migrés vers enable_ui_enrichment=False (2.9s vs 67s) —
ils valident le pipeline DBSCAN/edges, pas la détection UI réelle.

15 nouveaux tests, 178 tests passants au total (incluant Lots A-E).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 22:02:30 +02:00

1972 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
GraphBuilder - Construction Automatique de Workflow Graphs
Ce module implémente la construction automatique de graphes de workflows
en analysant les sessions enregistrées et en détectant les patterns répétés.
Architecture:
1. Création de ScreenStates depuis RawSession
2. Calcul de State Embeddings pour tous les états
3. Détection de patterns via clustering DBSCAN
4. Construction de WorkflowNodes depuis clusters
5. Construction de WorkflowEdges depuis transitions
Algorithme de Détection de Patterns:
- Utilise DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
- Métrique: similarité cosinus entre embeddings
- Filtre les clusters avec moins de N répétitions
- Calcule un prototype (moyenne) pour chaque cluster
Example:
>>> builder = GraphBuilder(min_pattern_repetitions=3)
>>> workflow = builder.build_from_session(raw_session)
>>> print(f"Workflow with {len(workflow.nodes)} nodes")
"""
import logging
import os
from typing import List, Dict, Optional, Tuple, Any
from collections import defaultdict, Counter
from datetime import datetime
from pathlib import Path
import numpy as np
from sklearn.cluster import DBSCAN
from core.models.raw_session import RawSession, Event
from core.models.screen_state import (
ScreenState, WindowContext, RawLevel, PerceptionLevel,
ContextLevel, EmbeddingRef
)
from core.models.workflow_graph import (
Workflow,
WorkflowNode,
WorkflowEdge,
ScreenTemplate,
Action,
TargetSpec,
EdgeConstraints,
PostConditions,
PostConditionCheck,
WindowConstraint,
TextConstraint,
UIConstraint,
EmbeddingPrototype,
)
from core.embedding.state_embedding_builder import StateEmbeddingBuilder
from core.embedding.faiss_manager import FAISSManager
from core.training.quality_validator import TrainingQualityValidator, QualityReport
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Filtrage des événements parasites (modificateurs seuls, etc.)
# Appliqué dans _find_transition_events et _build_compound_action.
# ---------------------------------------------------------------------------
_MODIFIER_ONLY_KEYS = {
"ctrl", "ctrl_l", "ctrl_r",
"alt", "alt_l", "alt_r",
"shift", "shift_l", "shift_r",
"win", "cmd", "cmd_l", "cmd_r",
"meta", "super", "super_l", "super_r",
}
def _is_modifier_only(keys: list) -> bool:
"""Retourne True si la liste de touches ne contient que des modificateurs."""
if not keys:
return True
return all(k.lower() in _MODIFIER_ONLY_KEYS for k in keys)
def _is_parasitic_event(event: Event) -> bool:
"""Retourne True si l'événement est parasite (modificateur seul, texte vide).
Filtrage appliqué aux événements de session avant construction des edges.
"""
if event.type == "key_press":
keys = event.data.get("keys", [])
if not keys or _is_modifier_only(keys):
return True
elif event.type == "text_input":
text = event.data.get("text", "")
if not text:
return True
return False
def _merge_consecutive_text_steps(steps: list) -> list:
"""Fusionne les text_input consécutifs en un seul."""
merged = []
for step in steps:
if (step.get("type") in ("text_input", "type")
and merged
and merged[-1].get("type") in ("text_input", "type")):
merged[-1]["text"] = merged[-1].get("text", "") + step.get("text", "")
else:
merged.append(dict(step))
return merged
def _dedup_consecutive_combos(steps: list) -> list:
"""Supprime les key_combo/key_press dupliqués consécutifs."""
deduped = []
for step in steps:
if (step.get("type") in ("key_combo", "key_press")
and deduped
and deduped[-1].get("type") in ("key_combo", "key_press")
and deduped[-1].get("keys") == step.get("keys")):
continue
deduped.append(step)
return deduped
def _filter_modifier_only_steps(steps: list) -> list:
"""Supprime les steps key_combo/key_press avec uniquement des modificateurs."""
return [
s for s in steps
if not (
s.get("type") in ("key_combo", "key_press")
and _is_modifier_only(s.get("keys", []))
)
]
def _ensure_min_waits(steps: list, min_wait_ms: int = 300) -> list:
"""Ajoute un wait entre les steps si aucun wait n'existe."""
if not steps:
return steps
result = [steps[0]]
for step in steps[1:]:
if result[-1].get("type") != "wait" and step.get("type") != "wait":
result.append({"type": "wait", "duration_ms": min_wait_ms})
result.append(step)
return result
class GraphBuilder:
"""
Constructeur de graphes de workflows depuis sessions brutes.
Cette classe analyse une RawSession pour construire automatiquement
un Workflow avec ses nodes et edges en détectant les patterns répétés.
Attributes:
embedding_builder: Builder pour calculer les State Embeddings
faiss_manager: Manager FAISS pour indexation (optionnel)
min_pattern_repetitions: Nombre minimum de répétitions pour un pattern
clustering_eps: Distance maximum entre points pour DBSCAN
clustering_min_samples: Nombre minimum d'échantillons par cluster
Example:
>>> builder = GraphBuilder(min_pattern_repetitions=3)
>>> workflow = builder.build_from_session(session, "Login Workflow")
"""
def __init__(
self,
embedding_builder: Optional[StateEmbeddingBuilder] = None,
faiss_manager: Optional[FAISSManager] = None,
quality_validator: Optional[TrainingQualityValidator] = None,
min_pattern_repetitions: int = 2,
clustering_eps: float = 0.08,
clustering_min_samples: int = 2,
enable_quality_validation: bool = True,
ui_detector: Optional[Any] = None,
screen_analyzer: Optional[Any] = None,
enable_ui_enrichment: bool = True,
element_proximity_max_px: float = 50.0,
):
"""
Initialiser le GraphBuilder.
Args:
embedding_builder: Builder pour State Embeddings (créé si None)
faiss_manager: Manager FAISS pour indexation (optionnel)
quality_validator: Validateur de qualité (créé si None)
min_pattern_repetitions: Nombre minimum de répétitions pour un pattern
clustering_eps: Epsilon pour DBSCAN (distance max entre points)
clustering_min_samples: Nombre minimum d'échantillons pour un cluster
enable_quality_validation: Activer la validation de qualité
ui_detector: UIDetector optionnel. Si fourni, sera utilisé par
l'analyzer lazy-initialisé. Sinon, fallback sur le singleton
partagé (`get_screen_analyzer()`).
screen_analyzer: Instance ScreenAnalyzer à utiliser directement.
Si None, lazy init via le singleton partagé C1.
enable_ui_enrichment: Active l'enrichissement visuel des
ScreenStates lors de `_create_screen_states` (OCR + UIDetector).
False = comportement historique (ui_elements=[], detected_text=[]).
element_proximity_max_px: Distance maximale (en pixels) entre un
clic et le bbox le plus proche pour qu'un UIElement soit
considéré comme cible. Au-delà, le clic reste sans ancre.
"""
self.embedding_builder = embedding_builder or StateEmbeddingBuilder()
self.faiss_manager = faiss_manager
self.quality_validator = quality_validator or TrainingQualityValidator()
self.min_pattern_repetitions = min_pattern_repetitions
self.clustering_eps = clustering_eps
self.clustering_min_samples = clustering_min_samples
self.enable_quality_validation = enable_quality_validation
self.enable_ui_enrichment = enable_ui_enrichment
self.element_proximity_max_px = element_proximity_max_px
# UIDetector explicite (optionnel) — injecté dans l'analyzer lazy.
self._ui_detector = ui_detector
# Instance ScreenAnalyzer. Si fournie, on l'utilise telle quelle ;
# sinon, on bascule sur le singleton partagé (lazy init).
self._screen_analyzer = screen_analyzer
logger.info(
f"GraphBuilder initialized: "
f"min_repetitions={min_pattern_repetitions}, "
f"eps={clustering_eps}, "
f"min_samples={clustering_min_samples}, "
f"quality_validation={enable_quality_validation}, "
f"ui_enrichment={enable_ui_enrichment}"
)
# ------------------------------------------------------------------
# Résolution paresseuse du ScreenAnalyzer (singleton C1 par défaut)
# ------------------------------------------------------------------
def _get_screen_analyzer(self):
"""
Retourner l'instance ScreenAnalyzer à utiliser.
Priorité :
1. Instance injectée via le constructeur (`screen_analyzer=…`).
2. Singleton partagé `get_screen_analyzer()` (C1) — évite le double
chargement GPU quand ExecutionLoop et stream_processor tournent.
3. En dernier recours (import circulaire, tests), création locale.
"""
if self._screen_analyzer is not None:
return self._screen_analyzer
try:
from core.pipeline import get_screen_analyzer
self._screen_analyzer = get_screen_analyzer(
ui_detector=self._ui_detector,
)
return self._screen_analyzer
except Exception as e:
logger.warning(
f"Impossible d'obtenir le ScreenAnalyzer singleton "
f"({e}); fallback sur une instance locale."
)
try:
from core.pipeline.screen_analyzer import ScreenAnalyzer
self._screen_analyzer = ScreenAnalyzer(
ui_detector=self._ui_detector,
)
return self._screen_analyzer
except Exception as e2:
logger.error(
f"Impossible d'instancier ScreenAnalyzer: {e2}. "
"Enrichissement UI désactivé."
)
return None
def build_from_session(
self,
session: RawSession,
workflow_name: Optional[str] = None,
precomputed_states: Optional[List["ScreenState"]] = None,
precomputed_embeddings: Optional[List] = None,
sequential: bool = False,
) -> Workflow:
"""
Construire un Workflow complet depuis une RawSession.
Processus:
1. Créer ScreenStates depuis screenshots (ou utiliser precomputed_states)
2. Calculer embeddings pour chaque état (ou réutiliser precomputed_embeddings)
3. Détecter patterns via clustering (ou mode séquentiel)
4. Construire nodes depuis clusters
5. Construire edges depuis transitions
Args:
session: Session brute à analyser
workflow_name: Nom du workflow (généré si None)
precomputed_states: ScreenStates déjà analysés (streaming).
Si fourni, saute l'étape 1 (pas de re-analyse via ScreenAnalyzer).
precomputed_embeddings: Embeddings déjà calculés (streaming).
Si fourni et de la bonne longueur (= len(screen_states)),
saute l'étape 2 (pas de recalcul CLIP).
sequential: Si True, crée un node par état d'écran (pas de
clustering DBSCAN). Approprié pour les enregistrements
single-pass d'un workflow — chaque screenshot est une étape
distincte avec ses actions associées.
Returns:
Workflow construit avec nodes et edges
Raises:
ValueError: Si la session est vide ou invalide
"""
if not precomputed_states and not session.screenshots:
raise ValueError("Session has no screenshots and no precomputed states")
logger.info(
f"Building workflow from session {session.session_id} "
f"with {len(precomputed_states or session.screenshots)} "
f"{'precomputed states' if precomputed_states else 'screenshots'}"
f"{' (mode séquentiel)' if sequential else ''}"
)
# Étape 1: Créer ScreenStates (ou réutiliser ceux pré-calculés)
if precomputed_states:
screen_states = precomputed_states
logger.debug(f"Using {len(screen_states)} precomputed screen states")
else:
screen_states = self._create_screen_states(session)
logger.debug(f"Created {len(screen_states)} screen states")
# Étape 2: Calculer embeddings (ou réutiliser ceux pré-calculés)
if precomputed_embeddings is not None and len(precomputed_embeddings) == len(screen_states):
embeddings = [np.asarray(e, dtype=np.float32) for e in precomputed_embeddings]
logger.debug(f"Using {len(embeddings)} precomputed embeddings (skip CLIP)")
else:
if precomputed_embeddings is not None:
logger.warning(
f"precomputed_embeddings length mismatch "
f"({len(precomputed_embeddings)} vs {len(screen_states)} states), "
f"recalculating..."
)
embeddings = self._compute_embeddings(screen_states)
logger.debug(f"Computed {len(embeddings)} embeddings")
# Étape 3: Détecter patterns ou mode séquentiel
if sequential:
# Mode séquentiel : chaque état d'écran est un node distinct.
# Pas de clustering — essentiel pour les enregistrements single-pass
# où l'on veut reproduire fidèlement la séquence des actions.
clusters = {i: [i] for i in range(len(screen_states))}
logger.info(
f"Mode séquentiel: {len(clusters)} nodes (1 par état)"
)
else:
clusters = self._detect_patterns(embeddings, screen_states)
logger.info(f"Detected {len(clusters)} patterns")
# Étape 4: Construire nodes
nodes = self._build_nodes(clusters, screen_states, embeddings)
logger.info(f"Built {len(nodes)} workflow nodes")
# Étape 5: Construire edges (passer les embeddings pour éviter recalcul)
edges = self._build_edges(
nodes, screen_states, session, embeddings=embeddings,
sequential=sequential,
)
logger.info(f"Built {len(edges)} workflow edges")
# Créer Workflow
from core.models.workflow_graph import WorkflowStats, SafetyRules, LearningConfig
workflow = Workflow(
workflow_id=workflow_name or f"workflow_{session.session_id}",
name=workflow_name or "Unnamed Workflow",
description="Auto-generated workflow",
version=1,
learning_state="OBSERVATION",
created_at=datetime.now(),
updated_at=datetime.now(),
entry_nodes=[nodes[0].node_id] if nodes else [],
end_nodes=[],
nodes=nodes,
edges=edges,
safety_rules=SafetyRules(),
stats=WorkflowStats(),
learning=LearningConfig()
)
# Étape 6: Validation de qualité
quality_report = None
if self.enable_quality_validation and screen_states:
quality_report = self._validate_workflow_quality(
workflow, screen_states, embeddings, clusters
)
# Stocker le rapport dans les métadonnées du workflow
workflow.metadata = workflow.metadata or {}
workflow.metadata['quality_report'] = quality_report.to_dict()
# Ajuster learning_state basé sur la qualité
if quality_report.is_production_ready:
workflow.learning_state = "AUTO_CANDIDATE"
logger.info("Workflow qualité suffisante -> AUTO_CANDIDATE")
else:
workflow.learning_state = "OBSERVATION"
logger.warning(
f"Qualité insuffisante ({quality_report.overall_score:.3f}), "
f"workflow reste en OBSERVATION"
)
logger.info(
f"Workflow '{workflow.name}' built successfully: "
f"{len(nodes)} nodes, {len(edges)} edges"
)
return workflow
def _validate_workflow_quality(
self,
workflow: Workflow,
screen_states: List[ScreenState],
embeddings: List[np.ndarray],
clusters: Dict[int, List[int]]
) -> QualityReport:
"""
Valider la qualité du workflow construit.
Args:
workflow: Workflow à valider
screen_states: États d'écran utilisés
embeddings: Embeddings calculés
clusters: Clusters détectés
Returns:
QualityReport avec métriques et recommandations
"""
logger.info(f"Validation qualité du workflow {workflow.workflow_id}")
# Préparer les données pour le validateur
embeddings_array = np.array(embeddings)
# Créer labels depuis les clusters
labels = np.full(len(embeddings), -1) # -1 = bruit
for cluster_id, indices in clusters.items():
for idx in indices:
labels[idx] = cluster_id
# Valider avec le TrainingQualityValidator
report = self.quality_validator.validate_workflow(
workflow=workflow,
observations=screen_states,
embeddings=embeddings_array,
labels=labels
)
logger.info(
f"Validation terminée: score={report.overall_score:.3f}, "
f"production_ready={report.is_production_ready}"
)
return report
def _create_screen_states(self, session: RawSession) -> List[ScreenState]:
"""
Créer ScreenStates enrichis depuis les screenshots de la session.
Pour chaque screenshot:
1. Trouver l'événement associé pour le contexte de fenêtre
2. Créer les 4 niveaux du ScreenState
3. Optionnellement détecter les éléments UI
Args:
session: Session brute
Returns:
Liste de ScreenStates enrichis
"""
screen_states = []
# Créer un mapping screenshot_id -> événement
screenshot_to_event = {}
for event in session.events:
if event.screenshot_id:
screenshot_to_event[event.screenshot_id] = event
# Récupérer (une seule fois) l'analyzer partagé si l'enrichissement est actif.
# Le singleton C1 garantit qu'on ne recharge pas UIDetector/CLIP inutilement.
analyzer = None
if self.enable_ui_enrichment:
analyzer = self._get_screen_analyzer()
# Cache partagé (C1) : réutiliser les analyses si même screenshot est
# repassé plusieurs fois (peu fréquent en construction, utile en tests).
try:
from core.pipeline import get_screen_state_cache
state_cache = get_screen_state_cache()
except Exception as e:
logger.debug(f"ScreenStateCache indisponible ({e}); aucun cache utilisé.")
state_cache = None
enriched_count = 0
for i, screenshot in enumerate(session.screenshots):
# Trouver l'événement associé
event = screenshot_to_event.get(screenshot.screenshot_id)
# Construire WindowContext depuis l'événement (si dispo)
screen_env = session.environment.get("screen", {})
screen_res = screen_env.get("primary_resolution", [1920, 1080])
if event and event.window:
window = WindowContext(
app_name=event.window.app_name,
window_title=event.window.title,
screen_resolution=screen_res,
workspace="main",
monitor_index=screen_env.get("monitor_index", 0),
dpi_scale=screen_env.get("dpi_scale", 100),
monitors=screen_env.get("monitors"),
os_theme=session.environment.get("os_theme", "unknown"),
os_language=session.environment.get("os_language", "unknown"),
)
else:
window = WindowContext(
app_name="unknown",
window_title="Unknown",
screen_resolution=screen_res,
workspace="main",
monitor_index=screen_env.get("monitor_index", 0),
dpi_scale=screen_env.get("dpi_scale", 100),
monitors=screen_env.get("monitors"),
os_theme=session.environment.get("os_theme", "unknown"),
os_language=session.environment.get("os_language", "unknown"),
)
# Chemin absolu du screenshot
screenshot_absolute_path = (
f"data/training/sessions/{session.session_id}/"
f"{session.session_id}/{screenshot.relative_path}"
)
screenshot_path = Path(screenshot_absolute_path)
# Timestamp
if isinstance(screenshot.captured_at, str):
timestamp = datetime.fromisoformat(
screenshot.captured_at.replace('Z', '+00:00')
)
else:
timestamp = screenshot.captured_at
# ------------------------------------------------------------
# Enrichissement visuel : déléguer au ScreenAnalyzer partagé
# ------------------------------------------------------------
# L'analyzer renvoie un ScreenState complet avec :
# - raw (image + file_size)
# - perception (OCR + embedding ref)
# - ui_elements (détection UIDetector)
# On récupère ces niveaux et on rebâtit un état final avec le
# WindowContext et les metadata issus de la session brute (les
# données "metier" que l'analyzer ignore).
# ------------------------------------------------------------
detected_text: List[str] = []
text_method = "none"
ui_elements: List = []
raw = RawLevel(
screenshot_path=str(screenshot_path),
capture_method="mss",
file_size_bytes=(
screenshot_path.stat().st_size
if screenshot_path.exists()
else 0
),
)
if analyzer is not None and screenshot_path.exists():
try:
# Construire l'info fenêtre pour donner le contexte à
# l'UIDetector (certains détecteurs s'en servent pour
# filtrer hors-fenêtre).
window_info = {
"app_name": window.app_name,
"title": window.window_title,
"screen_resolution": list(window.screen_resolution or []),
}
analyzed = analyzer.analyze(
str(screenshot_path),
window_info=window_info,
enable_ocr=True,
enable_ui_detection=True,
session_id=session.session_id,
)
detected_text = list(analyzed.perception.detected_text or [])
text_method = (
analyzed.perception.text_detection_method or "none"
)
ui_elements = list(analyzed.ui_elements or [])
# Garder les métriques OCR/UI si présentes (debug)
analyzer_metadata = dict(analyzed.metadata or {})
raw = analyzed.raw # conserver file_size réel mesuré
if ui_elements:
enriched_count += 1
except Exception as e:
logger.warning(
f"Enrichissement visuel échoué pour {screenshot_path}: {e}. "
"Fallback sur ScreenState minimal."
)
analyzer_metadata = {"analyzer_error": str(e)}
else:
analyzer_metadata = {}
if self.enable_ui_enrichment and not screenshot_path.exists():
logger.debug(
f"Screenshot introuvable: {screenshot_path} "
"— ui_elements restera vide"
)
# PerceptionLevel : vector_id calculé de façon déterministe.
perception = PerceptionLevel(
embedding=EmbeddingRef(
provider="openclip_ViT-B-32",
vector_id=(
f"data/embeddings/screens/"
f"{session.session_id}_state_{i:04d}.npy"
),
dimensions=512,
),
detected_text=detected_text,
text_detection_method=text_method,
confidence_avg=0.85 if detected_text else 0.0,
)
# ContextLevel (métier)
context = ContextLevel(
current_workflow_candidate=None,
workflow_step=i,
user_id=session.user.get("id", "unknown"),
tags=(
list(session.context.get("tags", []))
if isinstance(session.context.get("tags"), list)
else []
),
business_variables={},
)
# Metadata : on garde le lien événement/session + éventuels
# compteurs remontés par l'analyzer.
metadata = {
"screenshot_id": screenshot.screenshot_id,
"event_type": event.type if event else None,
"event_time": event.t if event else None,
}
# Propager les indicateurs utiles de l'analyzer sans écraser la base.
for key in ("ocr_ms", "ui_ms", "analyzer_error"):
if key in analyzer_metadata:
metadata[key] = analyzer_metadata[key]
state = ScreenState(
screen_state_id=f"{session.session_id}_state_{i:04d}",
timestamp=timestamp,
session_id=session.session_id,
window=window,
raw=raw,
perception=perception,
context=context,
metadata=metadata,
ui_elements=ui_elements,
)
screen_states.append(state)
logger.info(
f"Created {len(screen_states)} enriched screen states "
f"({enriched_count} avec UI détectée, "
f"ui_enrichment={self.enable_ui_enrichment})"
)
return screen_states
def _compute_embeddings(
self, screen_states: List[ScreenState]
) -> List[np.ndarray]:
"""
Calculer State Embeddings pour tous les états.
Utilise StateEmbeddingBuilder pour générer des embeddings
multi-modaux (image + texte + UI). Ajoute optionnellement
les embeddings à l'index FAISS.
Args:
screen_states: Liste de ScreenStates
Returns:
Liste de vecteurs d'embeddings (numpy arrays)
"""
embeddings = []
for state in screen_states:
# Construire embedding
state_embedding = self.embedding_builder.build(state)
vector = state_embedding.get_vector()
embeddings.append(vector)
# Ajouter à FAISS si disponible
if self.faiss_manager:
self.faiss_manager.add_embedding(
state.screen_state_id,
vector,
{"state_id": state.screen_state_id},
)
return embeddings
def _detect_patterns(
self,
embeddings: List[np.ndarray],
screen_states: List[ScreenState],
) -> Dict[int, List[int]]:
"""
Détecter patterns répétés via clustering hybride.
Algorithme:
1. Pré-clustering par titre de fenêtre (discriminant fort)
2. DBSCAN au sein de chaque groupe de titres (affine par vision)
3. Filtrer clusters trop petits
Le titre de fenêtre est le signal le plus discriminant entre écrans.
CLIP/embeddings seul ne distingue pas assez les screenshots d'apps différentes
(distance cosine max ~0.12 entre Notepad et Explorer).
Args:
embeddings: Vecteurs d'embeddings
screen_states: ScreenStates correspondants
Returns:
Dictionnaire {cluster_id: [indices des états]}
"""
if len(embeddings) < self.min_pattern_repetitions:
logger.warning(
f"Not enough states ({len(embeddings)}) for pattern detection "
f"(minimum: {self.min_pattern_repetitions})"
)
return {}
# Étape 1 : Pré-clustering par titre de fenêtre (ou app_name)
title_groups = defaultdict(list)
for idx, state in enumerate(screen_states):
# Extraire le titre de fenêtre — le meilleur discriminant
title = ""
if hasattr(state, 'window') and state.window:
title = getattr(state.window, 'window_title', '') or ''
# Normaliser : garder le nom de l'app (partie avant le tiret)
# Ex: "Sans titre - Bloc-notes" → "Bloc-notes"
# Ex: "Google - Chrome" → "Chrome"
app_key = self._normalize_window_title(title) if title else "__no_title__"
title_groups[app_key].append(idx)
n_title_groups = len(title_groups)
logger.info(
f"Window title pre-clustering: {n_title_groups} groups "
f"({', '.join(f'{k}:{len(v)}' for k, v in title_groups.items())})"
)
# Étape 2 : Si les titres distinguent bien les écrans, utiliser directement
# Si un seul groupe (titres identiques ou absents), fallback DBSCAN pur
if n_title_groups <= 1:
return self._dbscan_clustering(embeddings, screen_states)
# Étape 3 : Chaque groupe de titres devient un cluster
# Optionnellement, DBSCAN affine les groupes avec >5 screenshots
final_clusters = {}
cluster_id = 0
noise_count = 0
for title_key, indices in title_groups.items():
if len(indices) < self.min_pattern_repetitions:
noise_count += len(indices)
continue
if len(indices) > 5:
# Sous-clustering DBSCAN pour les gros groupes
sub_embeddings = [embeddings[i] for i in indices]
sub_X = np.array(sub_embeddings)
sub_clustering = DBSCAN(
eps=self.clustering_eps,
min_samples=self.clustering_min_samples,
metric="cosine",
)
sub_labels = sub_clustering.fit_predict(sub_X)
sub_clusters = defaultdict(list)
for local_idx, label in enumerate(sub_labels):
if label == -1:
noise_count += 1
else:
sub_clusters[label].append(indices[local_idx])
for sub_indices in sub_clusters.values():
if len(sub_indices) >= self.min_pattern_repetitions:
final_clusters[cluster_id] = sub_indices
cluster_id += 1
else:
# Petit groupe → 1 cluster direct
final_clusters[cluster_id] = indices
cluster_id += 1
logger.info(
f"Hybrid clustering results: {len(final_clusters)} patterns "
f"(from {n_title_groups} title groups), {noise_count} noise points"
)
return final_clusters
def _normalize_window_title(self, title: str) -> str:
"""Normaliser un titre de fenêtre pour le clustering.
Extrait le nom de l'application (partie significative) en ignorant
le contenu variable (nom de fichier, URL, etc.).
Ex: "document.txt - Bloc-notes""Bloc-notes"
Ex: "Google - Chrome""Chrome"
Ex: "C:\\Users\\... - Explorateur""Explorateur"
"""
if not title:
return "__no_title__"
# Séparateurs courants dans les titres Windows
for sep in [" - ", "", " ", " | "]:
if sep in title:
parts = title.split(sep)
# Le nom de l'app est généralement le dernier segment
return parts[-1].strip()
return title.strip()
def _dbscan_clustering(
self,
embeddings: List[np.ndarray],
screen_states: List[ScreenState],
) -> Dict[int, List[int]]:
"""Clustering DBSCAN pur (fallback quand les titres ne discriminent pas)."""
X = np.array(embeddings)
clustering = DBSCAN(
eps=self.clustering_eps,
min_samples=self.clustering_min_samples,
metric="cosine",
)
labels = clustering.fit_predict(X)
clusters = defaultdict(list)
noise_count = 0
for idx, label in enumerate(labels):
if label == -1:
noise_count += 1
else:
clusters[label].append(idx)
filtered_clusters = {
cluster_id: indices
for cluster_id, indices in clusters.items()
if len(indices) >= self.min_pattern_repetitions
}
logger.info(
f"DBSCAN clustering results: {len(filtered_clusters)} patterns, "
f"{noise_count} noise points, "
f"{len(clusters) - len(filtered_clusters)} small clusters filtered"
)
return filtered_clusters
def _build_nodes(
self,
clusters: Dict[int, List[int]],
screen_states: List[ScreenState],
embeddings: List[np.ndarray],
) -> List[WorkflowNode]:
"""
Construire WorkflowNodes depuis les clusters détectés.
Pour chaque cluster:
1. Calculer embedding prototype (moyenne normalisée)
2. Extraire contraintes depuis états du cluster
3. Créer ScreenTemplate
4. Créer WorkflowNode
Args:
clusters: Clusters détectés {cluster_id: [indices]}
screen_states: ScreenStates
embeddings: Embeddings
Returns:
Liste de WorkflowNodes
"""
nodes = []
for cluster_id, indices in clusters.items():
# Calculer embedding prototype (moyenne)
cluster_embeddings = [embeddings[i] for i in indices]
prototype = np.mean(cluster_embeddings, axis=0)
prototype = prototype / np.linalg.norm(prototype) # Normaliser
# Extraire contraintes depuis les états du cluster
cluster_states = [screen_states[i] for i in indices]
template = self._create_screen_template(cluster_states, prototype)
# Créer node
node = WorkflowNode(
node_id=f"node_{cluster_id:03d}",
name=f"State Pattern {cluster_id}",
description=f"Pattern auto-détecté ({len(indices)} observations)",
template=template,
metadata={
"observation_count": len(indices),
"_prototype_vector": prototype.tolist(),
},
)
nodes.append(node)
logger.debug(
f"Created node {node.node_id} with {len(indices)} observations"
)
return nodes
def _create_screen_template(
self,
states: List[ScreenState],
prototype_embedding: np.ndarray,
) -> ScreenTemplate:
"""
Créer un ScreenTemplate depuis un cluster d'états.
Extrait les contraintes communes à tous les états du cluster :
- window_title_pattern : titre de fenêtre commun
- required_text_patterns : textes présents dans la majorité des états
- required_ui_elements : rôles/types UI récurrents
Args:
states: États du cluster
prototype_embedding: Embedding prototype
Returns:
ScreenTemplate avec contraintes extraites
"""
# --- Extraction du titre de fenêtre commun ---
window_title_pattern = self._extract_window_pattern(states)
# --- Extraction des textes récurrents ---
required_text_patterns = self._extract_common_texts(states)
# --- Extraction des éléments UI récurrents ---
required_ui_elements = self._extract_common_ui_elements(states)
# Construire les sous-objets de contraintes
window_constraint = WindowConstraint(
title_pattern=window_title_pattern,
title_contains=window_title_pattern,
)
text_constraint = TextConstraint(
required_texts=required_text_patterns,
)
ui_roles = [
e.get("role", "") for e in required_ui_elements if e.get("role")
]
ui_constraint = UIConstraint(
required_roles=ui_roles,
)
embedding_proto = EmbeddingPrototype(
provider="openclip_ViT-B-32",
vector_id="", # Le vecteur est stocké dans node.metadata._prototype_vector
min_cosine_similarity=0.85,
sample_count=len(states),
)
return ScreenTemplate(
window=window_constraint,
text=text_constraint,
ui=ui_constraint,
embedding=embedding_proto,
)
def _extract_window_pattern(self, states: List[ScreenState]) -> Optional[str]:
"""Extraire un pattern de titre de fenêtre commun aux états du cluster."""
titles = [s.window.window_title for s in states if s.window.window_title]
if not titles:
return None
# Si tous les titres sont identiques, retourner directement
if len(set(titles)) == 1:
return titles[0]
# Trouver le préfixe commun le plus long
prefix = os.path.commonprefix(titles)
if len(prefix) >= 5:
return prefix.rstrip(" -–—|")
# Fallback: le titre le plus fréquent
from collections import Counter
most_common = Counter(titles).most_common(1)[0][0]
return most_common
def _extract_common_texts(
self, states: List[ScreenState], min_presence_ratio: float = 0.6
) -> List[str]:
"""
Extraire les textes présents dans la majorité des états du cluster.
Args:
states: États du cluster
min_presence_ratio: Proportion minimale de présence (0.6 = 60% des états)
"""
if not states:
return []
# Collecter les textes de chaque état
text_counts: Dict[str, int] = defaultdict(int)
states_with_text = 0
for state in states:
if hasattr(state.perception, 'detected_text') and state.perception.detected_text:
states_with_text += 1
seen_in_state = set()
for text in state.perception.detected_text:
normalized = text.strip().lower()
if len(normalized) >= 3 and normalized not in seen_in_state:
text_counts[normalized] += 1
seen_in_state.add(normalized)
if states_with_text == 0:
return []
# Garder les textes présents dans au moins min_presence_ratio des états
threshold = max(2, int(states_with_text * min_presence_ratio))
common_texts = [
text for text, count in text_counts.items()
if count >= threshold
]
# Limiter à 10 textes les plus fréquents
common_texts.sort(key=lambda t: text_counts[t], reverse=True)
return common_texts[:10]
def _extract_common_ui_elements(
self, states: List[ScreenState], min_presence_ratio: float = 0.5
) -> List[Dict[str, Any]]:
"""
Extraire les types/rôles d'éléments UI récurrents dans le cluster.
Retourne une liste de contraintes UI au format:
[{"type": "button", "role": "validate", "min_count": 1}, ...]
"""
if not states:
return []
# Compter les paires (type, role) dans chaque état
role_counts: Dict[str, int] = defaultdict(int)
type_counts: Dict[str, int] = defaultdict(int)
states_with_ui = 0
for state in states:
if state.ui_elements:
states_with_ui += 1
seen_roles = set()
seen_types = set()
for el in state.ui_elements:
el_type = getattr(el, 'type', 'unknown')
el_role = getattr(el, 'role', 'unknown')
if el_role != 'unknown' and el_role not in seen_roles:
role_counts[el_role] += 1
seen_roles.add(el_role)
if el_type != 'unknown' and el_type not in seen_types:
type_counts[el_type] += 1
seen_types.add(el_type)
if states_with_ui == 0:
return []
threshold = max(2, int(states_with_ui * min_presence_ratio))
constraints = []
# Ajouter les rôles récurrents
for role, count in role_counts.items():
if count >= threshold:
constraints.append({
"role": role,
"min_count": 1,
})
# Limiter à 8 contraintes
constraints.sort(key=lambda c: role_counts.get(c.get("role", ""), 0), reverse=True)
return constraints[:8]
# ------------------------------------------------------------------
# Association spatiale clic → UIElement
# ------------------------------------------------------------------
def _find_clicked_element(
self,
event: Event,
ui_elements: List[Any],
) -> Optional[Any]:
"""
Identifier l'UIElement cible d'un clic par proximité spatiale.
Règle :
1. Si un bbox contient strictement la position du clic → match.
2. Sinon, on prend le bbox le plus proche (distance euclidienne
au bord) sous réserve qu'il soit à <= `element_proximity_max_px`.
3. Sinon, aucun ancrage possible → None.
Cette association transforme un clic "aveugle" (coordonnées brutes)
en un clic "intelligent" (rôle + label), permettant au matcher de
retrouver l'élément même si la résolution ou la position change.
Args:
event: Événement `mouse_click` (avec `data["pos"] = [x, y]`).
ui_elements: Liste des UIElement détectés sur l'écran source.
Returns:
UIElement le plus pertinent, ou None si rien ne correspond.
"""
if not ui_elements:
return None
if not event or event.type != "mouse_click":
return None
pos = event.data.get("pos") if event.data else None
if not pos or len(pos) < 2:
return None
try:
click_x = float(pos[0])
click_y = float(pos[1])
except (TypeError, ValueError):
return None
best_contained = None
best_contained_area = None
best_near = None
best_near_distance = None
for element in ui_elements:
bbox = getattr(element, "bbox", None)
if bbox is None:
continue
# Extraction défensive des coordonnées (BBox Pydantic ou tuple)
try:
bx = int(getattr(bbox, "x", bbox[0]))
by = int(getattr(bbox, "y", bbox[1]))
bw = int(getattr(bbox, "width", bbox[2]))
bh = int(getattr(bbox, "height", bbox[3]))
except (AttributeError, IndexError, TypeError):
continue
# Cas 1 : la position est strictement dans le bbox.
if bx <= click_x <= bx + bw and by <= click_y <= by + bh:
# Sélectionner le plus petit bbox qui contient (élément le plus spécifique)
area = max(1, bw * bh)
if best_contained is None or area < best_contained_area:
best_contained = element
best_contained_area = area
continue
# Cas 2 : calculer la distance au bord le plus proche.
dx = max(bx - click_x, 0, click_x - (bx + bw))
dy = max(by - click_y, 0, click_y - (by + bh))
distance = (dx * dx + dy * dy) ** 0.5
if best_near is None or distance < best_near_distance:
best_near = element
best_near_distance = distance
if best_contained is not None:
return best_contained
if (
best_near is not None
and best_near_distance is not None
and best_near_distance <= self.element_proximity_max_px
):
return best_near
return None
# Patterns d'erreur courants pour la détection fail_fast
_ERROR_PATTERNS = [
"erreur", "error", "échec", "failed", "impossible",
"accès refusé", "access denied", "not found",
"timeout", "connexion perdue", "session expirée",
]
def _build_edges(
self,
nodes: List[WorkflowNode],
screen_states: List[ScreenState],
session: RawSession,
embeddings: Optional[List[np.ndarray]] = None,
sequential: bool = False,
) -> List[WorkflowEdge]:
"""
Construire WorkflowEdges depuis les transitions observées.
Algorithme:
1. Mapper chaque ScreenState vers son node (via embedding similarity)
En mode séquentiel, le mapping est direct (state i → node i).
2. Identifier les transitions (state_i -> state_j où node change)
3. Extraire l'action depuis l'événement entre les deux états
4. Créer WorkflowEdge avec action, pré-conditions et post-conditions
Les EdgeConstraints sont peuplées depuis le ScreenTemplate du node source :
- required_window_title, required_app_name, min_source_similarity
- required_texts (textes OCR fréquents dans le cluster source)
Les PostConditions sont peuplées depuis le ScreenTemplate du node cible :
- expected_window_title, expected_app_name, min_target_similarity
- expected_texts + forbidden_texts (patterns d'erreur courants)
Args:
nodes: WorkflowNodes construits
screen_states: ScreenStates
session: Session brute (pour événements)
embeddings: Embeddings pré-calculés (évite un recalcul dans _map_states_to_nodes)
sequential: Mode séquentiel — chaque paire consécutive = transition
Returns:
Liste de WorkflowEdges
"""
if not nodes or len(screen_states) < 2:
logger.warning("Not enough data to build edges")
return []
edges = []
edge_counts = defaultdict(int) # Pour compter les occurrences de chaque transition
# Index des nodes par ID pour accès rapide
node_by_id = {node.node_id: node for node in nodes}
# Étape 1: Mapper chaque état vers son node
if sequential:
# Mode séquentiel : mapping direct state[i] → node[i]
state_to_node = {}
for i, state in enumerate(screen_states):
if i < len(nodes):
state_to_node[state.screen_state_id] = nodes[i].node_id
logger.debug(
f"Mode séquentiel: {len(state_to_node)} states mappés directement"
)
else:
state_to_node = self._map_states_to_nodes(
screen_states, nodes, embeddings=embeddings
)
# Étape 2: Récupérer la résolution d'écran pour normaliser les coordonnées
screen_env = session.environment.get("screen", {})
screen_resolution = tuple(screen_env.get("primary_resolution", [1920, 1080]))
# Étape 3: Parcourir les transitions
for i in range(len(screen_states) - 1):
current_state = screen_states[i]
next_state = screen_states[i + 1]
current_node_id = state_to_node.get(current_state.screen_state_id)
next_node_id = state_to_node.get(next_state.screen_state_id)
# En mode séquentiel, chaque paire consécutive est une transition
# En mode clustering, uniquement si les nodes sont différents
if current_node_id and next_node_id and (
sequential or current_node_id != next_node_id
):
# Trouver TOUS les événements entre les deux états
transition_events = self._find_transition_events(
current_state, next_state, session.events
)
# Créer l'edge
edge_key = f"{current_node_id}_to_{next_node_id}"
edge_counts[edge_key] += 1
# Ne créer l'edge qu'une fois, mais compter les occurrences
if edge_counts[edge_key] == 1:
source_node = node_by_id.get(current_node_id)
target_node = node_by_id.get(next_node_id)
edge = self._create_edge(
current_node_id, next_node_id,
transition_events[-1] if transition_events else None,
edge_key,
source_node=source_node,
target_node=target_node,
all_events=transition_events,
screen_resolution=screen_resolution,
source_state=current_state,
)
edges.append(edge)
# Mettre à jour les stats des edges avec les comptages
for edge in edges:
edge_key = f"{edge.from_node}_to_{edge.to_node}"
edge.stats.execution_count = edge_counts[edge_key]
edge.stats.success_count = edge_counts[edge_key]
logger.info(f"Built {len(edges)} edges from {sum(edge_counts.values())} transitions")
return edges
def _map_states_to_nodes(
self,
screen_states: List[ScreenState],
nodes: List[WorkflowNode],
embeddings: Optional[List[np.ndarray]] = None,
) -> Dict[str, str]:
"""
Mapper chaque ScreenState vers le node le plus proche.
Utilise la similarité d'embedding pour trouver le meilleur match.
Args:
screen_states: Liste de ScreenStates à mapper.
nodes: WorkflowNodes cibles.
embeddings: Embeddings pré-calculés (même ordre que screen_states).
Si fourni, évite de recalculer via self.embedding_builder.build().
"""
state_to_node = {}
# Récupérer les embeddings des prototypes de nodes
node_prototypes = {}
for node in nodes:
# Priorité : vecteur en mémoire (metadata), sinon chargement depuis disque
proto_list = node.metadata.get("_prototype_vector")
if proto_list is not None:
node_prototypes[node.node_id] = np.array(proto_list, dtype=np.float32)
elif node.template and node.template.embedding and node.template.embedding.vector_id:
proto_path = Path(node.template.embedding.vector_id)
if proto_path.exists():
node_prototypes[node.node_id] = np.load(proto_path)
if not node_prototypes:
logger.warning("No node prototypes available for mapping")
return state_to_node
# Vérifier si les embeddings pré-calculés sont utilisables
use_precomputed = (
embeddings is not None and len(embeddings) == len(screen_states)
)
if use_precomputed:
logger.debug("_map_states_to_nodes: using precomputed embeddings")
# Pour chaque état, trouver le node le plus proche
for i, state in enumerate(screen_states):
try:
# Utiliser l'embedding pré-calculé ou recalculer
if use_precomputed:
state_vector = np.asarray(embeddings[i], dtype=np.float32)
else:
state_embedding = self.embedding_builder.build(state)
state_vector = state_embedding.get_vector()
# Trouver le node avec la meilleure similarité
best_node_id = None
best_similarity = -1
for node_id, prototype in node_prototypes.items():
similarity = np.dot(state_vector, prototype)
if similarity > best_similarity:
best_similarity = similarity
best_node_id = node_id
if best_node_id and best_similarity > 0.7: # Seuil minimum
state_to_node[state.screen_state_id] = best_node_id
except Exception as e:
logger.warning(f"Failed to map state {state.screen_state_id}: {e}")
return state_to_node
def _get_state_time(self, state: ScreenState, fallback: float = 0) -> float:
"""Extraire le timestamp d'un ScreenState.
Priorité :
1. metadata['event_time'] (set par _create_screen_states)
2. metadata['shot_timestamp'] (set par le reprocessing)
3. state.timestamp converti en epoch si c'est un datetime
4. fallback
Note : event_time peut être 0.0 (timestamps relatifs), donc on
vérifie `is not None` et non `> 0`.
"""
if state.metadata:
et = state.metadata.get("event_time")
if et is not None:
return float(et)
st = state.metadata.get("shot_timestamp")
if st is not None:
return float(st)
if state.timestamp:
try:
return state.timestamp.timestamp()
except (AttributeError, OSError):
pass
return fallback
def _find_transition_events(
self,
current_state: ScreenState,
next_state: ScreenState,
events: List[Event],
) -> List[Event]:
"""
Trouver TOUS les événements entre deux états (pas juste le dernier).
Collecte toutes les actions utilisateur (clics, frappes, saisie texte)
qui se sont produites entre les timestamps des deux ScreenStates.
C'est essentiel pour le replay : une transition peut nécessiter
plusieurs actions (ex: Win+R → taper "notepad" → Entrée).
Timestamps : utilise _get_state_time() qui supporte plusieurs
sources (event_time, shot_timestamp, datetime).
Args:
current_state: État source
next_state: État cible
events: Tous les événements de la session
Returns:
Liste ordonnée (par timestamp) de tous les événements d'action
entre les deux états. Peut être vide.
"""
current_time = self._get_state_time(current_state, fallback=0)
next_time = self._get_state_time(next_state, fallback=float('inf'))
action_events = []
for event in events:
if current_time <= event.t < next_time:
if event.type in ["mouse_click", "key_press", "text_input"]:
# Filtrer les événements parasites (modificateurs seuls, texte vide)
if not _is_parasitic_event(event):
action_events.append(event)
return action_events
def _find_transition_event(
self,
current_state: ScreenState,
next_state: ScreenState,
events: List[Event]
) -> Optional[Event]:
"""
Trouver l'événement qui a causé la transition entre deux états.
DEPRECATED: Utiliser _find_transition_events() pour obtenir TOUS les événements.
Conservé pour rétrocompatibilité.
"""
all_events = self._find_transition_events(current_state, next_state, events)
return all_events[-1] if all_events else None
def _create_edge(
self,
from_node: str,
to_node: str,
event: Optional[Event],
edge_id: str,
source_node: Optional[WorkflowNode] = None,
target_node: Optional[WorkflowNode] = None,
all_events: Optional[List[Event]] = None,
screen_resolution: Tuple[int, int] = (1920, 1080),
source_state: Optional[ScreenState] = None,
) -> WorkflowEdge:
"""
Créer un WorkflowEdge depuis une transition observée.
Peuple les EdgeConstraints depuis le template du node source et
les PostConditions depuis le template du node cible.
Si plusieurs événements sont associés à la transition (all_events),
crée une action "compound" contenant toutes les étapes (steps),
ce qui permet au replay de reproduire fidèlement la séquence complète
(ex: Win+R → taper "notepad" → Entrée).
Args:
from_node: ID du node source
to_node: ID du node cible
event: Dernier événement de la transition (rétrocompatibilité)
edge_id: Identifiant de l'edge
source_node: Node source (pour extraire les pré-conditions)
target_node: Node cible (pour extraire les post-conditions)
all_events: TOUS les événements entre les deux états (si disponible)
screen_resolution: Résolution de référence pour normaliser les coordonnées
"""
# Si on a plusieurs événements, créer une action compound
events_to_use = all_events or ([event] if event else [])
# UIElements de l'écran source — sert à ancrer les clics sur un vrai
# élément UI (rôle, texte, bbox) plutôt que sur une coordonnée brute.
source_ui_elements = (
list(source_state.ui_elements)
if source_state and source_state.ui_elements
else []
)
if len(events_to_use) > 1:
action = self._build_compound_action(
events_to_use, screen_resolution,
source_ui_elements=source_ui_elements,
)
elif len(events_to_use) == 1:
action = self._build_single_action(
events_to_use[0],
source_ui_elements=source_ui_elements,
)
else:
action = Action(
type="unknown",
target=TargetSpec(
by_role="unknown",
selection_policy="first",
fallback_strategy="visual_similarity",
),
parameters={},
)
# ---------------------------------------------------------------
# Pré-conditions (EdgeConstraints) depuis le template du node source
# ---------------------------------------------------------------
constraints = self._build_edge_constraints(source_node)
# ---------------------------------------------------------------
# Post-conditions depuis le template du node cible
# ---------------------------------------------------------------
post_conditions = self._build_post_conditions(
to_node, source_node, target_node
)
# Créer l'edge
from core.models.workflow_graph import EdgeStats
# Metadata enrichie pour le diagnostic
edge_metadata = {
"auto_generated": True,
"event_count": len(events_to_use),
}
if events_to_use:
edge_metadata["created_from_event"] = events_to_use[-1].type
if len(events_to_use) > 1:
edge_metadata["event_types"] = [e.type for e in events_to_use]
else:
edge_metadata["created_from_event"] = None
return WorkflowEdge(
edge_id=edge_id,
from_node=from_node,
to_node=to_node,
action=action,
constraints=constraints,
post_conditions=post_conditions,
stats=EdgeStats(),
metadata=edge_metadata,
)
def _build_single_action(
self,
event: Event,
source_ui_elements: Optional[List[Any]] = None,
) -> Action:
"""
Construire une Action simple depuis un seul événement.
Pour un clic, si `source_ui_elements` est fourni, on tente d'ancrer
l'action sur l'UIElement le plus proche (par proximité spatiale).
Le TargetSpec devient alors discriminant :
- `by_role` = rôle sémantique de l'élément (ex: "primary_action")
- `by_text` = label détecté (ex: "Valider")
- `selection_policy` = "by_similarity" (laisse le matcher scorer)
- `context_hints["anchor_element_id"]` = traçabilité
- `context_hints["anchor_bbox"]` = invariant spatial debug
À défaut d'ancrage (pas d'UIElement ou clic hors de toute bbox
proche), on retombe sur `by_role="unknown_element"` (legacy).
"""
action_type = event.type
action_params: Dict[str, Any] = {}
target_spec: Optional[TargetSpec] = None
if action_type == "mouse_click":
action_params = {
"button": event.data.get("button", "left"),
"position": event.data.get("pos", [0, 0]),
"wait_after_ms": 500,
}
target_spec = self._build_click_target_spec(
event, source_ui_elements or []
)
elif action_type == "key_press":
action_params = {
"keys": event.data.get("keys", []),
"wait_after_ms": 200,
}
target_spec = TargetSpec(
by_role="keyboard_input",
selection_policy="first",
fallback_strategy="visual_similarity",
)
elif action_type == "text_input":
action_params = {
"text": event.data.get("text", ""),
"wait_after_ms": 300,
}
target_spec = TargetSpec(
by_role="text_field",
selection_policy="first",
fallback_strategy="visual_similarity",
)
else:
action_params = {}
target_spec = TargetSpec(
by_role="unknown",
selection_policy="first",
fallback_strategy="visual_similarity",
)
return Action(
type=action_type,
target=target_spec,
parameters=action_params,
)
def _build_click_target_spec(
self,
event: Event,
source_ui_elements: List[Any],
) -> TargetSpec:
"""
Construire un TargetSpec pour un clic, en essayant de l'ancrer à
un UIElement détecté sur l'écran source.
Retourne toujours un TargetSpec valide :
- ancré (role + text + context_hints) si un élément proche existe ;
- fallback `unknown_element` sinon (comportement historique).
"""
clicked = self._find_clicked_element(event, source_ui_elements)
if clicked is None:
return TargetSpec(
by_role="unknown_element",
selection_policy="first",
fallback_strategy="visual_similarity",
)
# Extraction défensive des attributs de l'élément.
role = getattr(clicked, "role", None) or "unknown_element"
label = getattr(clicked, "label", None) or None
element_id = getattr(clicked, "element_id", None)
# Contexte de traçabilité — `context_hints` est le seul dict libre
# disponible dans TargetSpec (pas de champ `metadata` dédié).
context_hints: Dict[str, Any] = {}
if element_id:
context_hints["anchor_element_id"] = str(element_id)
bbox = getattr(clicked, "bbox", None)
if bbox is not None:
try:
context_hints["anchor_bbox"] = {
"x": int(getattr(bbox, "x", bbox[0])),
"y": int(getattr(bbox, "y", bbox[1])),
"width": int(getattr(bbox, "width", bbox[2])),
"height": int(getattr(bbox, "height", bbox[3])),
}
except (AttributeError, IndexError, TypeError):
pass
# Center (utile comme ancre de fallback quand le matcher échoue)
center = getattr(clicked, "center", None)
if center is not None:
try:
context_hints["anchor_center"] = [int(center[0]), int(center[1])]
except (IndexError, TypeError):
pass
return TargetSpec(
by_role=role,
by_text=label,
selection_policy="by_similarity",
fallback_strategy="visual_similarity",
context_hints=context_hints,
)
def _build_compound_action(
self,
events: List[Event],
screen_resolution: Tuple[int, int] = (1920, 1080),
source_ui_elements: Optional[List[Any]] = None,
) -> Action:
"""
Construire une Action compound (multi-étapes) depuis plusieurs événements.
Quand l'utilisateur fait plusieurs actions entre deux changements d'écran
(ex: Win+R → taper "notepad" → Entrée), on les regroupe en une seule
action compound avec une liste de steps ordonnés.
Des waits sont insérés automatiquement entre les étapes quand le délai
réel dépasse 500ms (plafonné à 5000ms).
Args:
events: Liste ordonnée d'événements (>= 2)
screen_resolution: Résolution de référence (largeur, hauteur)
Returns:
Action de type "compound" avec steps dans parameters
"""
steps = []
ref_w, ref_h = screen_resolution
for i, event in enumerate(events):
# Insérer un wait si le délai entre deux événements est > 500ms
if i > 0:
delta_ms = int((event.t - events[i - 1].t) * 1000)
if delta_ms > 500:
steps.append({
"type": "wait",
"duration_ms": min(delta_ms, 5000),
})
if event.type == "mouse_click":
pos = event.data.get("pos", [0, 0])
steps.append({
"type": "mouse_click",
"position": list(pos),
"x_pct": round(pos[0] / ref_w, 6) if ref_w > 0 else 0.0,
"y_pct": round(pos[1] / ref_h, 6) if ref_h > 0 else 0.0,
"button": event.data.get("button", "left"),
"ref_width": ref_w,
"ref_height": ref_h,
})
elif event.type == "text_input":
steps.append({
"type": "text_input",
"text": event.data.get("text", ""),
})
elif event.type == "key_press":
keys = event.data.get("keys", [])
if keys:
steps.append({
"type": "key_press",
"keys": keys,
})
else:
# Type inconnu : stocker quand même pour ne rien perdre
steps.append({
"type": event.type,
"data": dict(event.data) if event.data else {},
})
# ---------------------------------------------------------------
# Nettoyage des steps parasites :
# 1. Supprimer les modificateurs seuls (ctrl, alt, shift, etc.)
# 2. Fusionner les text_input consécutifs (ex: "No","m",":"," " → "Nom: ")
# 3. Dédupliquer les key_combo consécutifs identiques
# 4. Garantir un wait minimum de 300ms entre chaque step
# ---------------------------------------------------------------
steps = _filter_modifier_only_steps(steps)
steps = _merge_consecutive_text_steps(steps)
steps = _dedup_consecutive_combos(steps)
steps = _ensure_min_waits(steps)
# La cible du compound = cible de la dernière action (le clic final, etc.)
last_event = events[-1]
if last_event.type == "mouse_click":
# On tente d'ancrer le clic final aux UIElements détectés,
# comme dans _build_single_action.
target_spec = self._build_click_target_spec(
last_event, source_ui_elements or []
)
elif last_event.type == "text_input":
target_spec = TargetSpec(
by_role="text_field",
selection_policy="first",
fallback_strategy="visual_similarity",
)
elif last_event.type == "key_press":
target_spec = TargetSpec(
by_role="keyboard_input",
selection_policy="first",
fallback_strategy="visual_similarity",
)
else:
target_spec = TargetSpec(
by_role="unknown",
selection_policy="first",
fallback_strategy="visual_similarity",
)
return Action(
type="compound",
target=target_spec,
parameters={
"steps": steps,
"step_count": len(steps),
"ref_width": ref_w,
"ref_height": ref_h,
},
)
def _build_edge_constraints(
self,
source_node: Optional[WorkflowNode],
) -> EdgeConstraints:
"""
Construire les EdgeConstraints (pré-conditions) depuis le node source.
Extrait du ScreenTemplate du node source :
- required_window_title : titre de fenêtre attendu
- required_app_name : nom de l'application
- min_source_similarity : seuil de similarité embedding
- window / text : contraintes WindowConstraint et TextConstraint
"""
if not source_node or not source_node.template:
return EdgeConstraints(
pre_conditions={},
required_confidence=0.8,
max_wait_time_ms=5000,
)
tpl = source_node.template
# Extraire le titre de fenêtre depuis le WindowConstraint
req_title = None
req_app = None
window_constraint = None
if tpl.window:
req_title = tpl.window.title_contains or tpl.window.title_pattern
req_app = tpl.window.process_name
# Copier la contrainte de fenêtre complète
window_constraint = WindowConstraint(
title_pattern=tpl.window.title_pattern,
title_contains=tpl.window.title_contains,
process_name=tpl.window.process_name,
)
# Extraire la contrainte de texte (required_texts du template)
text_constraint = None
if tpl.text and tpl.text.required_texts:
# Limiter à 5 textes les plus significatifs (>= 3 car, <= 100 car)
filtered = [
t for t in tpl.text.required_texts
if 3 <= len(t) <= 100
][:5]
if filtered:
text_constraint = TextConstraint(
required_texts=filtered,
)
# Seuil de similarité depuis l'embedding prototype
min_sim = 0.80
if tpl.embedding and tpl.embedding.min_cosine_similarity:
min_sim = tpl.embedding.min_cosine_similarity
return EdgeConstraints(
pre_conditions={},
required_confidence=min_sim,
max_wait_time_ms=5000,
window=window_constraint,
text=text_constraint,
min_source_similarity=min_sim,
required_app_name=req_app,
required_window_title=req_title,
)
def _build_post_conditions(
self,
to_node_id: str,
source_node: Optional[WorkflowNode],
target_node: Optional[WorkflowNode],
) -> PostConditions:
"""
Construire les PostConditions depuis le node cible.
Peuple :
- expected_window_title / expected_app_name depuis le template cible
- success checks : textes attendus (required_texts du template cible)
+ window_title_contains si le titre change
- fail_fast checks : patterns d'erreur courants (forbidden_texts)
- min_target_similarity depuis l'embedding prototype cible
"""
if not target_node or not target_node.template:
return PostConditions(
expected_node=to_node_id,
window_change_expected=False,
new_ui_elements_expected=[],
timeout_ms=10000,
)
tpl = target_node.template
# Extraire les infos du node cible
expected_title = None
expected_app = None
if tpl.window:
expected_title = tpl.window.title_contains or tpl.window.title_pattern
expected_app = tpl.window.process_name
# Seuil de similarité cible
min_sim = 0.80
if tpl.embedding and tpl.embedding.min_cosine_similarity:
min_sim = tpl.embedding.min_cosine_similarity
# Détecter si le titre de fenêtre change entre source et cible
window_change = False
if source_node and source_node.template and source_node.template.window:
src_title = source_node.template.window.title_contains or source_node.template.window.title_pattern
if src_title and expected_title and src_title != expected_title:
window_change = True
# --- Construire les checks de succès ---
success_checks = []
# Check sur le titre de fenêtre cible (si défini)
if expected_title:
success_checks.append(
PostConditionCheck(kind="window_title_contains", value=expected_title)
)
# Checks sur les textes attendus dans l'écran cible
if tpl.text and tpl.text.required_texts:
for text in tpl.text.required_texts[:5]:
if 3 <= len(text) <= 100:
success_checks.append(
PostConditionCheck(kind="text_present", value=text)
)
# --- Construire les checks fail_fast (détection d'erreurs) ---
fail_fast_checks = []
for pattern in self._ERROR_PATTERNS:
fail_fast_checks.append(
PostConditionCheck(kind="text_present", value=pattern)
)
return PostConditions(
success_mode="all",
timeout_ms=10000,
poll_ms=200,
success=success_checks,
fail_fast=fail_fast_checks,
retries=2,
backoff_ms=150,
expected_window_title=expected_title,
expected_app_name=expected_app,
min_target_similarity=min_sim,
expected_node=to_node_id,
window_change_expected=window_change,
new_ui_elements_expected=[],
)
def main():
"""Point d'entrée pour tests manuels."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
builder = GraphBuilder(min_pattern_repetitions=3)
logger.info(f"GraphBuilder initialized: {builder}")
logger.info("Ready to build workflows from sessions")
if __name__ == "__main__":
main()