""" GraphBuilder - Construction Automatique de Workflow Graphs Ce module implémente la construction automatique de graphes de workflows en analysant les sessions enregistrées et en détectant les patterns répétés. Architecture: 1. Création de ScreenStates depuis RawSession 2. Calcul de State Embeddings pour tous les états 3. Détection de patterns via clustering DBSCAN 4. Construction de WorkflowNodes depuis clusters 5. Construction de WorkflowEdges depuis transitions Algorithme de Détection de Patterns: - Utilise DBSCAN (Density-Based Spatial Clustering of Applications with Noise) - Métrique: similarité cosinus entre embeddings - Filtre les clusters avec moins de N répétitions - Calcule un prototype (moyenne) pour chaque cluster Example: >>> builder = GraphBuilder(min_pattern_repetitions=3) >>> workflow = builder.build_from_session(raw_session) >>> print(f"Workflow with {len(workflow.nodes)} nodes") """ import logging import os from typing import List, Dict, Optional, Tuple, Any from collections import defaultdict, Counter from datetime import datetime from pathlib import Path import numpy as np from sklearn.cluster import DBSCAN from core.models.raw_session import RawSession, Event from core.models.screen_state import ( ScreenState, WindowContext, RawLevel, PerceptionLevel, ContextLevel, EmbeddingRef ) from core.models.workflow_graph import ( Workflow, WorkflowNode, WorkflowEdge, ScreenTemplate, Action, TargetSpec, EdgeConstraints, PostConditions, PostConditionCheck, WindowConstraint, TextConstraint, UIConstraint, EmbeddingPrototype, ) from core.embedding.state_embedding_builder import StateEmbeddingBuilder from core.embedding.faiss_manager import FAISSManager from core.training.quality_validator import TrainingQualityValidator, QualityReport logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Filtrage des événements parasites (modificateurs seuls, etc.) # Appliqué dans _find_transition_events et _build_compound_action. # --------------------------------------------------------------------------- _MODIFIER_ONLY_KEYS = { "ctrl", "ctrl_l", "ctrl_r", "alt", "alt_l", "alt_r", "shift", "shift_l", "shift_r", "win", "cmd", "cmd_l", "cmd_r", "meta", "super", "super_l", "super_r", } def _is_modifier_only(keys: list) -> bool: """Retourne True si la liste de touches ne contient que des modificateurs.""" if not keys: return True return all(k.lower() in _MODIFIER_ONLY_KEYS for k in keys) def _is_parasitic_event(event: Event) -> bool: """Retourne True si l'événement est parasite (modificateur seul, texte vide). Filtrage appliqué aux événements de session avant construction des edges. """ if event.type == "key_press": keys = event.data.get("keys", []) if not keys or _is_modifier_only(keys): return True elif event.type == "text_input": text = event.data.get("text", "") if not text: return True return False def _merge_consecutive_text_steps(steps: list) -> list: """Fusionne les text_input consécutifs en un seul.""" merged = [] for step in steps: if (step.get("type") in ("text_input", "type") and merged and merged[-1].get("type") in ("text_input", "type")): merged[-1]["text"] = merged[-1].get("text", "") + step.get("text", "") else: merged.append(dict(step)) return merged def _dedup_consecutive_combos(steps: list) -> list: """Supprime les key_combo/key_press dupliqués consécutifs.""" deduped = [] for step in steps: if (step.get("type") in ("key_combo", "key_press") and deduped and deduped[-1].get("type") in ("key_combo", "key_press") and deduped[-1].get("keys") == step.get("keys")): continue deduped.append(step) return deduped def _filter_modifier_only_steps(steps: list) -> list: """Supprime les steps key_combo/key_press avec uniquement des modificateurs.""" return [ s for s in steps if not ( s.get("type") in ("key_combo", "key_press") and _is_modifier_only(s.get("keys", [])) ) ] def _ensure_min_waits(steps: list, min_wait_ms: int = 300) -> list: """Ajoute un wait entre les steps si aucun wait n'existe.""" if not steps: return steps result = [steps[0]] for step in steps[1:]: if result[-1].get("type") != "wait" and step.get("type") != "wait": result.append({"type": "wait", "duration_ms": min_wait_ms}) result.append(step) return result class GraphBuilder: """ Constructeur de graphes de workflows depuis sessions brutes. Cette classe analyse une RawSession pour construire automatiquement un Workflow avec ses nodes et edges en détectant les patterns répétés. Attributes: embedding_builder: Builder pour calculer les State Embeddings faiss_manager: Manager FAISS pour indexation (optionnel) min_pattern_repetitions: Nombre minimum de répétitions pour un pattern clustering_eps: Distance maximum entre points pour DBSCAN clustering_min_samples: Nombre minimum d'échantillons par cluster Example: >>> builder = GraphBuilder(min_pattern_repetitions=3) >>> workflow = builder.build_from_session(session, "Login Workflow") """ def __init__( self, embedding_builder: Optional[StateEmbeddingBuilder] = None, faiss_manager: Optional[FAISSManager] = None, quality_validator: Optional[TrainingQualityValidator] = None, min_pattern_repetitions: int = 2, clustering_eps: float = 0.08, clustering_min_samples: int = 2, enable_quality_validation: bool = True, ui_detector: Optional[Any] = None, screen_analyzer: Optional[Any] = None, enable_ui_enrichment: bool = True, element_proximity_max_px: float = 50.0, ): """ Initialiser le GraphBuilder. Args: embedding_builder: Builder pour State Embeddings (créé si None) faiss_manager: Manager FAISS pour indexation (optionnel) quality_validator: Validateur de qualité (créé si None) min_pattern_repetitions: Nombre minimum de répétitions pour un pattern clustering_eps: Epsilon pour DBSCAN (distance max entre points) clustering_min_samples: Nombre minimum d'échantillons pour un cluster enable_quality_validation: Activer la validation de qualité ui_detector: UIDetector optionnel. Si fourni, sera utilisé par l'analyzer lazy-initialisé. Sinon, fallback sur le singleton partagé (`get_screen_analyzer()`). screen_analyzer: Instance ScreenAnalyzer à utiliser directement. Si None, lazy init via le singleton partagé C1. enable_ui_enrichment: Active l'enrichissement visuel des ScreenStates lors de `_create_screen_states` (OCR + UIDetector). False = comportement historique (ui_elements=[], detected_text=[]). element_proximity_max_px: Distance maximale (en pixels) entre un clic et le bbox le plus proche pour qu'un UIElement soit considéré comme cible. Au-delà, le clic reste sans ancre. """ self.embedding_builder = embedding_builder or StateEmbeddingBuilder() self.faiss_manager = faiss_manager self.quality_validator = quality_validator or TrainingQualityValidator() self.min_pattern_repetitions = min_pattern_repetitions self.clustering_eps = clustering_eps self.clustering_min_samples = clustering_min_samples self.enable_quality_validation = enable_quality_validation self.enable_ui_enrichment = enable_ui_enrichment self.element_proximity_max_px = element_proximity_max_px # UIDetector explicite (optionnel) — injecté dans l'analyzer lazy. self._ui_detector = ui_detector # Instance ScreenAnalyzer. Si fournie, on l'utilise telle quelle ; # sinon, on bascule sur le singleton partagé (lazy init). self._screen_analyzer = screen_analyzer logger.info( f"GraphBuilder initialized: " f"min_repetitions={min_pattern_repetitions}, " f"eps={clustering_eps}, " f"min_samples={clustering_min_samples}, " f"quality_validation={enable_quality_validation}, " f"ui_enrichment={enable_ui_enrichment}" ) # ------------------------------------------------------------------ # Résolution paresseuse du ScreenAnalyzer (singleton C1 par défaut) # ------------------------------------------------------------------ def _get_screen_analyzer(self): """ Retourner l'instance ScreenAnalyzer à utiliser. Priorité : 1. Instance injectée via le constructeur (`screen_analyzer=…`). 2. Singleton partagé `get_screen_analyzer()` (C1) — évite le double chargement GPU quand ExecutionLoop et stream_processor tournent. 3. En dernier recours (import circulaire, tests), création locale. """ if self._screen_analyzer is not None: return self._screen_analyzer try: from core.pipeline import get_screen_analyzer self._screen_analyzer = get_screen_analyzer( ui_detector=self._ui_detector, ) return self._screen_analyzer except Exception as e: logger.warning( f"Impossible d'obtenir le ScreenAnalyzer singleton " f"({e}); fallback sur une instance locale." ) try: from core.pipeline.screen_analyzer import ScreenAnalyzer self._screen_analyzer = ScreenAnalyzer( ui_detector=self._ui_detector, ) return self._screen_analyzer except Exception as e2: logger.error( f"Impossible d'instancier ScreenAnalyzer: {e2}. " "Enrichissement UI désactivé." ) return None def build_from_session( self, session: RawSession, workflow_name: Optional[str] = None, precomputed_states: Optional[List["ScreenState"]] = None, precomputed_embeddings: Optional[List] = None, sequential: bool = False, ) -> Workflow: """ Construire un Workflow complet depuis une RawSession. Processus: 1. Créer ScreenStates depuis screenshots (ou utiliser precomputed_states) 2. Calculer embeddings pour chaque état (ou réutiliser precomputed_embeddings) 3. Détecter patterns via clustering (ou mode séquentiel) 4. Construire nodes depuis clusters 5. Construire edges depuis transitions Args: session: Session brute à analyser workflow_name: Nom du workflow (généré si None) precomputed_states: ScreenStates déjà analysés (streaming). Si fourni, saute l'étape 1 (pas de re-analyse via ScreenAnalyzer). precomputed_embeddings: Embeddings déjà calculés (streaming). Si fourni et de la bonne longueur (= len(screen_states)), saute l'étape 2 (pas de recalcul CLIP). sequential: Si True, crée un node par état d'écran (pas de clustering DBSCAN). Approprié pour les enregistrements single-pass d'un workflow — chaque screenshot est une étape distincte avec ses actions associées. Returns: Workflow construit avec nodes et edges Raises: ValueError: Si la session est vide ou invalide """ if not precomputed_states and not session.screenshots: raise ValueError("Session has no screenshots and no precomputed states") logger.info( f"Building workflow from session {session.session_id} " f"with {len(precomputed_states or session.screenshots)} " f"{'precomputed states' if precomputed_states else 'screenshots'}" f"{' (mode séquentiel)' if sequential else ''}" ) # Étape 1: Créer ScreenStates (ou réutiliser ceux pré-calculés) if precomputed_states: screen_states = precomputed_states logger.debug(f"Using {len(screen_states)} precomputed screen states") else: screen_states = self._create_screen_states(session) logger.debug(f"Created {len(screen_states)} screen states") # Étape 2: Calculer embeddings (ou réutiliser ceux pré-calculés) if precomputed_embeddings is not None and len(precomputed_embeddings) == len(screen_states): embeddings = [np.asarray(e, dtype=np.float32) for e in precomputed_embeddings] logger.debug(f"Using {len(embeddings)} precomputed embeddings (skip CLIP)") else: if precomputed_embeddings is not None: logger.warning( f"precomputed_embeddings length mismatch " f"({len(precomputed_embeddings)} vs {len(screen_states)} states), " f"recalculating..." ) embeddings = self._compute_embeddings(screen_states) logger.debug(f"Computed {len(embeddings)} embeddings") # Étape 3: Détecter patterns ou mode séquentiel if sequential: # Mode séquentiel : chaque état d'écran est un node distinct. # Pas de clustering — essentiel pour les enregistrements single-pass # où l'on veut reproduire fidèlement la séquence des actions. clusters = {i: [i] for i in range(len(screen_states))} logger.info( f"Mode séquentiel: {len(clusters)} nodes (1 par état)" ) else: clusters = self._detect_patterns(embeddings, screen_states) logger.info(f"Detected {len(clusters)} patterns") # Étape 4: Construire nodes nodes = self._build_nodes(clusters, screen_states, embeddings) logger.info(f"Built {len(nodes)} workflow nodes") # Étape 5: Construire edges (passer les embeddings pour éviter recalcul) edges = self._build_edges( nodes, screen_states, session, embeddings=embeddings, sequential=sequential, ) logger.info(f"Built {len(edges)} workflow edges") # Créer Workflow from core.models.workflow_graph import WorkflowStats, SafetyRules, LearningConfig workflow = Workflow( workflow_id=workflow_name or f"workflow_{session.session_id}", name=workflow_name or "Unnamed Workflow", description="Auto-generated workflow", version=1, learning_state="OBSERVATION", created_at=datetime.now(), updated_at=datetime.now(), entry_nodes=[nodes[0].node_id] if nodes else [], end_nodes=[], nodes=nodes, edges=edges, safety_rules=SafetyRules(), stats=WorkflowStats(), learning=LearningConfig() ) # Étape 6: Validation de qualité quality_report = None if self.enable_quality_validation and screen_states: quality_report = self._validate_workflow_quality( workflow, screen_states, embeddings, clusters ) # Stocker le rapport dans les métadonnées du workflow workflow.metadata = workflow.metadata or {} workflow.metadata['quality_report'] = quality_report.to_dict() # Ajuster learning_state basé sur la qualité if quality_report.is_production_ready: workflow.learning_state = "AUTO_CANDIDATE" logger.info("Workflow qualité suffisante -> AUTO_CANDIDATE") else: workflow.learning_state = "OBSERVATION" logger.warning( f"Qualité insuffisante ({quality_report.overall_score:.3f}), " f"workflow reste en OBSERVATION" ) logger.info( f"Workflow '{workflow.name}' built successfully: " f"{len(nodes)} nodes, {len(edges)} edges" ) return workflow def _validate_workflow_quality( self, workflow: Workflow, screen_states: List[ScreenState], embeddings: List[np.ndarray], clusters: Dict[int, List[int]] ) -> QualityReport: """ Valider la qualité du workflow construit. Args: workflow: Workflow à valider screen_states: États d'écran utilisés embeddings: Embeddings calculés clusters: Clusters détectés Returns: QualityReport avec métriques et recommandations """ logger.info(f"Validation qualité du workflow {workflow.workflow_id}") # Préparer les données pour le validateur embeddings_array = np.array(embeddings) # Créer labels depuis les clusters labels = np.full(len(embeddings), -1) # -1 = bruit for cluster_id, indices in clusters.items(): for idx in indices: labels[idx] = cluster_id # Valider avec le TrainingQualityValidator report = self.quality_validator.validate_workflow( workflow=workflow, observations=screen_states, embeddings=embeddings_array, labels=labels ) logger.info( f"Validation terminée: score={report.overall_score:.3f}, " f"production_ready={report.is_production_ready}" ) return report def _create_screen_states(self, session: RawSession) -> List[ScreenState]: """ Créer ScreenStates enrichis depuis les screenshots de la session. Pour chaque screenshot: 1. Trouver l'événement associé pour le contexte de fenêtre 2. Créer les 4 niveaux du ScreenState 3. Optionnellement détecter les éléments UI Args: session: Session brute Returns: Liste de ScreenStates enrichis """ screen_states = [] # Créer un mapping screenshot_id -> événement screenshot_to_event = {} for event in session.events: if event.screenshot_id: screenshot_to_event[event.screenshot_id] = event # Récupérer (une seule fois) l'analyzer partagé si l'enrichissement est actif. # Le singleton C1 garantit qu'on ne recharge pas UIDetector/CLIP inutilement. analyzer = None if self.enable_ui_enrichment: analyzer = self._get_screen_analyzer() # Cache partagé (C1) : réutiliser les analyses si même screenshot est # repassé plusieurs fois (peu fréquent en construction, utile en tests). try: from core.pipeline import get_screen_state_cache state_cache = get_screen_state_cache() except Exception as e: logger.debug(f"ScreenStateCache indisponible ({e}); aucun cache utilisé.") state_cache = None enriched_count = 0 for i, screenshot in enumerate(session.screenshots): # Trouver l'événement associé event = screenshot_to_event.get(screenshot.screenshot_id) # Construire WindowContext depuis l'événement (si dispo) screen_env = session.environment.get("screen", {}) screen_res = screen_env.get("primary_resolution", [1920, 1080]) if event and event.window: window = WindowContext( app_name=event.window.app_name, window_title=event.window.title, screen_resolution=screen_res, workspace="main", monitor_index=screen_env.get("monitor_index", 0), dpi_scale=screen_env.get("dpi_scale", 100), monitors=screen_env.get("monitors"), os_theme=session.environment.get("os_theme", "unknown"), os_language=session.environment.get("os_language", "unknown"), ) else: window = WindowContext( app_name="unknown", window_title="Unknown", screen_resolution=screen_res, workspace="main", monitor_index=screen_env.get("monitor_index", 0), dpi_scale=screen_env.get("dpi_scale", 100), monitors=screen_env.get("monitors"), os_theme=session.environment.get("os_theme", "unknown"), os_language=session.environment.get("os_language", "unknown"), ) # Chemin absolu du screenshot screenshot_absolute_path = ( f"data/training/sessions/{session.session_id}/" f"{session.session_id}/{screenshot.relative_path}" ) screenshot_path = Path(screenshot_absolute_path) # Timestamp if isinstance(screenshot.captured_at, str): timestamp = datetime.fromisoformat( screenshot.captured_at.replace('Z', '+00:00') ) else: timestamp = screenshot.captured_at # ------------------------------------------------------------ # Enrichissement visuel : déléguer au ScreenAnalyzer partagé # ------------------------------------------------------------ # L'analyzer renvoie un ScreenState complet avec : # - raw (image + file_size) # - perception (OCR + embedding ref) # - ui_elements (détection UIDetector) # On récupère ces niveaux et on rebâtit un état final avec le # WindowContext et les metadata issus de la session brute (les # données "metier" que l'analyzer ignore). # ------------------------------------------------------------ detected_text: List[str] = [] text_method = "none" ui_elements: List = [] raw = RawLevel( screenshot_path=str(screenshot_path), capture_method="mss", file_size_bytes=( screenshot_path.stat().st_size if screenshot_path.exists() else 0 ), ) if analyzer is not None and screenshot_path.exists(): try: # Construire l'info fenêtre pour donner le contexte à # l'UIDetector (certains détecteurs s'en servent pour # filtrer hors-fenêtre). window_info = { "app_name": window.app_name, "title": window.window_title, "screen_resolution": list(window.screen_resolution or []), } analyzed = analyzer.analyze( str(screenshot_path), window_info=window_info, enable_ocr=True, enable_ui_detection=True, session_id=session.session_id, ) detected_text = list(analyzed.perception.detected_text or []) text_method = ( analyzed.perception.text_detection_method or "none" ) ui_elements = list(analyzed.ui_elements or []) # Garder les métriques OCR/UI si présentes (debug) analyzer_metadata = dict(analyzed.metadata or {}) raw = analyzed.raw # conserver file_size réel mesuré if ui_elements: enriched_count += 1 except Exception as e: logger.warning( f"Enrichissement visuel échoué pour {screenshot_path}: {e}. " "Fallback sur ScreenState minimal." ) analyzer_metadata = {"analyzer_error": str(e)} else: analyzer_metadata = {} if self.enable_ui_enrichment and not screenshot_path.exists(): logger.debug( f"Screenshot introuvable: {screenshot_path} " "— ui_elements restera vide" ) # PerceptionLevel : vector_id calculé de façon déterministe. perception = PerceptionLevel( embedding=EmbeddingRef( provider="openclip_ViT-B-32", vector_id=( f"data/embeddings/screens/" f"{session.session_id}_state_{i:04d}.npy" ), dimensions=512, ), detected_text=detected_text, text_detection_method=text_method, confidence_avg=0.85 if detected_text else 0.0, ) # ContextLevel (métier) context = ContextLevel( current_workflow_candidate=None, workflow_step=i, user_id=session.user.get("id", "unknown"), tags=( list(session.context.get("tags", [])) if isinstance(session.context.get("tags"), list) else [] ), business_variables={}, ) # Metadata : on garde le lien événement/session + éventuels # compteurs remontés par l'analyzer. metadata = { "screenshot_id": screenshot.screenshot_id, "event_type": event.type if event else None, "event_time": event.t if event else None, } # Propager les indicateurs utiles de l'analyzer sans écraser la base. for key in ("ocr_ms", "ui_ms", "analyzer_error"): if key in analyzer_metadata: metadata[key] = analyzer_metadata[key] state = ScreenState( screen_state_id=f"{session.session_id}_state_{i:04d}", timestamp=timestamp, session_id=session.session_id, window=window, raw=raw, perception=perception, context=context, metadata=metadata, ui_elements=ui_elements, ) screen_states.append(state) logger.info( f"Created {len(screen_states)} enriched screen states " f"({enriched_count} avec UI détectée, " f"ui_enrichment={self.enable_ui_enrichment})" ) return screen_states def _compute_embeddings( self, screen_states: List[ScreenState] ) -> List[np.ndarray]: """ Calculer State Embeddings pour tous les états. Utilise StateEmbeddingBuilder pour générer des embeddings multi-modaux (image + texte + UI). Ajoute optionnellement les embeddings à l'index FAISS. Args: screen_states: Liste de ScreenStates Returns: Liste de vecteurs d'embeddings (numpy arrays) """ embeddings = [] for state in screen_states: # Construire embedding state_embedding = self.embedding_builder.build(state) vector = state_embedding.get_vector() embeddings.append(vector) # Ajouter à FAISS si disponible if self.faiss_manager: self.faiss_manager.add_embedding( state.screen_state_id, vector, {"state_id": state.screen_state_id}, ) return embeddings def _detect_patterns( self, embeddings: List[np.ndarray], screen_states: List[ScreenState], ) -> Dict[int, List[int]]: """ Détecter patterns répétés via clustering hybride. Algorithme: 1. Pré-clustering par titre de fenêtre (discriminant fort) 2. DBSCAN au sein de chaque groupe de titres (affine par vision) 3. Filtrer clusters trop petits Le titre de fenêtre est le signal le plus discriminant entre écrans. CLIP/embeddings seul ne distingue pas assez les screenshots d'apps différentes (distance cosine max ~0.12 entre Notepad et Explorer). Args: embeddings: Vecteurs d'embeddings screen_states: ScreenStates correspondants Returns: Dictionnaire {cluster_id: [indices des états]} """ if len(embeddings) < self.min_pattern_repetitions: logger.warning( f"Not enough states ({len(embeddings)}) for pattern detection " f"(minimum: {self.min_pattern_repetitions})" ) return {} # Étape 1 : Pré-clustering par titre de fenêtre (ou app_name) title_groups = defaultdict(list) for idx, state in enumerate(screen_states): # Extraire le titre de fenêtre — le meilleur discriminant title = "" if hasattr(state, 'window') and state.window: title = getattr(state.window, 'window_title', '') or '' # Normaliser : garder le nom de l'app (partie avant le tiret) # Ex: "Sans titre - Bloc-notes" → "Bloc-notes" # Ex: "Google - Chrome" → "Chrome" app_key = self._normalize_window_title(title) if title else "__no_title__" title_groups[app_key].append(idx) n_title_groups = len(title_groups) logger.info( f"Window title pre-clustering: {n_title_groups} groups " f"({', '.join(f'{k}:{len(v)}' for k, v in title_groups.items())})" ) # Étape 2 : Si les titres distinguent bien les écrans, utiliser directement # Si un seul groupe (titres identiques ou absents), fallback DBSCAN pur if n_title_groups <= 1: return self._dbscan_clustering(embeddings, screen_states) # Étape 3 : Chaque groupe de titres devient un cluster # Optionnellement, DBSCAN affine les groupes avec >5 screenshots final_clusters = {} cluster_id = 0 noise_count = 0 for title_key, indices in title_groups.items(): if len(indices) < self.min_pattern_repetitions: noise_count += len(indices) continue if len(indices) > 5: # Sous-clustering DBSCAN pour les gros groupes sub_embeddings = [embeddings[i] for i in indices] sub_X = np.array(sub_embeddings) sub_clustering = DBSCAN( eps=self.clustering_eps, min_samples=self.clustering_min_samples, metric="cosine", ) sub_labels = sub_clustering.fit_predict(sub_X) sub_clusters = defaultdict(list) for local_idx, label in enumerate(sub_labels): if label == -1: noise_count += 1 else: sub_clusters[label].append(indices[local_idx]) for sub_indices in sub_clusters.values(): if len(sub_indices) >= self.min_pattern_repetitions: final_clusters[cluster_id] = sub_indices cluster_id += 1 else: # Petit groupe → 1 cluster direct final_clusters[cluster_id] = indices cluster_id += 1 logger.info( f"Hybrid clustering results: {len(final_clusters)} patterns " f"(from {n_title_groups} title groups), {noise_count} noise points" ) return final_clusters def _normalize_window_title(self, title: str) -> str: """Normaliser un titre de fenêtre pour le clustering. Extrait le nom de l'application (partie significative) en ignorant le contenu variable (nom de fichier, URL, etc.). Ex: "document.txt - Bloc-notes" → "Bloc-notes" Ex: "Google - Chrome" → "Chrome" Ex: "C:\\Users\\... - Explorateur" → "Explorateur" """ if not title: return "__no_title__" # Séparateurs courants dans les titres Windows for sep in [" - ", " — ", " – ", " | "]: if sep in title: parts = title.split(sep) # Le nom de l'app est généralement le dernier segment return parts[-1].strip() return title.strip() def _dbscan_clustering( self, embeddings: List[np.ndarray], screen_states: List[ScreenState], ) -> Dict[int, List[int]]: """Clustering DBSCAN pur (fallback quand les titres ne discriminent pas).""" X = np.array(embeddings) clustering = DBSCAN( eps=self.clustering_eps, min_samples=self.clustering_min_samples, metric="cosine", ) labels = clustering.fit_predict(X) clusters = defaultdict(list) noise_count = 0 for idx, label in enumerate(labels): if label == -1: noise_count += 1 else: clusters[label].append(idx) filtered_clusters = { cluster_id: indices for cluster_id, indices in clusters.items() if len(indices) >= self.min_pattern_repetitions } logger.info( f"DBSCAN clustering results: {len(filtered_clusters)} patterns, " f"{noise_count} noise points, " f"{len(clusters) - len(filtered_clusters)} small clusters filtered" ) return filtered_clusters def _build_nodes( self, clusters: Dict[int, List[int]], screen_states: List[ScreenState], embeddings: List[np.ndarray], ) -> List[WorkflowNode]: """ Construire WorkflowNodes depuis les clusters détectés. Pour chaque cluster: 1. Calculer embedding prototype (moyenne normalisée) 2. Extraire contraintes depuis états du cluster 3. Créer ScreenTemplate 4. Créer WorkflowNode Args: clusters: Clusters détectés {cluster_id: [indices]} screen_states: ScreenStates embeddings: Embeddings Returns: Liste de WorkflowNodes """ nodes = [] for cluster_id, indices in clusters.items(): # Calculer embedding prototype (moyenne) cluster_embeddings = [embeddings[i] for i in indices] prototype = np.mean(cluster_embeddings, axis=0) prototype = prototype / np.linalg.norm(prototype) # Normaliser # Extraire contraintes depuis les états du cluster cluster_states = [screen_states[i] for i in indices] template = self._create_screen_template(cluster_states, prototype) # Créer node node = WorkflowNode( node_id=f"node_{cluster_id:03d}", name=f"State Pattern {cluster_id}", description=f"Pattern auto-détecté ({len(indices)} observations)", template=template, metadata={ "observation_count": len(indices), "_prototype_vector": prototype.tolist(), }, ) nodes.append(node) logger.debug( f"Created node {node.node_id} with {len(indices)} observations" ) return nodes def _create_screen_template( self, states: List[ScreenState], prototype_embedding: np.ndarray, ) -> ScreenTemplate: """ Créer un ScreenTemplate depuis un cluster d'états. Extrait les contraintes communes à tous les états du cluster : - window_title_pattern : titre de fenêtre commun - required_text_patterns : textes présents dans la majorité des états - required_ui_elements : rôles/types UI récurrents Args: states: États du cluster prototype_embedding: Embedding prototype Returns: ScreenTemplate avec contraintes extraites """ # --- Extraction du titre de fenêtre commun --- window_title_pattern = self._extract_window_pattern(states) # --- Extraction des textes récurrents --- required_text_patterns = self._extract_common_texts(states) # --- Extraction des éléments UI récurrents --- required_ui_elements = self._extract_common_ui_elements(states) # Construire les sous-objets de contraintes window_constraint = WindowConstraint( title_pattern=window_title_pattern, title_contains=window_title_pattern, ) text_constraint = TextConstraint( required_texts=required_text_patterns, ) ui_roles = [ e.get("role", "") for e in required_ui_elements if e.get("role") ] ui_constraint = UIConstraint( required_roles=ui_roles, ) embedding_proto = EmbeddingPrototype( provider="openclip_ViT-B-32", vector_id="", # Le vecteur est stocké dans node.metadata._prototype_vector min_cosine_similarity=0.85, sample_count=len(states), ) return ScreenTemplate( window=window_constraint, text=text_constraint, ui=ui_constraint, embedding=embedding_proto, ) def _extract_window_pattern(self, states: List[ScreenState]) -> Optional[str]: """Extraire un pattern de titre de fenêtre commun aux états du cluster.""" titles = [s.window.window_title for s in states if s.window.window_title] if not titles: return None # Si tous les titres sont identiques, retourner directement if len(set(titles)) == 1: return titles[0] # Trouver le préfixe commun le plus long prefix = os.path.commonprefix(titles) if len(prefix) >= 5: return prefix.rstrip(" -–—|") # Fallback: le titre le plus fréquent from collections import Counter most_common = Counter(titles).most_common(1)[0][0] return most_common def _extract_common_texts( self, states: List[ScreenState], min_presence_ratio: float = 0.6 ) -> List[str]: """ Extraire les textes présents dans la majorité des états du cluster. Args: states: États du cluster min_presence_ratio: Proportion minimale de présence (0.6 = 60% des états) """ if not states: return [] # Collecter les textes de chaque état text_counts: Dict[str, int] = defaultdict(int) states_with_text = 0 for state in states: if hasattr(state.perception, 'detected_text') and state.perception.detected_text: states_with_text += 1 seen_in_state = set() for text in state.perception.detected_text: normalized = text.strip().lower() if len(normalized) >= 3 and normalized not in seen_in_state: text_counts[normalized] += 1 seen_in_state.add(normalized) if states_with_text == 0: return [] # Garder les textes présents dans au moins min_presence_ratio des états threshold = max(2, int(states_with_text * min_presence_ratio)) common_texts = [ text for text, count in text_counts.items() if count >= threshold ] # Limiter à 10 textes les plus fréquents common_texts.sort(key=lambda t: text_counts[t], reverse=True) return common_texts[:10] def _extract_common_ui_elements( self, states: List[ScreenState], min_presence_ratio: float = 0.5 ) -> List[Dict[str, Any]]: """ Extraire les types/rôles d'éléments UI récurrents dans le cluster. Retourne une liste de contraintes UI au format: [{"type": "button", "role": "validate", "min_count": 1}, ...] """ if not states: return [] # Compter les paires (type, role) dans chaque état role_counts: Dict[str, int] = defaultdict(int) type_counts: Dict[str, int] = defaultdict(int) states_with_ui = 0 for state in states: if state.ui_elements: states_with_ui += 1 seen_roles = set() seen_types = set() for el in state.ui_elements: el_type = getattr(el, 'type', 'unknown') el_role = getattr(el, 'role', 'unknown') if el_role != 'unknown' and el_role not in seen_roles: role_counts[el_role] += 1 seen_roles.add(el_role) if el_type != 'unknown' and el_type not in seen_types: type_counts[el_type] += 1 seen_types.add(el_type) if states_with_ui == 0: return [] threshold = max(2, int(states_with_ui * min_presence_ratio)) constraints = [] # Ajouter les rôles récurrents for role, count in role_counts.items(): if count >= threshold: constraints.append({ "role": role, "min_count": 1, }) # Limiter à 8 contraintes constraints.sort(key=lambda c: role_counts.get(c.get("role", ""), 0), reverse=True) return constraints[:8] # ------------------------------------------------------------------ # Association spatiale clic → UIElement # ------------------------------------------------------------------ def _find_clicked_element( self, event: Event, ui_elements: List[Any], ) -> Optional[Any]: """ Identifier l'UIElement cible d'un clic par proximité spatiale. Règle : 1. Si un bbox contient strictement la position du clic → match. 2. Sinon, on prend le bbox le plus proche (distance euclidienne au bord) sous réserve qu'il soit à <= `element_proximity_max_px`. 3. Sinon, aucun ancrage possible → None. Cette association transforme un clic "aveugle" (coordonnées brutes) en un clic "intelligent" (rôle + label), permettant au matcher de retrouver l'élément même si la résolution ou la position change. Args: event: Événement `mouse_click` (avec `data["pos"] = [x, y]`). ui_elements: Liste des UIElement détectés sur l'écran source. Returns: UIElement le plus pertinent, ou None si rien ne correspond. """ if not ui_elements: return None if not event or event.type != "mouse_click": return None pos = event.data.get("pos") if event.data else None if not pos or len(pos) < 2: return None try: click_x = float(pos[0]) click_y = float(pos[1]) except (TypeError, ValueError): return None best_contained = None best_contained_area = None best_near = None best_near_distance = None for element in ui_elements: bbox = getattr(element, "bbox", None) if bbox is None: continue # Extraction défensive des coordonnées (BBox Pydantic ou tuple) try: bx = int(getattr(bbox, "x", bbox[0])) by = int(getattr(bbox, "y", bbox[1])) bw = int(getattr(bbox, "width", bbox[2])) bh = int(getattr(bbox, "height", bbox[3])) except (AttributeError, IndexError, TypeError): continue # Cas 1 : la position est strictement dans le bbox. if bx <= click_x <= bx + bw and by <= click_y <= by + bh: # Sélectionner le plus petit bbox qui contient (élément le plus spécifique) area = max(1, bw * bh) if best_contained is None or area < best_contained_area: best_contained = element best_contained_area = area continue # Cas 2 : calculer la distance au bord le plus proche. dx = max(bx - click_x, 0, click_x - (bx + bw)) dy = max(by - click_y, 0, click_y - (by + bh)) distance = (dx * dx + dy * dy) ** 0.5 if best_near is None or distance < best_near_distance: best_near = element best_near_distance = distance if best_contained is not None: return best_contained if ( best_near is not None and best_near_distance is not None and best_near_distance <= self.element_proximity_max_px ): return best_near return None # Patterns d'erreur courants pour la détection fail_fast _ERROR_PATTERNS = [ "erreur", "error", "échec", "failed", "impossible", "accès refusé", "access denied", "not found", "timeout", "connexion perdue", "session expirée", ] def _build_edges( self, nodes: List[WorkflowNode], screen_states: List[ScreenState], session: RawSession, embeddings: Optional[List[np.ndarray]] = None, sequential: bool = False, ) -> List[WorkflowEdge]: """ Construire WorkflowEdges depuis les transitions observées. Algorithme: 1. Mapper chaque ScreenState vers son node (via embedding similarity) En mode séquentiel, le mapping est direct (state i → node i). 2. Identifier les transitions (state_i -> state_j où node change) 3. Extraire l'action depuis l'événement entre les deux états 4. Créer WorkflowEdge avec action, pré-conditions et post-conditions Les EdgeConstraints sont peuplées depuis le ScreenTemplate du node source : - required_window_title, required_app_name, min_source_similarity - required_texts (textes OCR fréquents dans le cluster source) Les PostConditions sont peuplées depuis le ScreenTemplate du node cible : - expected_window_title, expected_app_name, min_target_similarity - expected_texts + forbidden_texts (patterns d'erreur courants) Args: nodes: WorkflowNodes construits screen_states: ScreenStates session: Session brute (pour événements) embeddings: Embeddings pré-calculés (évite un recalcul dans _map_states_to_nodes) sequential: Mode séquentiel — chaque paire consécutive = transition Returns: Liste de WorkflowEdges """ if not nodes or len(screen_states) < 2: logger.warning("Not enough data to build edges") return [] edges = [] edge_counts = defaultdict(int) # Pour compter les occurrences de chaque transition # Index des nodes par ID pour accès rapide node_by_id = {node.node_id: node for node in nodes} # Étape 1: Mapper chaque état vers son node if sequential: # Mode séquentiel : mapping direct state[i] → node[i] state_to_node = {} for i, state in enumerate(screen_states): if i < len(nodes): state_to_node[state.screen_state_id] = nodes[i].node_id logger.debug( f"Mode séquentiel: {len(state_to_node)} states mappés directement" ) else: state_to_node = self._map_states_to_nodes( screen_states, nodes, embeddings=embeddings ) # Étape 2: Récupérer la résolution d'écran pour normaliser les coordonnées screen_env = session.environment.get("screen", {}) screen_resolution = tuple(screen_env.get("primary_resolution", [1920, 1080])) # Étape 3: Parcourir les transitions for i in range(len(screen_states) - 1): current_state = screen_states[i] next_state = screen_states[i + 1] current_node_id = state_to_node.get(current_state.screen_state_id) next_node_id = state_to_node.get(next_state.screen_state_id) # En mode séquentiel, chaque paire consécutive est une transition # En mode clustering, uniquement si les nodes sont différents if current_node_id and next_node_id and ( sequential or current_node_id != next_node_id ): # Trouver TOUS les événements entre les deux états transition_events = self._find_transition_events( current_state, next_state, session.events ) # Créer l'edge edge_key = f"{current_node_id}_to_{next_node_id}" edge_counts[edge_key] += 1 # Ne créer l'edge qu'une fois, mais compter les occurrences if edge_counts[edge_key] == 1: source_node = node_by_id.get(current_node_id) target_node = node_by_id.get(next_node_id) edge = self._create_edge( current_node_id, next_node_id, transition_events[-1] if transition_events else None, edge_key, source_node=source_node, target_node=target_node, all_events=transition_events, screen_resolution=screen_resolution, source_state=current_state, ) edges.append(edge) # Mettre à jour les stats des edges avec les comptages for edge in edges: edge_key = f"{edge.from_node}_to_{edge.to_node}" edge.stats.execution_count = edge_counts[edge_key] edge.stats.success_count = edge_counts[edge_key] logger.info(f"Built {len(edges)} edges from {sum(edge_counts.values())} transitions") return edges def _map_states_to_nodes( self, screen_states: List[ScreenState], nodes: List[WorkflowNode], embeddings: Optional[List[np.ndarray]] = None, ) -> Dict[str, str]: """ Mapper chaque ScreenState vers le node le plus proche. Utilise la similarité d'embedding pour trouver le meilleur match. Args: screen_states: Liste de ScreenStates à mapper. nodes: WorkflowNodes cibles. embeddings: Embeddings pré-calculés (même ordre que screen_states). Si fourni, évite de recalculer via self.embedding_builder.build(). """ state_to_node = {} # Récupérer les embeddings des prototypes de nodes node_prototypes = {} for node in nodes: # Priorité : vecteur en mémoire (metadata), sinon chargement depuis disque proto_list = node.metadata.get("_prototype_vector") if proto_list is not None: node_prototypes[node.node_id] = np.array(proto_list, dtype=np.float32) elif node.template and node.template.embedding and node.template.embedding.vector_id: proto_path = Path(node.template.embedding.vector_id) if proto_path.exists(): node_prototypes[node.node_id] = np.load(proto_path) if not node_prototypes: logger.warning("No node prototypes available for mapping") return state_to_node # Vérifier si les embeddings pré-calculés sont utilisables use_precomputed = ( embeddings is not None and len(embeddings) == len(screen_states) ) if use_precomputed: logger.debug("_map_states_to_nodes: using precomputed embeddings") # Pour chaque état, trouver le node le plus proche for i, state in enumerate(screen_states): try: # Utiliser l'embedding pré-calculé ou recalculer if use_precomputed: state_vector = np.asarray(embeddings[i], dtype=np.float32) else: state_embedding = self.embedding_builder.build(state) state_vector = state_embedding.get_vector() # Trouver le node avec la meilleure similarité best_node_id = None best_similarity = -1 for node_id, prototype in node_prototypes.items(): similarity = np.dot(state_vector, prototype) if similarity > best_similarity: best_similarity = similarity best_node_id = node_id if best_node_id and best_similarity > 0.7: # Seuil minimum state_to_node[state.screen_state_id] = best_node_id except Exception as e: logger.warning(f"Failed to map state {state.screen_state_id}: {e}") return state_to_node def _get_state_time(self, state: ScreenState, fallback: float = 0) -> float: """Extraire le timestamp d'un ScreenState. Priorité : 1. metadata['event_time'] (set par _create_screen_states) 2. metadata['shot_timestamp'] (set par le reprocessing) 3. state.timestamp converti en epoch si c'est un datetime 4. fallback Note : event_time peut être 0.0 (timestamps relatifs), donc on vérifie `is not None` et non `> 0`. """ if state.metadata: et = state.metadata.get("event_time") if et is not None: return float(et) st = state.metadata.get("shot_timestamp") if st is not None: return float(st) if state.timestamp: try: return state.timestamp.timestamp() except (AttributeError, OSError): pass return fallback def _find_transition_events( self, current_state: ScreenState, next_state: ScreenState, events: List[Event], ) -> List[Event]: """ Trouver TOUS les événements entre deux états (pas juste le dernier). Collecte toutes les actions utilisateur (clics, frappes, saisie texte) qui se sont produites entre les timestamps des deux ScreenStates. C'est essentiel pour le replay : une transition peut nécessiter plusieurs actions (ex: Win+R → taper "notepad" → Entrée). Timestamps : utilise _get_state_time() qui supporte plusieurs sources (event_time, shot_timestamp, datetime). Args: current_state: État source next_state: État cible events: Tous les événements de la session Returns: Liste ordonnée (par timestamp) de tous les événements d'action entre les deux états. Peut être vide. """ current_time = self._get_state_time(current_state, fallback=0) next_time = self._get_state_time(next_state, fallback=float('inf')) action_events = [] for event in events: if current_time <= event.t < next_time: if event.type in ["mouse_click", "key_press", "text_input"]: # Filtrer les événements parasites (modificateurs seuls, texte vide) if not _is_parasitic_event(event): action_events.append(event) return action_events def _find_transition_event( self, current_state: ScreenState, next_state: ScreenState, events: List[Event] ) -> Optional[Event]: """ Trouver l'événement qui a causé la transition entre deux états. DEPRECATED: Utiliser _find_transition_events() pour obtenir TOUS les événements. Conservé pour rétrocompatibilité. """ all_events = self._find_transition_events(current_state, next_state, events) return all_events[-1] if all_events else None def _create_edge( self, from_node: str, to_node: str, event: Optional[Event], edge_id: str, source_node: Optional[WorkflowNode] = None, target_node: Optional[WorkflowNode] = None, all_events: Optional[List[Event]] = None, screen_resolution: Tuple[int, int] = (1920, 1080), source_state: Optional[ScreenState] = None, ) -> WorkflowEdge: """ Créer un WorkflowEdge depuis une transition observée. Peuple les EdgeConstraints depuis le template du node source et les PostConditions depuis le template du node cible. Si plusieurs événements sont associés à la transition (all_events), crée une action "compound" contenant toutes les étapes (steps), ce qui permet au replay de reproduire fidèlement la séquence complète (ex: Win+R → taper "notepad" → Entrée). Args: from_node: ID du node source to_node: ID du node cible event: Dernier événement de la transition (rétrocompatibilité) edge_id: Identifiant de l'edge source_node: Node source (pour extraire les pré-conditions) target_node: Node cible (pour extraire les post-conditions) all_events: TOUS les événements entre les deux états (si disponible) screen_resolution: Résolution de référence pour normaliser les coordonnées """ # Si on a plusieurs événements, créer une action compound events_to_use = all_events or ([event] if event else []) # UIElements de l'écran source — sert à ancrer les clics sur un vrai # élément UI (rôle, texte, bbox) plutôt que sur une coordonnée brute. source_ui_elements = ( list(source_state.ui_elements) if source_state and source_state.ui_elements else [] ) if len(events_to_use) > 1: action = self._build_compound_action( events_to_use, screen_resolution, source_ui_elements=source_ui_elements, ) elif len(events_to_use) == 1: action = self._build_single_action( events_to_use[0], source_ui_elements=source_ui_elements, ) else: action = Action( type="unknown", target=TargetSpec( by_role="unknown", selection_policy="first", fallback_strategy="visual_similarity", ), parameters={}, ) # --------------------------------------------------------------- # Pré-conditions (EdgeConstraints) depuis le template du node source # --------------------------------------------------------------- constraints = self._build_edge_constraints(source_node) # --------------------------------------------------------------- # Post-conditions depuis le template du node cible # --------------------------------------------------------------- post_conditions = self._build_post_conditions( to_node, source_node, target_node ) # Créer l'edge from core.models.workflow_graph import EdgeStats # Metadata enrichie pour le diagnostic edge_metadata = { "auto_generated": True, "event_count": len(events_to_use), } if events_to_use: edge_metadata["created_from_event"] = events_to_use[-1].type if len(events_to_use) > 1: edge_metadata["event_types"] = [e.type for e in events_to_use] else: edge_metadata["created_from_event"] = None return WorkflowEdge( edge_id=edge_id, from_node=from_node, to_node=to_node, action=action, constraints=constraints, post_conditions=post_conditions, stats=EdgeStats(), metadata=edge_metadata, ) def _build_single_action( self, event: Event, source_ui_elements: Optional[List[Any]] = None, ) -> Action: """ Construire une Action simple depuis un seul événement. Pour un clic, si `source_ui_elements` est fourni, on tente d'ancrer l'action sur l'UIElement le plus proche (par proximité spatiale). Le TargetSpec devient alors discriminant : - `by_role` = rôle sémantique de l'élément (ex: "primary_action") - `by_text` = label détecté (ex: "Valider") - `selection_policy` = "by_similarity" (laisse le matcher scorer) - `context_hints["anchor_element_id"]` = traçabilité - `context_hints["anchor_bbox"]` = invariant spatial debug À défaut d'ancrage (pas d'UIElement ou clic hors de toute bbox proche), on retombe sur `by_role="unknown_element"` (legacy). """ action_type = event.type action_params: Dict[str, Any] = {} target_spec: Optional[TargetSpec] = None if action_type == "mouse_click": action_params = { "button": event.data.get("button", "left"), "position": event.data.get("pos", [0, 0]), "wait_after_ms": 500, } target_spec = self._build_click_target_spec( event, source_ui_elements or [] ) elif action_type == "key_press": action_params = { "keys": event.data.get("keys", []), "wait_after_ms": 200, } target_spec = TargetSpec( by_role="keyboard_input", selection_policy="first", fallback_strategy="visual_similarity", ) elif action_type == "text_input": action_params = { "text": event.data.get("text", ""), "wait_after_ms": 300, } target_spec = TargetSpec( by_role="text_field", selection_policy="first", fallback_strategy="visual_similarity", ) else: action_params = {} target_spec = TargetSpec( by_role="unknown", selection_policy="first", fallback_strategy="visual_similarity", ) return Action( type=action_type, target=target_spec, parameters=action_params, ) def _build_click_target_spec( self, event: Event, source_ui_elements: List[Any], ) -> TargetSpec: """ Construire un TargetSpec pour un clic, en essayant de l'ancrer à un UIElement détecté sur l'écran source. Retourne toujours un TargetSpec valide : - ancré (role + text + context_hints) si un élément proche existe ; - fallback `unknown_element` sinon (comportement historique). """ clicked = self._find_clicked_element(event, source_ui_elements) if clicked is None: return TargetSpec( by_role="unknown_element", selection_policy="first", fallback_strategy="visual_similarity", ) # Extraction défensive des attributs de l'élément. role = getattr(clicked, "role", None) or "unknown_element" label = getattr(clicked, "label", None) or None element_id = getattr(clicked, "element_id", None) # Contexte de traçabilité — `context_hints` est le seul dict libre # disponible dans TargetSpec (pas de champ `metadata` dédié). context_hints: Dict[str, Any] = {} if element_id: context_hints["anchor_element_id"] = str(element_id) bbox = getattr(clicked, "bbox", None) if bbox is not None: try: context_hints["anchor_bbox"] = { "x": int(getattr(bbox, "x", bbox[0])), "y": int(getattr(bbox, "y", bbox[1])), "width": int(getattr(bbox, "width", bbox[2])), "height": int(getattr(bbox, "height", bbox[3])), } except (AttributeError, IndexError, TypeError): pass # Center (utile comme ancre de fallback quand le matcher échoue) center = getattr(clicked, "center", None) if center is not None: try: context_hints["anchor_center"] = [int(center[0]), int(center[1])] except (IndexError, TypeError): pass return TargetSpec( by_role=role, by_text=label, selection_policy="by_similarity", fallback_strategy="visual_similarity", context_hints=context_hints, ) def _build_compound_action( self, events: List[Event], screen_resolution: Tuple[int, int] = (1920, 1080), source_ui_elements: Optional[List[Any]] = None, ) -> Action: """ Construire une Action compound (multi-étapes) depuis plusieurs événements. Quand l'utilisateur fait plusieurs actions entre deux changements d'écran (ex: Win+R → taper "notepad" → Entrée), on les regroupe en une seule action compound avec une liste de steps ordonnés. Des waits sont insérés automatiquement entre les étapes quand le délai réel dépasse 500ms (plafonné à 5000ms). Args: events: Liste ordonnée d'événements (>= 2) screen_resolution: Résolution de référence (largeur, hauteur) Returns: Action de type "compound" avec steps dans parameters """ steps = [] ref_w, ref_h = screen_resolution for i, event in enumerate(events): # Insérer un wait si le délai entre deux événements est > 500ms if i > 0: delta_ms = int((event.t - events[i - 1].t) * 1000) if delta_ms > 500: steps.append({ "type": "wait", "duration_ms": min(delta_ms, 5000), }) if event.type == "mouse_click": pos = event.data.get("pos", [0, 0]) steps.append({ "type": "mouse_click", "position": list(pos), "x_pct": round(pos[0] / ref_w, 6) if ref_w > 0 else 0.0, "y_pct": round(pos[1] / ref_h, 6) if ref_h > 0 else 0.0, "button": event.data.get("button", "left"), "ref_width": ref_w, "ref_height": ref_h, }) elif event.type == "text_input": steps.append({ "type": "text_input", "text": event.data.get("text", ""), }) elif event.type == "key_press": keys = event.data.get("keys", []) if keys: steps.append({ "type": "key_press", "keys": keys, }) else: # Type inconnu : stocker quand même pour ne rien perdre steps.append({ "type": event.type, "data": dict(event.data) if event.data else {}, }) # --------------------------------------------------------------- # Nettoyage des steps parasites : # 1. Supprimer les modificateurs seuls (ctrl, alt, shift, etc.) # 2. Fusionner les text_input consécutifs (ex: "No","m",":"," " → "Nom: ") # 3. Dédupliquer les key_combo consécutifs identiques # 4. Garantir un wait minimum de 300ms entre chaque step # --------------------------------------------------------------- steps = _filter_modifier_only_steps(steps) steps = _merge_consecutive_text_steps(steps) steps = _dedup_consecutive_combos(steps) steps = _ensure_min_waits(steps) # La cible du compound = cible de la dernière action (le clic final, etc.) last_event = events[-1] if last_event.type == "mouse_click": # On tente d'ancrer le clic final aux UIElements détectés, # comme dans _build_single_action. target_spec = self._build_click_target_spec( last_event, source_ui_elements or [] ) elif last_event.type == "text_input": target_spec = TargetSpec( by_role="text_field", selection_policy="first", fallback_strategy="visual_similarity", ) elif last_event.type == "key_press": target_spec = TargetSpec( by_role="keyboard_input", selection_policy="first", fallback_strategy="visual_similarity", ) else: target_spec = TargetSpec( by_role="unknown", selection_policy="first", fallback_strategy="visual_similarity", ) return Action( type="compound", target=target_spec, parameters={ "steps": steps, "step_count": len(steps), "ref_width": ref_w, "ref_height": ref_h, }, ) def _build_edge_constraints( self, source_node: Optional[WorkflowNode], ) -> EdgeConstraints: """ Construire les EdgeConstraints (pré-conditions) depuis le node source. Extrait du ScreenTemplate du node source : - required_window_title : titre de fenêtre attendu - required_app_name : nom de l'application - min_source_similarity : seuil de similarité embedding - window / text : contraintes WindowConstraint et TextConstraint """ if not source_node or not source_node.template: return EdgeConstraints( pre_conditions={}, required_confidence=0.8, max_wait_time_ms=5000, ) tpl = source_node.template # Extraire le titre de fenêtre depuis le WindowConstraint req_title = None req_app = None window_constraint = None if tpl.window: req_title = tpl.window.title_contains or tpl.window.title_pattern req_app = tpl.window.process_name # Copier la contrainte de fenêtre complète window_constraint = WindowConstraint( title_pattern=tpl.window.title_pattern, title_contains=tpl.window.title_contains, process_name=tpl.window.process_name, ) # Extraire la contrainte de texte (required_texts du template) text_constraint = None if tpl.text and tpl.text.required_texts: # Limiter à 5 textes les plus significatifs (>= 3 car, <= 100 car) filtered = [ t for t in tpl.text.required_texts if 3 <= len(t) <= 100 ][:5] if filtered: text_constraint = TextConstraint( required_texts=filtered, ) # Seuil de similarité depuis l'embedding prototype min_sim = 0.80 if tpl.embedding and tpl.embedding.min_cosine_similarity: min_sim = tpl.embedding.min_cosine_similarity return EdgeConstraints( pre_conditions={}, required_confidence=min_sim, max_wait_time_ms=5000, window=window_constraint, text=text_constraint, min_source_similarity=min_sim, required_app_name=req_app, required_window_title=req_title, ) def _build_post_conditions( self, to_node_id: str, source_node: Optional[WorkflowNode], target_node: Optional[WorkflowNode], ) -> PostConditions: """ Construire les PostConditions depuis le node cible. Peuple : - expected_window_title / expected_app_name depuis le template cible - success checks : textes attendus (required_texts du template cible) + window_title_contains si le titre change - fail_fast checks : patterns d'erreur courants (forbidden_texts) - min_target_similarity depuis l'embedding prototype cible """ if not target_node or not target_node.template: return PostConditions( expected_node=to_node_id, window_change_expected=False, new_ui_elements_expected=[], timeout_ms=10000, ) tpl = target_node.template # Extraire les infos du node cible expected_title = None expected_app = None if tpl.window: expected_title = tpl.window.title_contains or tpl.window.title_pattern expected_app = tpl.window.process_name # Seuil de similarité cible min_sim = 0.80 if tpl.embedding and tpl.embedding.min_cosine_similarity: min_sim = tpl.embedding.min_cosine_similarity # Détecter si le titre de fenêtre change entre source et cible window_change = False if source_node and source_node.template and source_node.template.window: src_title = source_node.template.window.title_contains or source_node.template.window.title_pattern if src_title and expected_title and src_title != expected_title: window_change = True # --- Construire les checks de succès --- success_checks = [] # Check sur le titre de fenêtre cible (si défini) if expected_title: success_checks.append( PostConditionCheck(kind="window_title_contains", value=expected_title) ) # Checks sur les textes attendus dans l'écran cible if tpl.text and tpl.text.required_texts: for text in tpl.text.required_texts[:5]: if 3 <= len(text) <= 100: success_checks.append( PostConditionCheck(kind="text_present", value=text) ) # --- Construire les checks fail_fast (détection d'erreurs) --- fail_fast_checks = [] for pattern in self._ERROR_PATTERNS: fail_fast_checks.append( PostConditionCheck(kind="text_present", value=pattern) ) return PostConditions( success_mode="all", timeout_ms=10000, poll_ms=200, success=success_checks, fail_fast=fail_fast_checks, retries=2, backoff_ms=150, expected_window_title=expected_title, expected_app_name=expected_app, min_target_similarity=min_sim, expected_node=to_node_id, window_change_expected=window_change, new_ui_elements_expected=[], ) def main(): """Point d'entrée pour tests manuels.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) builder = GraphBuilder(min_pattern_repetitions=3) logger.info(f"GraphBuilder initialized: {builder}") logger.info("Ready to build workflows from sessions") if __name__ == "__main__": main()