diff --git a/core/embedding/faiss_manager.py b/core/embedding/faiss_manager.py new file mode 100644 index 000000000..0acfca368 --- /dev/null +++ b/core/embedding/faiss_manager.py @@ -0,0 +1,692 @@ +""" +FAISSManager - Gestion d'Index FAISS pour Recherche de Similarité + +Gère l'indexation et la recherche rapide d'embeddings avec FAISS. +Supporte sauvegarde/chargement d'index et métadonnées. +""" + +import logging +from typing import List, Dict, Optional, Tuple, Any +from pathlib import Path +from dataclasses import dataclass +import numpy as np +import json +import pickle + +logger = logging.getLogger(__name__) + +try: + import faiss + FAISS_AVAILABLE = True +except ImportError: + FAISS_AVAILABLE = False + logger.warning("FAISS not installed. Install with: pip install faiss-cpu") + + +@dataclass +class SearchResult: + """Résultat d'une recherche de similarité""" + embedding_id: str + similarity: float # Similarité cosinus + distance: float # Distance L2 + metadata: Dict[str, Any] + + +class FAISSManager: + """ + Gestionnaire d'index FAISS + + Gère l'ajout, la recherche et la persistence d'embeddings avec FAISS. + Maintient un mapping entre IDs FAISS et métadonnées. + + Features d'optimisation: + - Migration automatique Flat → IVF pour >10k embeddings + - Entraînement automatique de l'index IVF + - Support GPU si disponible + - Optimisation périodique de l'index + """ + + def __init__(self, + dimensions: int, + index_type: str = "Flat", + metric: str = "cosine", + nlist: Optional[int] = None, + nprobe: int = 8, + use_gpu: bool = False, + auto_optimize: bool = True): + """ + Initialiser le gestionnaire FAISS + + Args: + dimensions: Nombre de dimensions des vecteurs + index_type: Type d'index FAISS ("Flat", "IVF", "HNSW") + metric: Métrique de distance ("cosine", "l2", "ip") + nlist: Nombre de clusters pour IVF (auto si None) + nprobe: Nombre de clusters à visiter lors de la recherche IVF + use_gpu: Utiliser GPU si disponible + auto_optimize: Migrer automatiquement vers IVF si >10k embeddings + + Raises: + ImportError: Si FAISS n'est pas installé + """ + if not FAISS_AVAILABLE: + raise ImportError( + "FAISS is required but not installed. " + "Install with: pip install faiss-cpu" + ) + + self.dimensions = dimensions + self.index_type = index_type + self.metric = metric + self.nlist = nlist + self.nprobe = nprobe + self.use_gpu = use_gpu + self.auto_optimize = auto_optimize + + # Mapping ID FAISS -> métadonnées + self.metadata_store: Dict[int, Dict[str, Any]] = {} + + # Compteur pour IDs FAISS + self.next_id = 0 + + # Vecteurs pour entraînement IVF (si nécessaire) + self.training_vectors: List[np.ndarray] = [] + self.is_trained = (index_type == "Flat") # Flat n'a pas besoin d'entraînement + + # Seuil pour migration automatique + self.migration_threshold = 10000 + + # GPU resources + self.gpu_resources = None + if use_gpu: + self._setup_gpu() + + # Créer l'index FAISS (après avoir initialisé tous les attributs) + self.index = self._create_index() + + def _setup_gpu(self): + """Configurer les ressources GPU si disponibles""" + try: + # Vérifier si GPU est disponible + ngpus = faiss.get_num_gpus() + if ngpus > 0: + self.gpu_resources = faiss.StandardGpuResources() + logger.info(f"FAISS GPU enabled: {ngpus} GPU(s) available") + else: + logger.warning("FAISS GPU requested but no GPU available, using CPU") + self.use_gpu = False + except Exception as e: + logger.warning(f"FAISS GPU setup failed: {e}, using CPU") + self.use_gpu = False + + def _calculate_nlist(self, n_vectors: int) -> int: + """ + Calculer le nombre optimal de clusters pour IVF + + Règle empirique: nlist = sqrt(n_vectors) + Minimum: 100, Maximum: 65536 + + Args: + n_vectors: Nombre de vecteurs dans l'index + + Returns: + Nombre optimal de clusters + """ + if self.nlist is not None: + return self.nlist + + # Règle empirique + nlist = int(np.sqrt(n_vectors)) + + # Contraintes + nlist = max(100, min(nlist, 65536)) + + return nlist + + def _create_index(self) -> 'faiss.Index': + """Créer un index FAISS selon la configuration""" + if self.metric == "cosine": + # Pour cosine similarity, normaliser et utiliser inner product + if self.index_type == "Flat": + index = faiss.IndexFlatIP(self.dimensions) + elif self.index_type == "IVF": + # Calculer nlist optimal + nlist = self._calculate_nlist(max(1000, self.migration_threshold)) + quantizer = faiss.IndexFlatIP(self.dimensions) + index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) + # Configurer nprobe + index.nprobe = self.nprobe + # Activer DirectMap pour permettre reconstruct() + index.make_direct_map() + elif self.index_type == "HNSW": + index = faiss.IndexHNSWFlat(self.dimensions, 32) + else: + raise ValueError(f"Unknown index type: {self.index_type}") + + elif self.metric == "l2": + if self.index_type == "Flat": + index = faiss.IndexFlatL2(self.dimensions) + elif self.index_type == "IVF": + # Calculer nlist optimal + nlist = self._calculate_nlist(max(1000, self.migration_threshold)) + quantizer = faiss.IndexFlatL2(self.dimensions) + index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) + # Configurer nprobe + index.nprobe = self.nprobe + # Activer DirectMap pour permettre reconstruct() + index.make_direct_map() + elif self.index_type == "HNSW": + index = faiss.IndexHNSWFlat(self.dimensions, 32) + else: + raise ValueError(f"Unknown index type: {self.index_type}") + + elif self.metric == "ip": # Inner product + if self.index_type == "Flat": + index = faiss.IndexFlatIP(self.dimensions) + else: + raise ValueError(f"Inner product only supports Flat index") + + else: + raise ValueError(f"Unknown metric: {self.metric}") + + # Migrer vers GPU si demandé + if self.use_gpu and self.gpu_resources is not None: + try: + index = faiss.index_cpu_to_gpu(self.gpu_resources, 0, index) + except Exception as e: + logger.warning(f"Failed to move index to GPU: {e}, using CPU") + + return index + + def add_embedding(self, + embedding_id: str, + vector: np.ndarray, + metadata: Optional[Dict[str, Any]] = None) -> int: + """ + Ajouter un embedding à l'index + + Args: + embedding_id: ID unique de l'embedding + vector: Vecteur d'embedding (dimensions doivent correspondre) + metadata: Métadonnées associées (optionnel) + + Returns: + ID FAISS assigné + + Raises: + ValueError: Si dimensions ne correspondent pas + """ + if vector.shape[0] != self.dimensions: + raise ValueError( + f"Vector dimensions mismatch: expected {self.dimensions}, " + f"got {vector.shape[0]}" + ) + + # Convertir en float32 d'abord + vector_float32 = vector.astype(np.float32) + + # Normaliser si métrique cosine + if self.metric == "cosine": + norm = np.linalg.norm(vector_float32) + if norm > 0: + vector_float32 = vector_float32 / norm + + # Reshape pour FAISS + vector_reshaped = vector_float32.reshape(1, -1) + + # Pour IVF, stocker vecteurs pour entraînement si pas encore entraîné + if self.index_type == "IVF" and not self.is_trained: + self.training_vectors.append(vector_float32) # Stocker le vecteur normalisé + + # Entraîner si on a assez de vecteurs + if len(self.training_vectors) >= 100: + self._train_ivf_index() + # Les vecteurs d'entraînement ont déjà été ajoutés dans _train_ivf_index + # Ne pas ajouter à nouveau + elif self.is_trained: + # Ajouter à l'index (seulement si entraîné pour IVF ou si Flat) + self.index.add(vector_reshaped) + + # Stocker métadonnées + faiss_id = self.next_id + self.metadata_store[faiss_id] = { + "embedding_id": embedding_id, + "metadata": metadata or {} + } + + self.next_id += 1 + + # Vérifier si migration automatique nécessaire + if self.auto_optimize and self.index_type == "Flat": + if self.index.ntotal >= self.migration_threshold: + self._migrate_to_ivf() + + return faiss_id + + def _train_ivf_index(self): + """Entraîner l'index IVF avec les vecteurs collectés""" + if self.is_trained or self.index_type != "IVF": + return + + if len(self.training_vectors) < 100: + logger.warning(f" Training IVF with only {len(self.training_vectors)} vectors") + + # Convertir en array numpy + training_data = np.array(self.training_vectors, dtype=np.float32) + + logger.info(f"Training IVF index with {len(self.training_vectors)} vectors...") + + # Entraîner l'index + self.index.train(training_data) + self.is_trained = True + + # Ajouter tous les vecteurs d'entraînement à l'index + self.index.add(training_data) + + # Libérer mémoire + self.training_vectors.clear() + + logger.info(f"IVF index trained successfully with nlist={self.index.nlist}") + + def _migrate_to_ivf(self): + """ + Migrer automatiquement de Flat vers IVF + + Appelé automatiquement quand l'index Flat dépasse le seuil. + """ + if self.index_type != "Flat": + return + + logger.info(f"Migrating from Flat to IVF (current size: {self.index.ntotal})...") + + # Extraire tous les vecteurs de l'index Flat + n_vectors = self.index.ntotal + vectors = np.zeros((n_vectors, self.dimensions), dtype=np.float32) + + for i in range(n_vectors): + vectors[i] = self.index.reconstruct(int(i)) + + # Calculer nlist optimal + nlist = self._calculate_nlist(n_vectors) + + # Créer nouvel index IVF + if self.metric == "cosine": + quantizer = faiss.IndexFlatIP(self.dimensions) + new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) + else: # l2 + quantizer = faiss.IndexFlatL2(self.dimensions) + new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist) + + new_index.nprobe = self.nprobe + new_index.make_direct_map() # Activer DirectMap + + # Entraîner avec tous les vecteurs + new_index.train(vectors) + + # Ajouter tous les vecteurs + new_index.add(vectors) + + # Remplacer l'index + self.index = new_index + self.index_type = "IVF" + self.is_trained = True + + logger.info(f"Migration complete: IVF index with nlist={nlist}, nprobe={self.nprobe}") + + def optimize_index(self): + """ + Optimiser l'index périodiquement + + Pour IVF: Recalculer nlist optimal et réentraîner si nécessaire + """ + if self.index_type != "IVF" or not self.is_trained: + return + + n_vectors = self.index.ntotal + if n_vectors < 100: + return + + # Calculer nlist optimal pour la taille actuelle + optimal_nlist = self._calculate_nlist(n_vectors) + + # Si nlist actuel est très différent, reconstruire + current_nlist = self.index.nlist + if abs(optimal_nlist - current_nlist) / current_nlist > 0.5: + logger.info(f"Optimizing IVF index: {current_nlist} → {optimal_nlist} clusters") + + # Extraire tous les vecteurs + vectors = np.zeros((n_vectors, self.dimensions), dtype=np.float32) + for i in range(n_vectors): + vectors[i] = self.index.reconstruct(int(i)) + + # Créer nouvel index avec nlist optimal + if self.metric == "cosine": + quantizer = faiss.IndexFlatIP(self.dimensions) + new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, optimal_nlist) + else: + quantizer = faiss.IndexFlatL2(self.dimensions) + new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, optimal_nlist) + + new_index.nprobe = self.nprobe + new_index.make_direct_map() # Activer DirectMap + + # Entraîner et ajouter + new_index.train(vectors) + new_index.add(vectors) + + # Remplacer + self.index = new_index + + logger.info("Index optimized successfully") + + def search_similar(self, + query_vector: np.ndarray, + k: int = 5, + min_similarity: Optional[float] = None) -> List[SearchResult]: + """ + Rechercher les k embeddings les plus similaires + + Args: + query_vector: Vecteur de requête + k: Nombre de résultats à retourner + min_similarity: Similarité minimale (optionnel, pour cosine) + + Returns: + Liste de SearchResult triés par similarité décroissante + + Raises: + ValueError: Si dimensions ne correspondent pas + """ + if query_vector.shape[0] != self.dimensions: + raise ValueError( + f"Query vector dimensions mismatch: expected {self.dimensions}, " + f"got {query_vector.shape[0]}" + ) + + if self.index.ntotal == 0: + return [] # Index vide + + # Normaliser si métrique cosine + if self.metric == "cosine": + norm = np.linalg.norm(query_vector) + if norm > 0: + query_vector = query_vector / norm + + # Convertir en float32 et reshape + query_vector = query_vector.astype(np.float32).reshape(1, -1) + + # Rechercher + k = min(k, self.index.ntotal) # Ne pas demander plus que disponible + distances, indices = self.index.search(query_vector, k) + + # Convertir en SearchResults + results = [] + for dist, idx in zip(distances[0], indices[0]): + if idx == -1: # Pas de résultat + continue + + # Récupérer métadonnées + meta = self.metadata_store.get(int(idx), {}) + + # Convertir distance en similarité + if self.metric == "cosine": + # Pour inner product avec vecteurs normalisés, distance = similarité + similarity = float(dist) + elif self.metric == "l2": + # Convertir distance L2 en similarité approximative + similarity = 1.0 / (1.0 + float(dist)) + else: + similarity = float(dist) + + # Filtrer par similarité minimale + if min_similarity is not None and similarity < min_similarity: + continue + + results.append(SearchResult( + embedding_id=meta.get("embedding_id", f"unknown_{idx}"), + similarity=similarity, + distance=float(dist), + metadata=meta.get("metadata", {}) + )) + + return results + + def remove_embedding(self, faiss_id: int) -> bool: + """ + Supprimer un embedding de l'index + + Note: FAISS ne supporte pas la suppression directe. + Cette méthode supprime juste les métadonnées. + Pour vraiment supprimer, il faut reconstruire l'index. + + Args: + faiss_id: ID FAISS de l'embedding + + Returns: + True si supprimé, False si non trouvé + """ + if faiss_id in self.metadata_store: + del self.metadata_store[faiss_id] + return True + return False + + def get_metadata(self, faiss_id: int) -> Optional[Dict[str, Any]]: + """Récupérer les métadonnées d'un embedding""" + return self.metadata_store.get(faiss_id) + + def save(self, index_path: Path, metadata_path: Path) -> None: + """ + Sauvegarder l'index et les métadonnées + + Args: + index_path: Chemin pour sauvegarder l'index FAISS + metadata_path: Chemin pour sauvegarder les métadonnées + """ + # Créer répertoires si nécessaire + index_path.parent.mkdir(parents=True, exist_ok=True) + metadata_path.parent.mkdir(parents=True, exist_ok=True) + + # Si GPU, ramener sur CPU avant sauvegarde + index_to_save = self.index + if self.use_gpu: + try: + index_to_save = faiss.index_gpu_to_cpu(self.index) + except (RuntimeError, AttributeError): + pass # Déjà sur CPU ou pas de GPU + + # Sauvegarder index FAISS + faiss.write_index(index_to_save, str(index_path)) + + # Sauvegarder métadonnées + metadata = { + "dimensions": self.dimensions, + "index_type": self.index_type, + "metric": self.metric, + "next_id": self.next_id, + "metadata_store": self.metadata_store, + "nlist": self.nlist, + "nprobe": self.nprobe, + "is_trained": self.is_trained, + "auto_optimize": self.auto_optimize + } + + with open(metadata_path, 'wb') as f: + pickle.dump(metadata, f) + + @classmethod + def load(cls, index_path: Path, metadata_path: Path, use_gpu: bool = False) -> 'FAISSManager': + """ + Charger un index et ses métadonnées + + Args: + index_path: Chemin de l'index FAISS + metadata_path: Chemin des métadonnées + use_gpu: Charger sur GPU si disponible + + Returns: + FAISSManager chargé + """ + # Charger métadonnées + with open(metadata_path, 'rb') as f: + metadata = pickle.load(f) + + # Créer instance + manager = cls( + dimensions=metadata["dimensions"], + index_type=metadata["index_type"], + metric=metadata["metric"], + nlist=metadata.get("nlist"), + nprobe=metadata.get("nprobe", 8), + use_gpu=use_gpu, + auto_optimize=metadata.get("auto_optimize", True) + ) + + # Charger index FAISS + manager.index = faiss.read_index(str(index_path)) + + # Migrer vers GPU si demandé + if use_gpu and manager.gpu_resources is not None: + try: + manager.index = faiss.index_cpu_to_gpu(manager.gpu_resources, 0, manager.index) + except Exception as e: + logger.warning(f"Failed to move loaded index to GPU: {e}") + + # Restaurer métadonnées + manager.next_id = metadata["next_id"] + manager.metadata_store = metadata["metadata_store"] + manager.is_trained = metadata.get("is_trained", True) + + return manager + + def get_stats(self) -> Dict[str, Any]: + """Récupérer statistiques de l'index""" + stats = { + "dimensions": self.dimensions, + "index_type": self.index_type, + "metric": self.metric, + "total_vectors": self.index.ntotal, + "metadata_count": len(self.metadata_store), + "is_trained": self.is_trained, + "use_gpu": self.use_gpu + } + + # Ajouter stats spécifiques IVF + if self.index_type == "IVF" and self.is_trained: + stats["nlist"] = self.index.nlist + stats["nprobe"] = self.index.nprobe + + # Calculer nlist optimal pour comparaison + if self.index.ntotal > 0: + optimal_nlist = self._calculate_nlist(self.index.ntotal) + stats["optimal_nlist"] = optimal_nlist + stats["nlist_efficiency"] = min(1.0, self.index.nlist / optimal_nlist) + + return stats + + def clear(self) -> None: + """ + Vider complètement l'index + reset état d'entraînement. + + Auteur : Dom, Alice Kiro - 22 décembre 2025 + + Amélioration pour FAISS Rebuild Propre: + - Reset complet de l'état IVF training + - Réinitialisation des training_vectors + - Gestion correcte du flag is_trained selon le type d'index + """ + self.index = self._create_index() + self.metadata_store.clear() + self.next_id = 0 + + # IMPORTANT: reset IVF training state + self.training_vectors.clear() + self.is_trained = (self.index_type == "Flat") + + def reindex(self, items, force_train_ivf: bool = True) -> int: + """ + Reconstruit l'index à partir d'une source canonique (vecteurs). + + Auteur : Dom, Alice Kiro - 22 décembre 2025 + + Stratégie FAISS Rebuild Propre: "1 prototype = 1 entrée" + - Clear complet avant reconstruction + - Ajout sécurisé avec validation des vecteurs + - Force training IVF même pour petits volumes + - Retour du nombre d'éléments indexés + + Args: + items: Iterable[(embedding_id: str, vector: np.ndarray, metadata: dict)] + force_train_ivf: Forcer l'entraînement IVF même avec peu de vecteurs + + Returns: + Nombre d'items indexés avec succès + """ + logger.info(f"FAISS reindex started with force_train_ivf={force_train_ivf}") + + # Clear complet avant reconstruction + self.clear() + + count = 0 + for embedding_id, vector, metadata in items: + if vector is None: + logger.debug(f"Skipping None vector for {embedding_id}") + continue + + try: + self.add_embedding(embedding_id, vector, metadata or {}) + count += 1 + except Exception as e: + logger.warning(f"Failed to add embedding {embedding_id}: {e}") + continue + + # Si IVF + petit volume, add_embedding ne déclenche pas forcément l'entraînement + if (self.index_type == "IVF" and force_train_ivf and + (not self.is_trained) and self.training_vectors): + logger.info(f"Force training IVF with {len(self.training_vectors)} vectors") + self._train_ivf_index() + + logger.info(f"FAISS reindex completed: {count} items indexed") + return count + + def rebuild_index(self) -> None: + """ + Reconstruire l'index depuis les métadonnées + + Utile après suppressions pour compacter l'index. + Note: Nécessite d'avoir les vecteurs originaux. + """ + # TODO: Implémenter si nécessaire + # Nécessiterait de stocker les vecteurs dans metadata_store + raise NotImplementedError("Rebuild not yet implemented") + + +# ============================================================================ +# Fonctions utilitaires +# ============================================================================ + +def create_flat_index(dimensions: int, metric: str = "cosine") -> FAISSManager: + """ + Créer un index FAISS Flat (recherche exhaustive) + + Args: + dimensions: Nombre de dimensions + metric: Métrique ("cosine", "l2", "ip") + + Returns: + FAISSManager configuré + """ + return FAISSManager(dimensions=dimensions, index_type="Flat", metric=metric) + + +def create_ivf_index(dimensions: int, metric: str = "cosine") -> FAISSManager: + """ + Créer un index FAISS IVF (recherche approximative rapide) + + Args: + dimensions: Nombre de dimensions + metric: Métrique ("cosine", "l2") + + Returns: + FAISSManager configuré + """ + return FAISSManager(dimensions=dimensions, index_type="IVF", metric=metric)