fix(faiss): Correct parameter name in factory functions

Fix TypeError in create_flat_index() and create_ivf_index():
- Line 678: dimension= → dimensions=
- Line 692: dimension= → dimensions=

The FAISSManager.__init__() expects 'dimensions' (plural), not 'dimension'.
This bug prevented using the factory functions to create FAISS indexes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-19 08:50:23 +01:00
parent 47b215e639
commit ae1cb72362

View File

@@ -0,0 +1,692 @@
"""
FAISSManager - Gestion d'Index FAISS pour Recherche de Similarité
Gère l'indexation et la recherche rapide d'embeddings avec FAISS.
Supporte sauvegarde/chargement d'index et métadonnées.
"""
import logging
from typing import List, Dict, Optional, Tuple, Any
from pathlib import Path
from dataclasses import dataclass
import numpy as np
import json
import pickle
logger = logging.getLogger(__name__)
try:
import faiss
FAISS_AVAILABLE = True
except ImportError:
FAISS_AVAILABLE = False
logger.warning("FAISS not installed. Install with: pip install faiss-cpu")
@dataclass
class SearchResult:
"""Résultat d'une recherche de similarité"""
embedding_id: str
similarity: float # Similarité cosinus
distance: float # Distance L2
metadata: Dict[str, Any]
class FAISSManager:
"""
Gestionnaire d'index FAISS
Gère l'ajout, la recherche et la persistence d'embeddings avec FAISS.
Maintient un mapping entre IDs FAISS et métadonnées.
Features d'optimisation:
- Migration automatique Flat → IVF pour >10k embeddings
- Entraînement automatique de l'index IVF
- Support GPU si disponible
- Optimisation périodique de l'index
"""
def __init__(self,
dimensions: int,
index_type: str = "Flat",
metric: str = "cosine",
nlist: Optional[int] = None,
nprobe: int = 8,
use_gpu: bool = False,
auto_optimize: bool = True):
"""
Initialiser le gestionnaire FAISS
Args:
dimensions: Nombre de dimensions des vecteurs
index_type: Type d'index FAISS ("Flat", "IVF", "HNSW")
metric: Métrique de distance ("cosine", "l2", "ip")
nlist: Nombre de clusters pour IVF (auto si None)
nprobe: Nombre de clusters à visiter lors de la recherche IVF
use_gpu: Utiliser GPU si disponible
auto_optimize: Migrer automatiquement vers IVF si >10k embeddings
Raises:
ImportError: Si FAISS n'est pas installé
"""
if not FAISS_AVAILABLE:
raise ImportError(
"FAISS is required but not installed. "
"Install with: pip install faiss-cpu"
)
self.dimensions = dimensions
self.index_type = index_type
self.metric = metric
self.nlist = nlist
self.nprobe = nprobe
self.use_gpu = use_gpu
self.auto_optimize = auto_optimize
# Mapping ID FAISS -> métadonnées
self.metadata_store: Dict[int, Dict[str, Any]] = {}
# Compteur pour IDs FAISS
self.next_id = 0
# Vecteurs pour entraînement IVF (si nécessaire)
self.training_vectors: List[np.ndarray] = []
self.is_trained = (index_type == "Flat") # Flat n'a pas besoin d'entraînement
# Seuil pour migration automatique
self.migration_threshold = 10000
# GPU resources
self.gpu_resources = None
if use_gpu:
self._setup_gpu()
# Créer l'index FAISS (après avoir initialisé tous les attributs)
self.index = self._create_index()
def _setup_gpu(self):
"""Configurer les ressources GPU si disponibles"""
try:
# Vérifier si GPU est disponible
ngpus = faiss.get_num_gpus()
if ngpus > 0:
self.gpu_resources = faiss.StandardGpuResources()
logger.info(f"FAISS GPU enabled: {ngpus} GPU(s) available")
else:
logger.warning("FAISS GPU requested but no GPU available, using CPU")
self.use_gpu = False
except Exception as e:
logger.warning(f"FAISS GPU setup failed: {e}, using CPU")
self.use_gpu = False
def _calculate_nlist(self, n_vectors: int) -> int:
"""
Calculer le nombre optimal de clusters pour IVF
Règle empirique: nlist = sqrt(n_vectors)
Minimum: 100, Maximum: 65536
Args:
n_vectors: Nombre de vecteurs dans l'index
Returns:
Nombre optimal de clusters
"""
if self.nlist is not None:
return self.nlist
# Règle empirique
nlist = int(np.sqrt(n_vectors))
# Contraintes
nlist = max(100, min(nlist, 65536))
return nlist
def _create_index(self) -> 'faiss.Index':
"""Créer un index FAISS selon la configuration"""
if self.metric == "cosine":
# Pour cosine similarity, normaliser et utiliser inner product
if self.index_type == "Flat":
index = faiss.IndexFlatIP(self.dimensions)
elif self.index_type == "IVF":
# Calculer nlist optimal
nlist = self._calculate_nlist(max(1000, self.migration_threshold))
quantizer = faiss.IndexFlatIP(self.dimensions)
index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist)
# Configurer nprobe
index.nprobe = self.nprobe
# Activer DirectMap pour permettre reconstruct()
index.make_direct_map()
elif self.index_type == "HNSW":
index = faiss.IndexHNSWFlat(self.dimensions, 32)
else:
raise ValueError(f"Unknown index type: {self.index_type}")
elif self.metric == "l2":
if self.index_type == "Flat":
index = faiss.IndexFlatL2(self.dimensions)
elif self.index_type == "IVF":
# Calculer nlist optimal
nlist = self._calculate_nlist(max(1000, self.migration_threshold))
quantizer = faiss.IndexFlatL2(self.dimensions)
index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist)
# Configurer nprobe
index.nprobe = self.nprobe
# Activer DirectMap pour permettre reconstruct()
index.make_direct_map()
elif self.index_type == "HNSW":
index = faiss.IndexHNSWFlat(self.dimensions, 32)
else:
raise ValueError(f"Unknown index type: {self.index_type}")
elif self.metric == "ip": # Inner product
if self.index_type == "Flat":
index = faiss.IndexFlatIP(self.dimensions)
else:
raise ValueError(f"Inner product only supports Flat index")
else:
raise ValueError(f"Unknown metric: {self.metric}")
# Migrer vers GPU si demandé
if self.use_gpu and self.gpu_resources is not None:
try:
index = faiss.index_cpu_to_gpu(self.gpu_resources, 0, index)
except Exception as e:
logger.warning(f"Failed to move index to GPU: {e}, using CPU")
return index
def add_embedding(self,
embedding_id: str,
vector: np.ndarray,
metadata: Optional[Dict[str, Any]] = None) -> int:
"""
Ajouter un embedding à l'index
Args:
embedding_id: ID unique de l'embedding
vector: Vecteur d'embedding (dimensions doivent correspondre)
metadata: Métadonnées associées (optionnel)
Returns:
ID FAISS assigné
Raises:
ValueError: Si dimensions ne correspondent pas
"""
if vector.shape[0] != self.dimensions:
raise ValueError(
f"Vector dimensions mismatch: expected {self.dimensions}, "
f"got {vector.shape[0]}"
)
# Convertir en float32 d'abord
vector_float32 = vector.astype(np.float32)
# Normaliser si métrique cosine
if self.metric == "cosine":
norm = np.linalg.norm(vector_float32)
if norm > 0:
vector_float32 = vector_float32 / norm
# Reshape pour FAISS
vector_reshaped = vector_float32.reshape(1, -1)
# Pour IVF, stocker vecteurs pour entraînement si pas encore entraîné
if self.index_type == "IVF" and not self.is_trained:
self.training_vectors.append(vector_float32) # Stocker le vecteur normalisé
# Entraîner si on a assez de vecteurs
if len(self.training_vectors) >= 100:
self._train_ivf_index()
# Les vecteurs d'entraînement ont déjà été ajoutés dans _train_ivf_index
# Ne pas ajouter à nouveau
elif self.is_trained:
# Ajouter à l'index (seulement si entraîné pour IVF ou si Flat)
self.index.add(vector_reshaped)
# Stocker métadonnées
faiss_id = self.next_id
self.metadata_store[faiss_id] = {
"embedding_id": embedding_id,
"metadata": metadata or {}
}
self.next_id += 1
# Vérifier si migration automatique nécessaire
if self.auto_optimize and self.index_type == "Flat":
if self.index.ntotal >= self.migration_threshold:
self._migrate_to_ivf()
return faiss_id
def _train_ivf_index(self):
"""Entraîner l'index IVF avec les vecteurs collectés"""
if self.is_trained or self.index_type != "IVF":
return
if len(self.training_vectors) < 100:
logger.warning(f" Training IVF with only {len(self.training_vectors)} vectors")
# Convertir en array numpy
training_data = np.array(self.training_vectors, dtype=np.float32)
logger.info(f"Training IVF index with {len(self.training_vectors)} vectors...")
# Entraîner l'index
self.index.train(training_data)
self.is_trained = True
# Ajouter tous les vecteurs d'entraînement à l'index
self.index.add(training_data)
# Libérer mémoire
self.training_vectors.clear()
logger.info(f"IVF index trained successfully with nlist={self.index.nlist}")
def _migrate_to_ivf(self):
"""
Migrer automatiquement de Flat vers IVF
Appelé automatiquement quand l'index Flat dépasse le seuil.
"""
if self.index_type != "Flat":
return
logger.info(f"Migrating from Flat to IVF (current size: {self.index.ntotal})...")
# Extraire tous les vecteurs de l'index Flat
n_vectors = self.index.ntotal
vectors = np.zeros((n_vectors, self.dimensions), dtype=np.float32)
for i in range(n_vectors):
vectors[i] = self.index.reconstruct(int(i))
# Calculer nlist optimal
nlist = self._calculate_nlist(n_vectors)
# Créer nouvel index IVF
if self.metric == "cosine":
quantizer = faiss.IndexFlatIP(self.dimensions)
new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist)
else: # l2
quantizer = faiss.IndexFlatL2(self.dimensions)
new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, nlist)
new_index.nprobe = self.nprobe
new_index.make_direct_map() # Activer DirectMap
# Entraîner avec tous les vecteurs
new_index.train(vectors)
# Ajouter tous les vecteurs
new_index.add(vectors)
# Remplacer l'index
self.index = new_index
self.index_type = "IVF"
self.is_trained = True
logger.info(f"Migration complete: IVF index with nlist={nlist}, nprobe={self.nprobe}")
def optimize_index(self):
"""
Optimiser l'index périodiquement
Pour IVF: Recalculer nlist optimal et réentraîner si nécessaire
"""
if self.index_type != "IVF" or not self.is_trained:
return
n_vectors = self.index.ntotal
if n_vectors < 100:
return
# Calculer nlist optimal pour la taille actuelle
optimal_nlist = self._calculate_nlist(n_vectors)
# Si nlist actuel est très différent, reconstruire
current_nlist = self.index.nlist
if abs(optimal_nlist - current_nlist) / current_nlist > 0.5:
logger.info(f"Optimizing IVF index: {current_nlist}{optimal_nlist} clusters")
# Extraire tous les vecteurs
vectors = np.zeros((n_vectors, self.dimensions), dtype=np.float32)
for i in range(n_vectors):
vectors[i] = self.index.reconstruct(int(i))
# Créer nouvel index avec nlist optimal
if self.metric == "cosine":
quantizer = faiss.IndexFlatIP(self.dimensions)
new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, optimal_nlist)
else:
quantizer = faiss.IndexFlatL2(self.dimensions)
new_index = faiss.IndexIVFFlat(quantizer, self.dimensions, optimal_nlist)
new_index.nprobe = self.nprobe
new_index.make_direct_map() # Activer DirectMap
# Entraîner et ajouter
new_index.train(vectors)
new_index.add(vectors)
# Remplacer
self.index = new_index
logger.info("Index optimized successfully")
def search_similar(self,
query_vector: np.ndarray,
k: int = 5,
min_similarity: Optional[float] = None) -> List[SearchResult]:
"""
Rechercher les k embeddings les plus similaires
Args:
query_vector: Vecteur de requête
k: Nombre de résultats à retourner
min_similarity: Similarité minimale (optionnel, pour cosine)
Returns:
Liste de SearchResult triés par similarité décroissante
Raises:
ValueError: Si dimensions ne correspondent pas
"""
if query_vector.shape[0] != self.dimensions:
raise ValueError(
f"Query vector dimensions mismatch: expected {self.dimensions}, "
f"got {query_vector.shape[0]}"
)
if self.index.ntotal == 0:
return [] # Index vide
# Normaliser si métrique cosine
if self.metric == "cosine":
norm = np.linalg.norm(query_vector)
if norm > 0:
query_vector = query_vector / norm
# Convertir en float32 et reshape
query_vector = query_vector.astype(np.float32).reshape(1, -1)
# Rechercher
k = min(k, self.index.ntotal) # Ne pas demander plus que disponible
distances, indices = self.index.search(query_vector, k)
# Convertir en SearchResults
results = []
for dist, idx in zip(distances[0], indices[0]):
if idx == -1: # Pas de résultat
continue
# Récupérer métadonnées
meta = self.metadata_store.get(int(idx), {})
# Convertir distance en similarité
if self.metric == "cosine":
# Pour inner product avec vecteurs normalisés, distance = similarité
similarity = float(dist)
elif self.metric == "l2":
# Convertir distance L2 en similarité approximative
similarity = 1.0 / (1.0 + float(dist))
else:
similarity = float(dist)
# Filtrer par similarité minimale
if min_similarity is not None and similarity < min_similarity:
continue
results.append(SearchResult(
embedding_id=meta.get("embedding_id", f"unknown_{idx}"),
similarity=similarity,
distance=float(dist),
metadata=meta.get("metadata", {})
))
return results
def remove_embedding(self, faiss_id: int) -> bool:
"""
Supprimer un embedding de l'index
Note: FAISS ne supporte pas la suppression directe.
Cette méthode supprime juste les métadonnées.
Pour vraiment supprimer, il faut reconstruire l'index.
Args:
faiss_id: ID FAISS de l'embedding
Returns:
True si supprimé, False si non trouvé
"""
if faiss_id in self.metadata_store:
del self.metadata_store[faiss_id]
return True
return False
def get_metadata(self, faiss_id: int) -> Optional[Dict[str, Any]]:
"""Récupérer les métadonnées d'un embedding"""
return self.metadata_store.get(faiss_id)
def save(self, index_path: Path, metadata_path: Path) -> None:
"""
Sauvegarder l'index et les métadonnées
Args:
index_path: Chemin pour sauvegarder l'index FAISS
metadata_path: Chemin pour sauvegarder les métadonnées
"""
# Créer répertoires si nécessaire
index_path.parent.mkdir(parents=True, exist_ok=True)
metadata_path.parent.mkdir(parents=True, exist_ok=True)
# Si GPU, ramener sur CPU avant sauvegarde
index_to_save = self.index
if self.use_gpu:
try:
index_to_save = faiss.index_gpu_to_cpu(self.index)
except (RuntimeError, AttributeError):
pass # Déjà sur CPU ou pas de GPU
# Sauvegarder index FAISS
faiss.write_index(index_to_save, str(index_path))
# Sauvegarder métadonnées
metadata = {
"dimensions": self.dimensions,
"index_type": self.index_type,
"metric": self.metric,
"next_id": self.next_id,
"metadata_store": self.metadata_store,
"nlist": self.nlist,
"nprobe": self.nprobe,
"is_trained": self.is_trained,
"auto_optimize": self.auto_optimize
}
with open(metadata_path, 'wb') as f:
pickle.dump(metadata, f)
@classmethod
def load(cls, index_path: Path, metadata_path: Path, use_gpu: bool = False) -> 'FAISSManager':
"""
Charger un index et ses métadonnées
Args:
index_path: Chemin de l'index FAISS
metadata_path: Chemin des métadonnées
use_gpu: Charger sur GPU si disponible
Returns:
FAISSManager chargé
"""
# Charger métadonnées
with open(metadata_path, 'rb') as f:
metadata = pickle.load(f)
# Créer instance
manager = cls(
dimensions=metadata["dimensions"],
index_type=metadata["index_type"],
metric=metadata["metric"],
nlist=metadata.get("nlist"),
nprobe=metadata.get("nprobe", 8),
use_gpu=use_gpu,
auto_optimize=metadata.get("auto_optimize", True)
)
# Charger index FAISS
manager.index = faiss.read_index(str(index_path))
# Migrer vers GPU si demandé
if use_gpu and manager.gpu_resources is not None:
try:
manager.index = faiss.index_cpu_to_gpu(manager.gpu_resources, 0, manager.index)
except Exception as e:
logger.warning(f"Failed to move loaded index to GPU: {e}")
# Restaurer métadonnées
manager.next_id = metadata["next_id"]
manager.metadata_store = metadata["metadata_store"]
manager.is_trained = metadata.get("is_trained", True)
return manager
def get_stats(self) -> Dict[str, Any]:
"""Récupérer statistiques de l'index"""
stats = {
"dimensions": self.dimensions,
"index_type": self.index_type,
"metric": self.metric,
"total_vectors": self.index.ntotal,
"metadata_count": len(self.metadata_store),
"is_trained": self.is_trained,
"use_gpu": self.use_gpu
}
# Ajouter stats spécifiques IVF
if self.index_type == "IVF" and self.is_trained:
stats["nlist"] = self.index.nlist
stats["nprobe"] = self.index.nprobe
# Calculer nlist optimal pour comparaison
if self.index.ntotal > 0:
optimal_nlist = self._calculate_nlist(self.index.ntotal)
stats["optimal_nlist"] = optimal_nlist
stats["nlist_efficiency"] = min(1.0, self.index.nlist / optimal_nlist)
return stats
def clear(self) -> None:
"""
Vider complètement l'index + reset état d'entraînement.
Auteur : Dom, Alice Kiro - 22 décembre 2025
Amélioration pour FAISS Rebuild Propre:
- Reset complet de l'état IVF training
- Réinitialisation des training_vectors
- Gestion correcte du flag is_trained selon le type d'index
"""
self.index = self._create_index()
self.metadata_store.clear()
self.next_id = 0
# IMPORTANT: reset IVF training state
self.training_vectors.clear()
self.is_trained = (self.index_type == "Flat")
def reindex(self, items, force_train_ivf: bool = True) -> int:
"""
Reconstruit l'index à partir d'une source canonique (vecteurs).
Auteur : Dom, Alice Kiro - 22 décembre 2025
Stratégie FAISS Rebuild Propre: "1 prototype = 1 entrée"
- Clear complet avant reconstruction
- Ajout sécurisé avec validation des vecteurs
- Force training IVF même pour petits volumes
- Retour du nombre d'éléments indexés
Args:
items: Iterable[(embedding_id: str, vector: np.ndarray, metadata: dict)]
force_train_ivf: Forcer l'entraînement IVF même avec peu de vecteurs
Returns:
Nombre d'items indexés avec succès
"""
logger.info(f"FAISS reindex started with force_train_ivf={force_train_ivf}")
# Clear complet avant reconstruction
self.clear()
count = 0
for embedding_id, vector, metadata in items:
if vector is None:
logger.debug(f"Skipping None vector for {embedding_id}")
continue
try:
self.add_embedding(embedding_id, vector, metadata or {})
count += 1
except Exception as e:
logger.warning(f"Failed to add embedding {embedding_id}: {e}")
continue
# Si IVF + petit volume, add_embedding ne déclenche pas forcément l'entraînement
if (self.index_type == "IVF" and force_train_ivf and
(not self.is_trained) and self.training_vectors):
logger.info(f"Force training IVF with {len(self.training_vectors)} vectors")
self._train_ivf_index()
logger.info(f"FAISS reindex completed: {count} items indexed")
return count
def rebuild_index(self) -> None:
"""
Reconstruire l'index depuis les métadonnées
Utile après suppressions pour compacter l'index.
Note: Nécessite d'avoir les vecteurs originaux.
"""
# TODO: Implémenter si nécessaire
# Nécessiterait de stocker les vecteurs dans metadata_store
raise NotImplementedError("Rebuild not yet implemented")
# ============================================================================
# Fonctions utilitaires
# ============================================================================
def create_flat_index(dimensions: int, metric: str = "cosine") -> FAISSManager:
"""
Créer un index FAISS Flat (recherche exhaustive)
Args:
dimensions: Nombre de dimensions
metric: Métrique ("cosine", "l2", "ip")
Returns:
FAISSManager configuré
"""
return FAISSManager(dimensions=dimensions, index_type="Flat", metric=metric)
def create_ivf_index(dimensions: int, metric: str = "cosine") -> FAISSManager:
"""
Créer un index FAISS IVF (recherche approximative rapide)
Args:
dimensions: Nombre de dimensions
metric: Métrique ("cosine", "l2")
Returns:
FAISSManager configuré
"""
return FAISSManager(dimensions=dimensions, index_type="IVF", metric=metric)