""" Learning Pack — Format d'export anonymisé des apprentissages. Un LearningPack contient les connaissances extraites des workflows d'un client, sans aucune donnée personnelle ou sensible. Ce qu'on exporte (anonymisé) : - Embeddings CLIP des prototypes d'écran (vecteurs 512d — pas réversibles) - ScreenTemplates (contraintes UI : titres fenêtres, rôles éléments) - Structure des workflows (nodes/edges, actions, contraintes) - Patterns d'erreur rencontrés - Signatures d'applications (app_name, version) Ce qu'on N'exporte PAS : - Screenshots bruts - Textes OCR bruts (données patient potentielles) - Événements clavier bruts (mots de passe potentiels) - machine_id, hostname, IP (identification du client) Structure JSON : { "version": "1.0", "created_at": "2026-03-19T...", "source_hash": "abc123...", # SHA-256 anonyme du client "pack_id": "lp_xxx", "stats": { ... }, "app_signatures": [ ... ], "screen_prototypes": [ ... ], "workflow_skeletons": [ ... ], "ui_patterns": [ ... ], "error_patterns": [ ... ], "edge_statistics": [ ... ], } Auteur : Dom, Claude — 19 mars 2026 """ import hashlib import json import logging import uuid from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import numpy as np logger = logging.getLogger(__name__) # Version du format Learning Pack LEARNING_PACK_VERSION = "1.0" # Seuil de similarité cosinus pour considérer deux prototypes comme identiques DEDUP_COSINE_THRESHOLD = 0.95 # Longueur maximale d'un texte avant d'être considéré comme donnée OCR sensible MAX_SAFE_TEXT_LENGTH = 120 # Champs de métadonnées à exclure (données sensibles) _SENSITIVE_METADATA_KEYS = frozenset({ "screenshot_path", "screenshot", "ocr_text", "ocr_raw", "raw_text", "keyboard_events", "key_events", "input_text", "machine_id", "hostname", "ip_address", "user", "username", "patient", "patient_id", "dossier", "nip", "ipp", }) # ============================================================================ # Structures de données du Learning Pack # ============================================================================ @dataclass class AppSignature: """Signature d'une application observée.""" app_name: str version: Optional[str] = None window_title_patterns: List[str] = field(default_factory=list) observation_count: int = 1 def to_dict(self) -> Dict[str, Any]: return { "app_name": self.app_name, "version": self.version, "window_title_patterns": self.window_title_patterns, "observation_count": self.observation_count, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "AppSignature": return cls( app_name=data["app_name"], version=data.get("version"), window_title_patterns=data.get("window_title_patterns", []), observation_count=data.get("observation_count", 1), ) @dataclass class ScreenPrototype: """Prototype d'écran anonymisé (embedding + contraintes UI).""" prototype_id: str vector: Optional[List[float]] = None # Vecteur 512d sérialisé en liste provider: str = "openclip_ViT-B-32" app_name: Optional[str] = None window_constraints: Optional[Dict[str, Any]] = None text_constraints: Optional[Dict[str, Any]] = None ui_constraints: Optional[Dict[str, Any]] = None sample_count: int = 1 source_hashes: List[str] = field(default_factory=list) # Packs d'origine def to_dict(self) -> Dict[str, Any]: return { "prototype_id": self.prototype_id, "vector": self.vector, "provider": self.provider, "app_name": self.app_name, "window_constraints": self.window_constraints, "text_constraints": self.text_constraints, "ui_constraints": self.ui_constraints, "sample_count": self.sample_count, "source_hashes": self.source_hashes, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "ScreenPrototype": return cls( prototype_id=data["prototype_id"], vector=data.get("vector"), provider=data.get("provider", "openclip_ViT-B-32"), app_name=data.get("app_name"), window_constraints=data.get("window_constraints"), text_constraints=data.get("text_constraints"), ui_constraints=data.get("ui_constraints"), sample_count=data.get("sample_count", 1), source_hashes=data.get("source_hashes", []), ) @dataclass class WorkflowSkeleton: """Structure anonymisée d'un workflow (sans données sensibles).""" skeleton_id: str name: str description: str learning_state: str node_names: List[str] edge_summaries: List[Dict[str, Any]] # from_node, to_node, action_type, target_role entry_nodes: List[str] end_nodes: List[str] node_count: int = 0 edge_count: int = 0 app_names: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { "skeleton_id": self.skeleton_id, "name": self.name, "description": self.description, "learning_state": self.learning_state, "node_names": self.node_names, "edge_summaries": self.edge_summaries, "entry_nodes": self.entry_nodes, "end_nodes": self.end_nodes, "node_count": self.node_count, "edge_count": self.edge_count, "app_names": self.app_names, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "WorkflowSkeleton": return cls( skeleton_id=data["skeleton_id"], name=data["name"], description=data.get("description", ""), learning_state=data.get("learning_state", "OBSERVATION"), node_names=data.get("node_names", []), edge_summaries=data.get("edge_summaries", []), entry_nodes=data.get("entry_nodes", []), end_nodes=data.get("end_nodes", []), node_count=data.get("node_count", 0), edge_count=data.get("edge_count", 0), app_names=data.get("app_names", []), ) @dataclass class UIPattern: """Pattern UI universel (bouton Enregistrer, menu Fichier, etc.).""" pattern_id: str role: str # button, textfield, menu, etc. context_description: str # description du contexte window_title_patterns: List[str] = field(default_factory=list) observation_count: int = 1 cross_client_count: int = 1 # Nb de clients différents l'ayant vu confidence: float = 0.0 def to_dict(self) -> Dict[str, Any]: return { "pattern_id": self.pattern_id, "role": self.role, "context_description": self.context_description, "window_title_patterns": self.window_title_patterns, "observation_count": self.observation_count, "cross_client_count": self.cross_client_count, "confidence": self.confidence, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "UIPattern": return cls( pattern_id=data["pattern_id"], role=data.get("role", "unknown"), context_description=data.get("context_description", ""), window_title_patterns=data.get("window_title_patterns", []), observation_count=data.get("observation_count", 1), cross_client_count=data.get("cross_client_count", 1), confidence=data.get("confidence", 0.0), ) @dataclass class ErrorPattern: """Pattern d'erreur rencontré (texte d'erreur, contexte, fréquence).""" pattern_id: str error_text: str kind: str = "text_present" # kind du PostConditionCheck source app_name: Optional[str] = None observation_count: int = 1 cross_client_count: int = 1 def to_dict(self) -> Dict[str, Any]: return { "pattern_id": self.pattern_id, "error_text": self.error_text, "kind": self.kind, "app_name": self.app_name, "observation_count": self.observation_count, "cross_client_count": self.cross_client_count, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "ErrorPattern": return cls( pattern_id=data["pattern_id"], error_text=data["error_text"], kind=data.get("kind", "text_present"), app_name=data.get("app_name"), observation_count=data.get("observation_count", 1), cross_client_count=data.get("cross_client_count", 1), ) @dataclass class EdgeStatistic: """Statistiques anonymisées d'une transition entre écrans.""" from_node_name: str to_node_name: str action_type: str target_role: Optional[str] = None execution_count: int = 0 success_rate: float = 0.0 avg_execution_time_ms: float = 0.0 def to_dict(self) -> Dict[str, Any]: return { "from_node_name": self.from_node_name, "to_node_name": self.to_node_name, "action_type": self.action_type, "target_role": self.target_role, "execution_count": self.execution_count, "success_rate": self.success_rate, "avg_execution_time_ms": self.avg_execution_time_ms, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "EdgeStatistic": return cls( from_node_name=data["from_node_name"], to_node_name=data["to_node_name"], action_type=data["action_type"], target_role=data.get("target_role"), execution_count=data.get("execution_count", 0), success_rate=data.get("success_rate", 0.0), avg_execution_time_ms=data.get("avg_execution_time_ms", 0.0), ) # ============================================================================ # LearningPack — conteneur principal # ============================================================================ @dataclass class LearningPack: """ Pack d'apprentissage anonymisé prêt à être échangé entre sites. Peut être sérialisé en JSON (``to_dict`` / ``from_dict``) ou sauvegardé / chargé depuis un fichier (``save`` / ``load``). """ version: str = LEARNING_PACK_VERSION created_at: str = "" source_hash: str = "" pack_id: str = "" stats: Dict[str, Any] = field(default_factory=dict) app_signatures: List[AppSignature] = field(default_factory=list) screen_prototypes: List[ScreenPrototype] = field(default_factory=list) workflow_skeletons: List[WorkflowSkeleton] = field(default_factory=list) ui_patterns: List[UIPattern] = field(default_factory=list) error_patterns: List[ErrorPattern] = field(default_factory=list) edge_statistics: List[EdgeStatistic] = field(default_factory=list) # --- Sérialisation ------------------------------------------------------- def to_dict(self) -> Dict[str, Any]: """Convertir en dictionnaire JSON-sérialisable.""" return { "version": self.version, "created_at": self.created_at, "source_hash": self.source_hash, "pack_id": self.pack_id, "stats": self.stats, "app_signatures": [a.to_dict() for a in self.app_signatures], "screen_prototypes": [p.to_dict() for p in self.screen_prototypes], "workflow_skeletons": [s.to_dict() for s in self.workflow_skeletons], "ui_patterns": [u.to_dict() for u in self.ui_patterns], "error_patterns": [e.to_dict() for e in self.error_patterns], "edge_statistics": [e.to_dict() for e in self.edge_statistics], } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "LearningPack": """Reconstruire depuis un dictionnaire.""" return cls( version=data.get("version", LEARNING_PACK_VERSION), created_at=data.get("created_at", ""), source_hash=data.get("source_hash", ""), pack_id=data.get("pack_id", ""), stats=data.get("stats", {}), app_signatures=[ AppSignature.from_dict(a) for a in data.get("app_signatures", []) ], screen_prototypes=[ ScreenPrototype.from_dict(p) for p in data.get("screen_prototypes", []) ], workflow_skeletons=[ WorkflowSkeleton.from_dict(s) for s in data.get("workflow_skeletons", []) ], ui_patterns=[ UIPattern.from_dict(u) for u in data.get("ui_patterns", []) ], error_patterns=[ ErrorPattern.from_dict(e) for e in data.get("error_patterns", []) ], edge_statistics=[ EdgeStatistic.from_dict(e) for e in data.get("edge_statistics", []) ], ) # --- Persistance fichier -------------------------------------------------- def save(self, path: Path) -> None: """Sauvegarder le pack au format JSON compressé.""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as fh: json.dump(self.to_dict(), fh, indent=2, ensure_ascii=False) logger.info("Learning pack sauvegardé : %s (%d prototypes, %d skeletons)", path, len(self.screen_prototypes), len(self.workflow_skeletons)) @classmethod def load(cls, path: Path) -> "LearningPack": """Charger un pack depuis un fichier JSON.""" path = Path(path) with open(path, "r", encoding="utf-8") as fh: data = json.load(fh) pack = cls.from_dict(data) logger.info("Learning pack chargé : %s (v%s, %d prototypes)", path, pack.version, len(pack.screen_prototypes)) return pack # ============================================================================ # Fonctions utilitaires d'anonymisation # ============================================================================ def _hash_client_id(client_id: str) -> str: """Hacher un identifiant client via SHA-256 (irréversible).""" return hashlib.sha256(client_id.encode("utf-8")).hexdigest() def _sanitize_text(text: str) -> Optional[str]: """ Nettoyer un texte pour l'export. Retourne None si le texte est trop long (probable donnée OCR sensible) ou s'il contient des patterns suspects (numéros de dossier, etc.). """ if not text or len(text) > MAX_SAFE_TEXT_LENGTH: return None # Filtrer les textes qui ressemblent à des identifiants patients lower = text.lower() for suspect in ("patient", "nip:", "ipp:", "dossier n", "numéro de"): if suspect in lower: return None return text def _clean_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]: """Retirer les clés sensibles d'un dictionnaire de métadonnées.""" return { k: v for k, v in metadata.items() if k.lower() not in _SENSITIVE_METADATA_KEYS } def _extract_prototype_vector(node) -> Optional[List[float]]: """ Extraire le vecteur prototype d'un WorkflowNode. Cherche dans ``node.metadata["_prototype_vector"]`` (numpy array ou liste) puis tente de charger depuis le fichier .npy référencé par le template. """ # 1. Vecteur directement stocké dans les métadonnées vec = node.metadata.get("_prototype_vector") if vec is not None: if isinstance(vec, np.ndarray): return vec.tolist() if isinstance(vec, list): return vec # 2. Fichier .npy référencé par le template embedding vector_id = node.template.embedding.vector_id if vector_id: npy_path = Path(vector_id) if npy_path.exists() and npy_path.suffix == ".npy": try: arr = np.load(str(npy_path)) return arr.tolist() except Exception as exc: logger.debug("Impossible de charger %s : %s", npy_path, exc) return None # ============================================================================ # LearningPackExporter # ============================================================================ class LearningPackExporter: """ Produit un LearningPack anonymisé à partir d'une liste de Workflows. Usage : >>> from core.models.workflow_graph import Workflow >>> exporter = LearningPackExporter() >>> pack = exporter.export(workflows, client_id="CHU-Lyon-001") >>> pack.save(Path("export/chu_lyon.json")) """ def export(self, workflows, client_id: str) -> LearningPack: """ Exporter les workflows d'un client en un LearningPack anonymisé. Args: workflows: Liste d'objets ``Workflow`` (core.models.workflow_graph). client_id: Identifiant en clair du client (sera haché). Returns: LearningPack prêt à être sauvegardé ou envoyé au serveur central. """ source_hash = _hash_client_id(client_id) pack_id = f"lp_{uuid.uuid4().hex[:12]}" app_sigs: Dict[str, AppSignature] = {} prototypes: List[ScreenPrototype] = [] skeletons: List[WorkflowSkeleton] = [] ui_patterns_map: Dict[str, UIPattern] = {} error_patterns_map: Dict[str, ErrorPattern] = {} edge_stats: List[EdgeStatistic] = [] total_nodes = 0 total_edges = 0 for wf in workflows: # --- Skeleton --- skeleton = self._extract_skeleton(wf) skeletons.append(skeleton) total_nodes += len(wf.nodes) total_edges += len(wf.edges) # --- Nodes : prototypes + app signatures + UI patterns --- for node in wf.nodes: proto = self._extract_prototype(node, source_hash, wf.workflow_id) if proto is not None: prototypes.append(proto) self._collect_app_signature(node, app_sigs) self._collect_ui_patterns(node, ui_patterns_map) # --- Edges : actions + error patterns + stats --- for edge in wf.edges: self._collect_error_patterns(edge, error_patterns_map, wf) stat = self._extract_edge_statistic(edge, wf) if stat is not None: edge_stats.append(stat) apps_seen = sorted(app_sigs.keys()) pack = LearningPack( version=LEARNING_PACK_VERSION, created_at=datetime.utcnow().isoformat(), source_hash=source_hash, pack_id=pack_id, stats={ "workflows_count": len(workflows), "total_nodes": total_nodes, "total_edges": total_edges, "apps_seen": apps_seen, "prototypes_exported": len(prototypes), }, app_signatures=list(app_sigs.values()), screen_prototypes=prototypes, workflow_skeletons=skeletons, ui_patterns=list(ui_patterns_map.values()), error_patterns=list(error_patterns_map.values()), edge_statistics=edge_stats, ) logger.info( "Learning pack exporté : %s — %d workflows, %d prototypes, %d error patterns", pack_id, len(workflows), len(prototypes), len(error_patterns_map), ) return pack # ------------------------------------------------------------------ # Extraction unitaire # ------------------------------------------------------------------ def _extract_skeleton(self, wf) -> WorkflowSkeleton: """Extraire le squelette anonymisé d'un workflow.""" node_names = [n.name for n in wf.nodes] app_names = set() edge_summaries = [] for edge in wf.edges: summary: Dict[str, Any] = { "from_node": edge.from_node, "to_node": edge.to_node, "action_type": edge.action.type, "target_role": edge.action.target.by_role, } edge_summaries.append(summary) for node in wf.nodes: proc = node.template.window.process_name if proc: app_names.add(proc) return WorkflowSkeleton( skeleton_id=wf.workflow_id, name=wf.name, description=wf.description, learning_state=wf.learning_state, node_names=node_names, edge_summaries=edge_summaries, entry_nodes=wf.entry_nodes, end_nodes=wf.end_nodes, node_count=len(wf.nodes), edge_count=len(wf.edges), app_names=sorted(app_names), ) def _extract_prototype( self, node, source_hash: str, workflow_id: str ) -> Optional[ScreenPrototype]: """Extraire un ScreenPrototype anonymisé depuis un WorkflowNode.""" vector = _extract_prototype_vector(node) # On exporte même sans vecteur : les contraintes UI ont de la valeur app_name = node.template.window.process_name # Construire les contraintes nettoyées window_constraints = node.template.window.to_dict() text_constraints = self._sanitize_text_constraints(node.template.text.to_dict()) ui_constraints = node.template.ui.to_dict() return ScreenPrototype( prototype_id=f"{workflow_id}__{node.node_id}", vector=vector, provider=node.template.embedding.provider, app_name=app_name, window_constraints=window_constraints, text_constraints=text_constraints, ui_constraints=ui_constraints, sample_count=node.template.embedding.sample_count, source_hashes=[source_hash], ) def _sanitize_text_constraints(self, text_dict: Dict[str, Any]) -> Dict[str, Any]: """Nettoyer les contraintes texte en retirant les textes trop longs / sensibles.""" required = [ t for t in text_dict.get("required_texts", []) if _sanitize_text(t) is not None ] forbidden = [ t for t in text_dict.get("forbidden_texts", []) if _sanitize_text(t) is not None ] return {"required_texts": required, "forbidden_texts": forbidden} def _collect_app_signature( self, node, app_sigs: Dict[str, AppSignature] ) -> None: """Collecter la signature d'application depuis un node.""" proc = node.template.window.process_name if not proc: return if proc in app_sigs: app_sigs[proc].observation_count += 1 else: title_pattern = node.template.window.title_pattern patterns = [title_pattern] if title_pattern else [] app_sigs[proc] = AppSignature( app_name=proc, window_title_patterns=patterns, ) # Ajouter le pattern de titre s'il est nouveau title_pattern = node.template.window.title_pattern if title_pattern and title_pattern not in app_sigs[proc].window_title_patterns: app_sigs[proc].window_title_patterns.append(title_pattern) def _collect_ui_patterns( self, node, patterns: Dict[str, UIPattern] ) -> None: """Collecter les patterns UI depuis les contraintes d'un node.""" for role in node.template.ui.required_roles: key = role if key in patterns: patterns[key].observation_count += 1 else: title_pattern = node.template.window.title_pattern title_patterns = [title_pattern] if title_pattern else [] patterns[key] = UIPattern( pattern_id=f"uip_{role}", role=role, context_description=f"Rôle UI requis : {role}", window_title_patterns=title_patterns, ) def _collect_error_patterns( self, edge, patterns: Dict[str, ErrorPattern], wf ) -> None: """Extraire les patterns d'erreur depuis les PostConditions.fail_fast.""" for check in edge.post_conditions.fail_fast: if check.value and _sanitize_text(check.value) is not None: key = check.value if key in patterns: patterns[key].observation_count += 1 else: # Trouver l'app_name du node source source_node = wf.get_node(edge.from_node) app_name = None if source_node: app_name = source_node.template.window.process_name patterns[key] = ErrorPattern( pattern_id=f"err_{hashlib.md5(key.encode()).hexdigest()[:8]}", error_text=check.value, kind=check.kind, app_name=app_name, ) def _extract_edge_statistic(self, edge, wf) -> Optional[EdgeStatistic]: """Extraire les statistiques anonymisées d'un edge.""" source_node = wf.get_node(edge.from_node) target_node = wf.get_node(edge.to_node) from_name = source_node.name if source_node else edge.from_node to_name = target_node.name if target_node else edge.to_node return EdgeStatistic( from_node_name=from_name, to_node_name=to_name, action_type=edge.action.type, target_role=edge.action.target.by_role, execution_count=edge.stats.execution_count, success_rate=edge.stats.success_rate, avg_execution_time_ms=edge.stats.avg_execution_time_ms, ) # ============================================================================ # LearningPackMerger # ============================================================================ class LearningPackMerger: """ Fusionne plusieurs LearningPacks en un seul pack consolidé. La fusion : - Déduplique les prototypes similaires (cosine > 0.95 = même écran) - Fusionne les signatures d'application (union) - Fusionne les patterns d'erreur (union, comptage cross-clients) - Calcule les occurrences cross-clients (haute confiance si vu par N clients) Usage : >>> merger = LearningPackMerger() >>> merged = merger.merge([pack_a, pack_b, pack_c]) >>> merged.save(Path("global/merged_pack.json")) """ def __init__(self, dedup_threshold: float = DEDUP_COSINE_THRESHOLD): self.dedup_threshold = dedup_threshold def merge(self, packs: List[LearningPack]) -> LearningPack: """ Fusionner plusieurs packs en un pack global consolidé. Args: packs: Liste de LearningPacks à fusionner. Returns: LearningPack consolidé avec déduplication et comptage cross-clients. """ if not packs: return LearningPack( created_at=datetime.utcnow().isoformat(), pack_id=f"lp_merged_{uuid.uuid4().hex[:8]}", ) if len(packs) == 1: # Un seul pack : on le retourne avec un nouveau pack_id merged = LearningPack.from_dict(packs[0].to_dict()) merged.pack_id = f"lp_merged_{uuid.uuid4().hex[:8]}" return merged merged_id = f"lp_merged_{uuid.uuid4().hex[:8]}" source_hashes = list({p.source_hash for p in packs if p.source_hash}) # Fusionner chaque catégorie app_sigs = self._merge_app_signatures(packs) prototypes = self._merge_prototypes(packs) skeletons = self._merge_skeletons(packs) ui_patterns = self._merge_ui_patterns(packs) error_patterns = self._merge_error_patterns(packs) edge_stats = self._merge_edge_statistics(packs) # Calculer les stats globales total_wf = sum(p.stats.get("workflows_count", 0) for p in packs) total_nodes = sum(p.stats.get("total_nodes", 0) for p in packs) total_edges = sum(p.stats.get("total_edges", 0) for p in packs) all_apps = set() for p in packs: all_apps.update(p.stats.get("apps_seen", [])) return LearningPack( version=LEARNING_PACK_VERSION, created_at=datetime.utcnow().isoformat(), source_hash=",".join(sorted(source_hashes)), pack_id=merged_id, stats={ "workflows_count": total_wf, "total_nodes": total_nodes, "total_edges": total_edges, "apps_seen": sorted(all_apps), "prototypes_exported": len(prototypes), "source_packs_count": len(packs), "source_hashes": source_hashes, }, app_signatures=app_sigs, screen_prototypes=prototypes, workflow_skeletons=skeletons, ui_patterns=ui_patterns, error_patterns=error_patterns, edge_statistics=edge_stats, ) # ------------------------------------------------------------------ # Fusion par catégorie # ------------------------------------------------------------------ def _merge_app_signatures(self, packs: List[LearningPack]) -> List[AppSignature]: """Union des signatures d'application, cumul des compteurs.""" merged: Dict[str, AppSignature] = {} for pack in packs: for sig in pack.app_signatures: if sig.app_name in merged: existing = merged[sig.app_name] existing.observation_count += sig.observation_count for pat in sig.window_title_patterns: if pat not in existing.window_title_patterns: existing.window_title_patterns.append(pat) else: merged[sig.app_name] = AppSignature.from_dict(sig.to_dict()) return list(merged.values()) def _merge_prototypes(self, packs: List[LearningPack]) -> List[ScreenPrototype]: """ Fusionner les prototypes avec déduplication par similarité cosinus. Deux prototypes avec cosine > ``self.dedup_threshold`` sont considérés comme le même écran. On conserve celui avec le plus d'échantillons et on fusionne les source_hashes. """ all_protos: List[ScreenPrototype] = [] for pack in packs: all_protos.extend(pack.screen_prototypes) if not all_protos: return [] # Séparer les prototypes avec et sans vecteur with_vec: List[Tuple[ScreenPrototype, np.ndarray]] = [] without_vec: List[ScreenPrototype] = [] for proto in all_protos: if proto.vector is not None and len(proto.vector) > 0: vec = np.array(proto.vector, dtype=np.float32) norm = np.linalg.norm(vec) if norm > 0: vec = vec / norm with_vec.append((proto, vec)) else: without_vec.append(proto) # Déduplication greedy par similarité cosinus merged: List[ScreenPrototype] = [] used = [False] * len(with_vec) for i, (proto_i, vec_i) in enumerate(with_vec): if used[i]: continue used[i] = True # Chercher les prototypes similaires group_sources = set(proto_i.source_hashes) best_sample_count = proto_i.sample_count best_proto = proto_i for j in range(i + 1, len(with_vec)): if used[j]: continue proto_j, vec_j = with_vec[j] cosine_sim = float(np.dot(vec_i, vec_j)) if cosine_sim >= self.dedup_threshold: used[j] = True group_sources.update(proto_j.source_hashes) if proto_j.sample_count > best_sample_count: best_sample_count = proto_j.sample_count best_proto = proto_j # Construire le prototype consolidé consolidated = ScreenPrototype.from_dict(best_proto.to_dict()) consolidated.source_hashes = sorted(group_sources) consolidated.sample_count = best_sample_count merged.append(consolidated) # Ajouter les prototypes sans vecteur (pas de déduplication possible) merged.extend(without_vec) logger.info( "Fusion prototypes : %d entrées → %d après déduplication (seuil=%.2f)", len(all_protos), len(merged), self.dedup_threshold, ) return merged def _merge_skeletons(self, packs: List[LearningPack]) -> List[WorkflowSkeleton]: """Union des skeletons de workflows (dédupliqués par skeleton_id).""" merged: Dict[str, WorkflowSkeleton] = {} for pack in packs: for skel in pack.workflow_skeletons: if skel.skeleton_id not in merged: merged[skel.skeleton_id] = skel return list(merged.values()) def _merge_ui_patterns(self, packs: List[LearningPack]) -> List[UIPattern]: """Fusionner les patterns UI avec comptage cross-clients.""" merged: Dict[str, UIPattern] = {} # Suivre quels source_hashes ont vu chaque pattern pattern_sources: Dict[str, set] = {} for pack in packs: for pattern in pack.ui_patterns: key = pattern.role if key in merged: merged[key].observation_count += pattern.observation_count for pat in pattern.window_title_patterns: if pat not in merged[key].window_title_patterns: merged[key].window_title_patterns.append(pat) else: merged[key] = UIPattern.from_dict(pattern.to_dict()) pattern_sources[key] = set() if pack.source_hash: pattern_sources.setdefault(key, set()).add(pack.source_hash) # Mettre à jour le cross_client_count for key, pattern in merged.items(): sources = pattern_sources.get(key, set()) pattern.cross_client_count = len(sources) # Confiance = proportion de clients ayant vu le pattern total_clients = len({p.source_hash for p in packs if p.source_hash}) pattern.confidence = ( len(sources) / total_clients if total_clients > 0 else 0.0 ) return list(merged.values()) def _merge_error_patterns(self, packs: List[LearningPack]) -> List[ErrorPattern]: """Fusionner les patterns d'erreur avec comptage cross-clients.""" merged: Dict[str, ErrorPattern] = {} pattern_sources: Dict[str, set] = {} for pack in packs: for pattern in pack.error_patterns: key = pattern.error_text if key in merged: merged[key].observation_count += pattern.observation_count else: merged[key] = ErrorPattern.from_dict(pattern.to_dict()) pattern_sources[key] = set() if pack.source_hash: pattern_sources.setdefault(key, set()).add(pack.source_hash) for key, pattern in merged.items(): pattern.cross_client_count = len(pattern_sources.get(key, set())) return list(merged.values()) def _merge_edge_statistics( self, packs: List[LearningPack] ) -> List[EdgeStatistic]: """Fusionner les statistiques de transitions.""" merged: Dict[str, EdgeStatistic] = {} for pack in packs: for stat in pack.edge_statistics: key = f"{stat.from_node_name}→{stat.to_node_name}→{stat.action_type}" if key in merged: existing = merged[key] total_exec = existing.execution_count + stat.execution_count if total_exec > 0: # Moyenne pondérée du success_rate existing.success_rate = ( existing.success_rate * existing.execution_count + stat.success_rate * stat.execution_count ) / total_exec # Moyenne pondérée du temps d'exécution existing.avg_execution_time_ms = ( existing.avg_execution_time_ms * existing.execution_count + stat.avg_execution_time_ms * stat.execution_count ) / total_exec existing.execution_count = total_exec else: merged[key] = EdgeStatistic.from_dict(stat.to_dict()) return list(merged.values())