""" FAISSManager - Gestion d'Index FAISS pour Recherche de Similarité Gère l'indexation et la recherche rapide d'embeddings avec FAISS. Supporte sauvegarde/chargement d'index et métadonnées. """ import logging from typing import List, Dict, Optional, Tuple, Any from pathlib import Path from dataclasses import dataclass import numpy as np import json import pickle logger = logging.getLogger(__name__) try: import faiss FAISS_AVAILABLE = True except ImportError: FAISS_AVAILABLE = False logger.warning("FAISS not installed. Install with: pip install faiss-cpu") @dataclass class SearchResult: """Résultat d'une recherche de similarité""" embedding_id: str similarity: float # Similarité cosinus distance: float # Distance L2 metadata: Dict[str, Any] class FAISSManager: """ Gestionnaire d'index FAISS Gère l'ajout, la recherche et la persistence d'embeddings avec FAISS. Maintient un mapping entre IDs FAISS et métadonnées. Features d'optimisation: - Migration automatique Flat → IVF pour >10k embeddings - Entraînement automatique de l'index IVF - Support GPU si disponible - Optimisation périodique de l'index """ def __init__(self, dimensions: int, index_type: str = "Flat", metric: str = "cosine", nlist: Optional[int] = None, nprobe: int = 8, use_gpu: bool = False, auto_optimize: bool = True): """ Initialiser le gestionnaire FAISS Args: dimensions: Nombre de dimensions des vecteurs index_type: Type d'index FAISS ("Flat", "IVF", "HNSW") metric: Métrique de distance ("cosine", "l2", "ip") nlist: Nombre de clusters pour IVF (auto si None) nprobe: Nombre de clusters à visiter lors de la recherche IVF use_gpu: Utiliser GPU si disponible auto_optimize: Migrer automatiquement vers IVF si >10k embeddings Raises: ImportError: Si FAISS n'est pas installé """ if not FAISS_AVAILABLE: raise ImportError( "FAISS is required but not installed. " "Install with: pip install faiss-cpu" ) self.dimensions = dimensions self.index_type = index_type self.metric = metric self.nlist = nlist self.nprobe = nprobe self.use_gpu = use_gpu self.auto_optimize = auto_optimize # Mapping ID FAISS -> métadonnées self.metadata_store: Dict[int, Dict[str, Any]] = {} # Compteur pour IDs FAISS self.next_id = 0 # Vecteurs pour entraînement IVF (si nécessaire) self.training_vectors: List[np.ndarray] = [] self.is_trained = (index_type == "Flat") # Flat n'a pas besoin d'entraînement # Seuil pour migration automatique self.migration_threshold = 10000 # GPU resources self.gpu_resources = None if use_gpu: self._setup_gpu() # Créer l'index FAISS (après avoir initialisé tous les attributs) self.index = self._create_index() def _setup_gpu(self): """Configurer les ressources GPU si disponibles""" try: # Vérifier si GPU est disponible ngpus = faiss.get_num_gpus() if ngpus > 0: self.gpu_resources = faiss.StandardGpuResources() logger.info(f"FAISS GPU enabled: {ngpus} GPU(s) available") else: logger.warning("FAISS GPU requested but no GPU available, using CPU") self.use_gpu = False except Exception as e: logger.warning(f"FAISS GPU setup failed: {e}, using CPU") self.use_gpu = False def _calculate_nlist(self, n_vectors: int) -> int: """ Calculer le nombre optimal de clusters pour IVF Règle empirique: nlist = sqrt(n_vectors) Minimum: 100, Maximum: 65536 Args: n_vectors: Nombre de vecteurs dans l'index Returns: Nombre optimal de clusters """ if self.nlist is not None: return self.nlist # Règle empirique nlist = int(np.sqrt(n_vectors)) # Contraintes nlist = max(100, min(nlist, 65536)) return nlist def _create_index(self) -> 'faiss.Index': """Créer un index FAISS selon la configuration""" if self.metric == "cosine": # Pour cosine similarity, normaliser et utiliser inner product if self.index_type == "Flat": index = faiss.IndexFlatIP(self.dimensions) elif self.index_type == "IVF": # Calculer nlist optimal nlist = self._calculate_nlist(max(1000, self.migration_threshold)) quantizer = faiss.IndexFlatIP(self.dimensions) index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) # Configurer nprobe index.nprobe = self.nprobe # Activer DirectMap pour permettre reconstruct() index.make_direct_map() elif self.index_type == "HNSW": index = faiss.IndexHNSWFlat(self.dimensions, 32) else: raise ValueError(f"Unknown index type: {self.index_type}") elif self.metric == "l2": if self.index_type == "Flat": index = faiss.IndexFlatL2(self.dimensions) elif self.index_type == "IVF": # Calculer nlist optimal nlist = self._calculate_nlist(max(1000, self.migration_threshold)) quantizer = faiss.IndexFlatL2(self.dimensions) index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) # Configurer nprobe index.nprobe = self.nprobe # Activer DirectMap pour permettre reconstruct() index.make_direct_map() elif self.index_type == "HNSW": index = faiss.IndexHNSWFlat(self.dimensions, 32) else: raise ValueError(f"Unknown index type: {self.index_type}") elif self.metric == "ip": # Inner product if self.index_type == "Flat": index = faiss.IndexFlatIP(self.dimensions) else: raise ValueError(f"Inner product only supports Flat index") else: raise ValueError(f"Unknown metric: {self.metric}") # Migrer vers GPU si demandé if self.use_gpu and self.gpu_resources is not None: try: index = faiss.index_cpu_to_gpu(self.gpu_resources, 0, index) except Exception as e: logger.warning(f"Failed to move index to GPU: {e}, using CPU") return index def add_embedding(self, embedding_id: str, vector: np.ndarray, metadata: Optional[Dict[str, Any]] = None) -> int: """ Ajouter un embedding à l'index Args: embedding_id: ID unique de l'embedding vector: Vecteur d'embedding (dimensions doivent correspondre) metadata: Métadonnées associées (optionnel) Returns: ID FAISS assigné Raises: ValueError: Si dimensions ne correspondent pas """ if vector.shape[0] != self.dimensions: raise ValueError( f"Vector dimensions mismatch: expected {self.dimensions}, " f"got {vector.shape[0]}" ) # Convertir en float32 d'abord vector_float32 = vector.astype(np.float32) # Normaliser si métrique cosine if self.metric == "cosine": norm = np.linalg.norm(vector_float32) if norm > 0: vector_float32 = vector_float32 / norm # Reshape pour FAISS vector_reshaped = vector_float32.reshape(1, -1) # Pour IVF, stocker vecteurs pour entraînement si pas encore entraîné if self.index_type == "IVF" and not self.is_trained: self.training_vectors.append(vector_float32) # Stocker le vecteur normalisé # Entraîner si on a assez de vecteurs if len(self.training_vectors) >= 100: self._train_ivf_index() # Les vecteurs d'entraînement ont déjà été ajoutés dans _train_ivf_index # Ne pas ajouter à nouveau elif self.is_trained: # Ajouter à l'index (seulement si entraîné pour IVF ou si Flat) self.index.add(vector_reshaped) # Stocker métadonnées faiss_id = self.next_id self.metadata_store[faiss_id] = { "embedding_id": embedding_id, "metadata": metadata or {} } self.next_id += 1 # Vérifier si migration automatique nécessaire if self.auto_optimize and self.index_type == "Flat": if self.index.ntotal >= self.migration_threshold: self._migrate_to_ivf() return faiss_id def _train_ivf_index(self): """Entraîner l'index IVF avec les vecteurs collectés""" if self.is_trained or self.index_type != "IVF": return if len(self.training_vectors) < 100: logger.warning(f" Training IVF with only {len(self.training_vectors)} vectors") # Convertir en array numpy training_data = np.array(self.training_vectors, dtype=np.float32) logger.info(f"Training IVF index with {len(self.training_vectors)} vectors...") # Entraîner l'index self.index.train(training_data) self.is_trained = True # Ajouter tous les vecteurs d'entraînement à l'index self.index.add(training_data) # Libérer mémoire self.training_vectors.clear() logger.info(f"IVF index trained successfully with nlist={self.index.nlist}") def _migrate_to_ivf(self): """ Migrer automatiquement de Flat vers IVF Appelé automatiquement quand l'index Flat dépasse le seuil. """ if self.index_type != "Flat": return logger.info(f"Migrating from Flat to IVF (current size: {self.index.ntotal})...") # Extraire tous les vecteurs de l'index Flat n_vectors = self.index.ntotal vectors = np.zeros((n_vectors, self.dimensions), dtype=np.float32) for i in range(n_vectors): vectors[i] = self.index.reconstruct(int(i)) # Calculer nlist optimal nlist = self._calculate_nlist(n_vectors) # Créer nouvel index IVF if self.metric == "cosine": quantizer = faiss.IndexFlatIP(self.dimensions) new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) else: # l2 quantizer = faiss.IndexFlatL2(self.dimensions) new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) new_index.nprobe = self.nprobe new_index.make_direct_map() # Activer DirectMap # Entraîner avec tous les vecteurs new_index.train(vectors) # Ajouter tous les vecteurs new_index.add(vectors) # Remplacer l'index self.index = new_index self.index_type = "IVF" self.is_trained = True logger.info(f"Migration complete: IVF index with nlist={nlist}, nprobe={self.nprobe}") def optimize_index(self): """ Optimiser l'index périodiquement Pour IVF: Recalculer nlist optimal et réentraîner si nécessaire """ if self.index_type != "IVF" or not self.is_trained: return n_vectors = self.index.ntotal if n_vectors < 100: return # Calculer nlist optimal pour la taille actuelle optimal_nlist = self._calculate_nlist(n_vectors) # Si nlist actuel est très différent, reconstruire current_nlist = self.index.nlist if abs(optimal_nlist - current_nlist) / current_nlist > 0.5: logger.info(f"Optimizing IVF index: {current_nlist} → {optimal_nlist} clusters") # Extraire tous les vecteurs vectors = np.zeros((n_vectors, self.dimensions), dtype=np.float32) for i in range(n_vectors): vectors[i] = self.index.reconstruct(int(i)) # Créer nouvel index avec nlist optimal if self.metric == "cosine": quantizer = faiss.IndexFlatIP(self.dimensions) new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, optimal_nlist) else: quantizer = faiss.IndexFlatL2(self.dimensions) new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, optimal_nlist) new_index.nprobe = self.nprobe new_index.make_direct_map() # Activer DirectMap # Entraîner et ajouter new_index.train(vectors) new_index.add(vectors) # Remplacer self.index = new_index logger.info("Index optimized successfully") def search_similar(self, query_vector: np.ndarray, k: int = 5, min_similarity: Optional[float] = None) -> List[SearchResult]: """ Rechercher les k embeddings les plus similaires Args: query_vector: Vecteur de requête k: Nombre de résultats à retourner min_similarity: Similarité minimale (optionnel, pour cosine) Returns: Liste de SearchResult triés par similarité décroissante Raises: ValueError: Si dimensions ne correspondent pas """ if query_vector.shape[0] != self.dimensions: raise ValueError( f"Query vector dimensions mismatch: expected {self.dimensions}, " f"got {query_vector.shape[0]}" ) if self.index.ntotal == 0: return [] # Index vide # Normaliser si métrique cosine if self.metric == "cosine": norm = np.linalg.norm(query_vector) if norm > 0: query_vector = query_vector / norm # Convertir en float32 et reshape query_vector = query_vector.astype(np.float32).reshape(1, -1) # Rechercher k = min(k, self.index.ntotal) # Ne pas demander plus que disponible distances, indices = self.index.search(query_vector, k) # Convertir en SearchResults results = [] for dist, idx in zip(distances[0], indices[0]): if idx == -1: # Pas de résultat continue # Récupérer métadonnées meta = self.metadata_store.get(int(idx), {}) # Convertir distance en similarité if self.metric == "cosine": # Pour inner product avec vecteurs normalisés, distance = similarité similarity = float(dist) elif self.metric == "l2": # Convertir distance L2 en similarité approximative similarity = 1.0 / (1.0 + float(dist)) else: similarity = float(dist) # Filtrer par similarité minimale if min_similarity is not None and similarity < min_similarity: continue results.append(SearchResult( embedding_id=meta.get("embedding_id", f"unknown_{idx}"), similarity=similarity, distance=float(dist), metadata=meta.get("metadata", {}) )) return results def remove_embedding(self, faiss_id: int) -> bool: """ Supprimer un embedding de l'index Note: FAISS ne supporte pas la suppression directe. Cette méthode supprime juste les métadonnées. Pour vraiment supprimer, il faut reconstruire l'index. Args: faiss_id: ID FAISS de l'embedding Returns: True si supprimé, False si non trouvé """ if faiss_id in self.metadata_store: del self.metadata_store[faiss_id] return True return False def get_metadata(self, faiss_id: int) -> Optional[Dict[str, Any]]: """Récupérer les métadonnées d'un embedding""" return self.metadata_store.get(faiss_id) def save(self, index_path: Path, metadata_path: Path) -> None: """ Sauvegarder l'index et les métadonnées Args: index_path: Chemin pour sauvegarder l'index FAISS metadata_path: Chemin pour sauvegarder les métadonnées """ # Créer répertoires si nécessaire index_path.parent.mkdir(parents=True, exist_ok=True) metadata_path.parent.mkdir(parents=True, exist_ok=True) # Si GPU, ramener sur CPU avant sauvegarde index_to_save = self.index if self.use_gpu: try: index_to_save = faiss.index_gpu_to_cpu(self.index) except (RuntimeError, AttributeError): pass # Déjà sur CPU ou pas de GPU # Sauvegarder index FAISS faiss.write_index(index_to_save, str(index_path)) # Sauvegarder métadonnées metadata = { "dimensions": self.dimensions, "index_type": self.index_type, "metric": self.metric, "next_id": self.next_id, "metadata_store": self.metadata_store, "nlist": self.nlist, "nprobe": self.nprobe, "is_trained": self.is_trained, "auto_optimize": self.auto_optimize } with open(metadata_path, 'wb') as f: pickle.dump(metadata, f) @classmethod def load(cls, index_path: Path, metadata_path: Path, use_gpu: bool = False) -> 'FAISSManager': """ Charger un index et ses métadonnées Args: index_path: Chemin de l'index FAISS metadata_path: Chemin des métadonnées use_gpu: Charger sur GPU si disponible Returns: FAISSManager chargé """ # Charger métadonnées with open(metadata_path, 'rb') as f: metadata = pickle.load(f) # Créer instance manager = cls( dimensions=metadata["dimensions"], index_type=metadata["index_type"], metric=metadata["metric"], nlist=metadata.get("nlist"), nprobe=metadata.get("nprobe", 8), use_gpu=use_gpu, auto_optimize=metadata.get("auto_optimize", True) ) # Charger index FAISS manager.index = faiss.read_index(str(index_path)) # Migrer vers GPU si demandé if use_gpu and manager.gpu_resources is not None: try: manager.index = faiss.index_cpu_to_gpu(manager.gpu_resources, 0, manager.index) except Exception as e: logger.warning(f"Failed to move loaded index to GPU: {e}") # Restaurer métadonnées manager.next_id = metadata["next_id"] manager.metadata_store = metadata["metadata_store"] manager.is_trained = metadata.get("is_trained", True) return manager def get_stats(self) -> Dict[str, Any]: """Récupérer statistiques de l'index""" stats = { "dimensions": self.dimensions, "index_type": self.index_type, "metric": self.metric, "total_vectors": self.index.ntotal, "metadata_count": len(self.metadata_store), "is_trained": self.is_trained, "use_gpu": self.use_gpu } # Ajouter stats spécifiques IVF if self.index_type == "IVF" and self.is_trained: stats["nlist"] = self.index.nlist stats["nprobe"] = self.index.nprobe # Calculer nlist optimal pour comparaison if self.index.ntotal > 0: optimal_nlist = self._calculate_nlist(self.index.ntotal) stats["optimal_nlist"] = optimal_nlist stats["nlist_efficiency"] = min(1.0, self.index.nlist / optimal_nlist) return stats def clear(self) -> None: """ Vider complètement l'index + reset état d'entraînement. Auteur : Dom, Alice Kiro - 22 décembre 2025 Amélioration pour FAISS Rebuild Propre: - Reset complet de l'état IVF training - Réinitialisation des training_vectors - Gestion correcte du flag is_trained selon le type d'index """ self.index = self._create_index() self.metadata_store.clear() self.next_id = 0 # IMPORTANT: reset IVF training state self.training_vectors.clear() self.is_trained = (self.index_type == "Flat") def reindex(self, items, force_train_ivf: bool = True) -> int: """ Reconstruit l'index à partir d'une source canonique (vecteurs). Auteur : Dom, Alice Kiro - 22 décembre 2025 Stratégie FAISS Rebuild Propre: "1 prototype = 1 entrée" - Clear complet avant reconstruction - Ajout sécurisé avec validation des vecteurs - Force training IVF même pour petits volumes - Retour du nombre d'éléments indexés Args: items: Iterable[(embedding_id: str, vector: np.ndarray, metadata: dict)] force_train_ivf: Forcer l'entraînement IVF même avec peu de vecteurs Returns: Nombre d'items indexés avec succès """ logger.info(f"FAISS reindex started with force_train_ivf={force_train_ivf}") # Clear complet avant reconstruction self.clear() count = 0 for embedding_id, vector, metadata in items: if vector is None: logger.debug(f"Skipping None vector for {embedding_id}") continue try: self.add_embedding(embedding_id, vector, metadata or {}) count += 1 except Exception as e: logger.warning(f"Failed to add embedding {embedding_id}: {e}") continue # Si IVF + petit volume, add_embedding ne déclenche pas forcément l'entraînement if (self.index_type == "IVF" and force_train_ivf and (not self.is_trained) and self.training_vectors): logger.info(f"Force training IVF with {len(self.training_vectors)} vectors") self._train_ivf_index() logger.info(f"FAISS reindex completed: {count} items indexed") return count def rebuild_index(self) -> None: """ Reconstruire l'index depuis les métadonnées Utile après suppressions pour compacter l'index. Note: Nécessite d'avoir les vecteurs originaux. """ # TODO: Implémenter si nécessaire # Nécessiterait de stocker les vecteurs dans metadata_store raise NotImplementedError("Rebuild not yet implemented") # ============================================================================ # Fonctions utilitaires # ============================================================================ def create_flat_index(dimensions: int, metric: str = "cosine") -> FAISSManager: """ Créer un index FAISS Flat (recherche exhaustive) Args: dimensions: Nombre de dimensions metric: Métrique ("cosine", "l2", "ip") Returns: FAISSManager configuré """ return FAISSManager(dimensions=dimensions, index_type="Flat", metric=metric) def create_ivf_index(dimensions: int, metric: str = "cosine") -> FAISSManager: """ Créer un index FAISS IVF (recherche approximative rapide) Args: dimensions: Nombre de dimensions metric: Métrique ("cosine", "l2") Returns: FAISSManager configuré """ return FAISSManager(dimensions=dimensions, index_type="IVF", metric=metric)