""" GlobalFAISSIndex — Index FAISS global fédérant les prototypes de tous les clients. Construit un index de recherche vectorielle à partir des Learning Packs reçus de multiples sites clients. Chaque vecteur indexé porte des métadonnées permettant de retrouver le pack source, le workflow et l'application d'origine. Cet index est utilisé par le serveur central (DGX Spark) pour : - Reconnaître instantanément un écran déjà vu chez un autre client - Proposer des workflows existants quand un nouveau client rencontre un écran familier - Mesurer la couverture applicative globale de Léa Auteur : Dom, Claude — 19 mars 2026 """ import json import logging from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional import numpy as np from .learning_pack import LearningPack, ScreenPrototype logger = logging.getLogger(__name__) # Dimensions par défaut des embeddings CLIP (ViT-B-32) DEFAULT_DIMENSIONS = 512 try: import faiss FAISS_AVAILABLE = True except ImportError: FAISS_AVAILABLE = False logger.warning("FAISS non installé — GlobalFAISSIndex désactivé. pip install faiss-cpu") @dataclass class GlobalSearchResult: """Résultat d'une recherche dans l'index global.""" prototype_id: str similarity: float pack_source_hash: str workflow_skeleton_id: str node_name: str app_name: str metadata: Dict[str, Any] = field(default_factory=dict) class GlobalFAISSIndex: """ Index FAISS global contenant les prototypes d'écran de tous les clients. Chaque vecteur est associé à des métadonnées : - pack_source_hash : hash du client source - workflow_skeleton_id : ID du workflow d'origine - node_name : nom du nœud (écran) dans le workflow - app_name : nom de l'application Usage : >>> index = GlobalFAISSIndex() >>> index.build_from_packs([pack_a, pack_b]) >>> results = index.search(query_vector, k=5) >>> index.save(Path("global/faiss_index")) """ def __init__(self, dimensions: int = DEFAULT_DIMENSIONS): """ Initialiser l'index global. Args: dimensions: Nombre de dimensions des vecteurs (512 pour CLIP ViT-B-32). """ if not FAISS_AVAILABLE: raise ImportError( "FAISS est requis pour GlobalFAISSIndex. " "Installer avec : pip install faiss-cpu" ) self.dimensions = dimensions self.index: Optional["faiss.IndexFlatIP"] = None self._metadata: List[Dict[str, Any]] = [] self._rebuild_index() def _rebuild_index(self) -> None: """Créer ou recréer l'index FAISS vide.""" # IndexFlatIP pour similarité cosinus (vecteurs normalisés) self.index = faiss.IndexFlatIP(self.dimensions) self._metadata = [] @property def total_vectors(self) -> int: """Nombre de vecteurs dans l'index.""" return self.index.ntotal if self.index is not None else 0 # ------------------------------------------------------------------ # Construction depuis les Learning Packs # ------------------------------------------------------------------ def build_from_packs(self, packs: List[LearningPack]) -> int: """ Construire l'index à partir d'une liste de Learning Packs. Remplace le contenu existant de l'index. Args: packs: Liste de LearningPacks à indexer. Returns: Nombre de vecteurs ajoutés à l'index. """ self._rebuild_index() vectors = [] metadata_list = [] for pack in packs: for proto in pack.screen_prototypes: vec = self._proto_to_vector(proto) if vec is None: continue meta = { "prototype_id": proto.prototype_id, "pack_source_hash": pack.source_hash, "workflow_skeleton_id": self._extract_skeleton_id(proto), "node_name": self._extract_node_name(proto), "app_name": proto.app_name or "", } vectors.append(vec) metadata_list.append(meta) if not vectors: logger.info("Aucun vecteur valide trouvé dans les packs.") return 0 # Empiler et normaliser les vecteurs matrix = np.array(vectors, dtype=np.float32) faiss.normalize_L2(matrix) # Ajouter à l'index self.index.add(matrix) self._metadata = metadata_list logger.info( "Index global construit : %d vecteurs depuis %d packs", len(vectors), len(packs), ) return len(vectors) def add_pack(self, pack: LearningPack) -> int: """ Ajouter les prototypes d'un pack à l'index existant (incrémental). Args: pack: LearningPack à ajouter. Returns: Nombre de vecteurs ajoutés. """ vectors = [] metadata_list = [] for proto in pack.screen_prototypes: vec = self._proto_to_vector(proto) if vec is None: continue meta = { "prototype_id": proto.prototype_id, "pack_source_hash": pack.source_hash, "workflow_skeleton_id": self._extract_skeleton_id(proto), "node_name": self._extract_node_name(proto), "app_name": proto.app_name or "", } vectors.append(vec) metadata_list.append(meta) if not vectors: return 0 matrix = np.array(vectors, dtype=np.float32) faiss.normalize_L2(matrix) self.index.add(matrix) self._metadata.extend(metadata_list) logger.info( "Pack ajouté à l'index global : +%d vecteurs (total=%d)", len(vectors), self.total_vectors, ) return len(vectors) # ------------------------------------------------------------------ # Recherche # ------------------------------------------------------------------ def search( self, query_vector: np.ndarray, k: int = 5 ) -> List[GlobalSearchResult]: """ Chercher les k écrans les plus similaires dans l'index global. Args: query_vector: Vecteur de requête (même dimension que l'index). k: Nombre de résultats à retourner. Returns: Liste de GlobalSearchResult triée par similarité décroissante. """ if self.total_vectors == 0: return [] # Préparer le vecteur q = np.array(query_vector, dtype=np.float32).reshape(1, -1) faiss.normalize_L2(q) k = min(k, self.total_vectors) distances, indices = self.index.search(q, k) results = [] for dist, idx in zip(distances[0], indices[0]): if idx < 0 or idx >= len(self._metadata): continue meta = self._metadata[int(idx)] results.append(GlobalSearchResult( prototype_id=meta["prototype_id"], similarity=float(dist), pack_source_hash=meta["pack_source_hash"], workflow_skeleton_id=meta["workflow_skeleton_id"], node_name=meta["node_name"], app_name=meta["app_name"], metadata=meta, )) return results # ------------------------------------------------------------------ # Persistance # ------------------------------------------------------------------ def save(self, path: Path) -> None: """ Sauvegarder l'index et ses métadonnées. Crée deux fichiers : - ``{path}.faiss`` — index FAISS binaire - ``{path}.meta.json`` — métadonnées JSON Args: path: Chemin de base (sans extension). """ path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) index_path = path.with_suffix(".faiss") meta_path = path.with_suffix(".meta.json") faiss.write_index(self.index, str(index_path)) meta_data = { "dimensions": self.dimensions, "total_vectors": self.total_vectors, "entries": self._metadata, } with open(meta_path, "w", encoding="utf-8") as fh: json.dump(meta_data, fh, indent=2, ensure_ascii=False) logger.info( "Index global sauvegardé : %s (%d vecteurs)", index_path, self.total_vectors, ) @classmethod def load(cls, path: Path) -> "GlobalFAISSIndex": """ Charger un index depuis le disque. Args: path: Chemin de base (sans extension). Returns: GlobalFAISSIndex chargé et prêt à l'emploi. """ if not FAISS_AVAILABLE: raise ImportError("FAISS requis pour charger l'index global.") path = Path(path) index_path = path.with_suffix(".faiss") meta_path = path.with_suffix(".meta.json") with open(meta_path, "r", encoding="utf-8") as fh: meta_data = json.load(fh) dimensions = meta_data.get("dimensions", DEFAULT_DIMENSIONS) instance = cls.__new__(cls) instance.dimensions = dimensions instance.index = faiss.read_index(str(index_path)) instance._metadata = meta_data.get("entries", []) logger.info( "Index global chargé : %s (%d vecteurs, %dd)", index_path, instance.total_vectors, dimensions, ) return instance def get_stats(self) -> Dict[str, Any]: """Statistiques de l'index global.""" source_hashes = set() app_names = set() for meta in self._metadata: source_hashes.add(meta.get("pack_source_hash", "")) app_name = meta.get("app_name", "") if app_name: app_names.add(app_name) return { "dimensions": self.dimensions, "total_vectors": self.total_vectors, "unique_sources": len(source_hashes), "unique_apps": sorted(app_names), } # ------------------------------------------------------------------ # Utilitaires internes # ------------------------------------------------------------------ def _proto_to_vector(self, proto: ScreenPrototype) -> Optional[np.ndarray]: """Convertir un ScreenPrototype en vecteur numpy, ou None si absent.""" if proto.vector is None or len(proto.vector) == 0: return None vec = np.array(proto.vector, dtype=np.float32) if vec.shape[0] != self.dimensions: logger.warning( "Prototype %s : dimensions incorrectes (%d != %d), ignoré", proto.prototype_id, vec.shape[0], self.dimensions, ) return None return vec @staticmethod def _extract_skeleton_id(proto: ScreenPrototype) -> str: """Extraire le workflow_id depuis le prototype_id (format: workflow_id__node_id).""" parts = proto.prototype_id.split("__", 1) return parts[0] if len(parts) >= 1 else "" @staticmethod def _extract_node_name(proto: ScreenPrototype) -> str: """Extraire le node_id depuis le prototype_id.""" parts = proto.prototype_id.split("__", 1) return parts[1] if len(parts) >= 2 else proto.prototype_id