Files
Geniusia_v2/geniusia2/core/faiss_index_builder.py
2026-03-05 00:20:25 +01:00

409 lines
14 KiB
Python

"""
Constructeur et reconstructeur d'index FAISS.
Scanne les tâches existantes et reconstruit l'index à partir des signatures sauvegardées.
"""
import json
import pickle
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from .logger import Logger
from .embeddings_manager import EmbeddingsManager
from .models import TaskProfile
class FAISSIndexBuilder:
    """
    Builds and rebuilds the FAISS index from saved task profiles.

    Addresses the critical issue of an empty in-memory index despite 19+
    existing tasks: each task persists its embeddings on disk inside a
    pickled ``signatures.pkl`` file, so the FAISS index can be fully
    reconstructed by scanning the profiles directory and re-adding every
    valid embedding.
    """

    def __init__(
        self,
        embeddings_manager: "EmbeddingsManager",
        logger: "Logger",
        profiles_path: str = "data/user_profiles"
    ):
        """
        Initialize the index builder.

        Args:
            embeddings_manager: Manager owning the embeddings and FAISS index.
            logger: Logger used to trace every operation.
            profiles_path: Path to the directory holding task profiles.
        """
        self.embeddings_manager = embeddings_manager
        self.logger = logger
        self.profiles_path = Path(profiles_path)
        self.logger.log_action({
            "action": "faiss_index_builder_initialized",
            "profiles_path": str(self.profiles_path)
        })

    def scan_tasks(self) -> List[Dict[str, Any]]:
        """
        Scan all task directories to find existing tasks.

        A sub-directory only counts as a task if it contains a
        ``metadata.json`` file; anything else is logged and skipped.

        Returns:
            List of dicts with ``task_id``, ``path`` and ``metadata_file``.
        """
        tasks: List[Dict[str, Any]] = []
        if not self.profiles_path.exists():
            self.logger.log_action({
                "action": "scan_tasks_no_directory",
                "path": str(self.profiles_path)
            })
            return tasks
        # Scan every sub-directory of the profiles path.
        for task_dir in self.profiles_path.iterdir():
            if not task_dir.is_dir():
                continue
            # A metadata.json file is required for a directory to be a task.
            metadata_file = task_dir / "metadata.json"
            if not metadata_file.exists():
                self.logger.log_action({
                    "action": "scan_tasks_no_metadata",
                    "task_dir": str(task_dir)
                })
                continue
            tasks.append({
                "task_id": task_dir.name,
                "path": task_dir,
                "metadata_file": metadata_file
            })
        self.logger.log_action({
            "action": "scan_tasks_completed",
            "tasks_found": len(tasks)
        })
        return tasks

    def load_task_embeddings(self, task_info: Dict[str, Any]) -> Tuple[List[np.ndarray], Dict[str, Any]]:
        """
        Load the embeddings of a single task from its on-disk files.

        Args:
            task_info: Dict with ``task_id``, ``path`` and ``metadata_file``
                (as produced by :meth:`scan_tasks`).

        Returns:
            Tuple ``(embeddings, metadata)``. Both are empty/partial on
            error; failures are logged, never raised.
        """
        task_id = task_info["task_id"]
        task_path = task_info["path"]
        embeddings: List[np.ndarray] = []
        metadata: Dict[str, Any] = {}
        try:
            # Load task metadata (explicit encoding for portability).
            with open(task_info["metadata_file"], 'r', encoding="utf-8") as f:
                metadata = json.load(f)
            # Load the signatures, which carry the embeddings.
            signatures_file = task_path / "signatures.pkl"
            if signatures_file.exists():
                # SECURITY: pickle.load executes arbitrary code if the file
                # is tampered with; only load signatures this app wrote.
                with open(signatures_file, 'rb') as f:
                    signatures = pickle.load(f)
                # Extract embeddings from the signature records.
                for sig in signatures:
                    if isinstance(sig, dict) and "embedding" in sig:
                        emb = sig["embedding"]
                        if emb is not None:
                            # Coerce lists/tuples into float32 arrays.
                            if not isinstance(emb, np.ndarray):
                                emb = np.array(emb, dtype=np.float32)
                            embeddings.append(emb)
                self.logger.log_action({
                    "action": "load_task_embeddings_success",
                    "task_id": task_id,
                    "embeddings_count": len(embeddings)
                })
            else:
                self.logger.log_action({
                    "action": "load_task_embeddings_no_signatures",
                    "task_id": task_id
                })
        except Exception as e:
            # Best-effort load: corrupt files must not abort a full rebuild.
            self.logger.log_action({
                "action": "load_task_embeddings_error",
                "task_id": task_id,
                "error": str(e)
            })
        return embeddings, metadata

    def rebuild_index(self, force: bool = False) -> Dict[str, Any]:
        """
        Rebuild the complete FAISS index from all saved tasks.

        Args:
            force: If True, rebuild even when the index is not empty.

        Returns:
            Rebuild report with statistics (``success``, counts, errors,
            duration, final index size).
        """
        start_time = datetime.now()
        self.logger.log_action({
            "action": "rebuild_index_started",
            "force": force,
            "current_index_size": self.embeddings_manager.faiss_index.ntotal
        })
        # Skip the rebuild when the index already has entries, unless forced.
        if not force and self.embeddings_manager.faiss_index.ntotal > 0:
            self.logger.log_action({
                "action": "rebuild_index_skipped",
                "reason": "index_not_empty"
            })
            return {
                "success": False,
                "reason": "index_not_empty",
                "current_size": self.embeddings_manager.faiss_index.ntotal
            }
        # Discover the tasks on disk.
        tasks = self.scan_tasks()
        if not tasks:
            self.logger.log_action({
                "action": "rebuild_index_no_tasks",
                "reason": "no_tasks_found"
            })
            return {
                "success": False,
                "reason": "no_tasks_found",
                "tasks_scanned": 0
            }
        # Running statistics for the final report.
        stats: Dict[str, Any] = {
            "tasks_scanned": len(tasks),
            "tasks_processed": 0,
            "tasks_failed": 0,
            "embeddings_added": 0,
            "errors": []
        }
        # Process each task independently so one bad task cannot abort all.
        for task_info in tasks:
            task_id = task_info["task_id"]
            try:
                embeddings, metadata = self.load_task_embeddings(task_info)
                if not embeddings:
                    self.logger.log_action({
                        "action": "rebuild_index_task_no_embeddings",
                        "task_id": task_id
                    })
                    stats["tasks_failed"] += 1
                    continue
                # Add every valid embedding of this task to the index.
                for i, embedding in enumerate(embeddings):
                    try:
                        if not self._validate_embedding(embedding):
                            self.logger.log_action({
                                "action": "rebuild_index_invalid_embedding",
                                "task_id": task_id,
                                "embedding_index": i
                            })
                            continue
                        self.embeddings_manager.add_to_index(
                            embedding,
                            {
                                "task_id": task_id,
                                "embedding_index": i,
                                "timestamp": datetime.now().isoformat(),
                                "from_rebuild": True
                            }
                        )
                        stats["embeddings_added"] += 1
                    except Exception as e:
                        # Record per-embedding failures but keep going.
                        self.logger.log_action({
                            "action": "rebuild_index_embedding_error",
                            "task_id": task_id,
                            "embedding_index": i,
                            "error": str(e)
                        })
                        stats["errors"].append({
                            "task_id": task_id,
                            "embedding_index": i,
                            "error": str(e)
                        })
                stats["tasks_processed"] += 1
            except Exception as e:
                # Record per-task failures but keep going.
                self.logger.log_action({
                    "action": "rebuild_index_task_error",
                    "task_id": task_id,
                    "error": str(e)
                })
                stats["tasks_failed"] += 1
                stats["errors"].append({
                    "task_id": task_id,
                    "error": str(e)
                })
        # Persist the rebuilt index; a save failure is reported, not raised.
        try:
            self.embeddings_manager.save_index()
            self.logger.log_action({
                "action": "rebuild_index_saved",
                "embeddings_count": stats["embeddings_added"]
            })
        except Exception as e:
            self.logger.log_action({
                "action": "rebuild_index_save_error",
                "error": str(e)
            })
            stats["errors"].append({
                "error": f"Failed to save index: {str(e)}"
            })
        # Finalize the report.
        duration = (datetime.now() - start_time).total_seconds()
        stats["success"] = stats["embeddings_added"] > 0
        stats["duration_seconds"] = duration
        stats["final_index_size"] = self.embeddings_manager.faiss_index.ntotal
        self.logger.log_action({
            "action": "rebuild_index_completed",
            "stats": stats
        })
        return stats

    def verify_index_integrity(self) -> Dict[str, Any]:
        """
        Check consistency between saved tasks and the FAISS index.

        Returns:
            Verification report with detected inconsistencies; the index is
            considered consistent when it holds at least 90% of the
            embeddings found on disk.
        """
        self.logger.log_action({
            "action": "verify_index_integrity_started"
        })
        tasks = self.scan_tasks()
        # Count the embeddings we expect to find in the index.
        expected_embeddings = 0
        tasks_with_embeddings = 0
        for task_info in tasks:
            embeddings, _ = self.load_task_embeddings(task_info)
            if embeddings:
                expected_embeddings += len(embeddings)
                tasks_with_embeddings += 1
        # Compare with the current index size.
        actual_embeddings = self.embeddings_manager.faiss_index.ntotal
        # 10% tolerance accounts for embeddings skipped during validation.
        is_consistent = (actual_embeddings >= expected_embeddings * 0.9)
        report = {
            "is_consistent": is_consistent,
            "tasks_scanned": len(tasks),
            "tasks_with_embeddings": tasks_with_embeddings,
            "expected_embeddings": expected_embeddings,
            "actual_embeddings": actual_embeddings,
            "missing_embeddings": max(0, expected_embeddings - actual_embeddings),
            "needs_rebuild": not is_consistent
        }
        self.logger.log_action({
            "action": "verify_index_integrity_completed",
            "report": report
        })
        return report

    def _validate_embedding(self, embedding: np.ndarray) -> bool:
        """
        Validate an embedding before adding it to the index.

        Rejects non-arrays, non-1-D arrays, wrong dimensions, NaN/Inf
        values and zero-norm vectors.

        Args:
            embedding: Candidate embedding.

        Returns:
            True if valid, False otherwise.
        """
        try:
            # Must be a numpy array.
            if not isinstance(embedding, np.ndarray):
                return False
            # Must be a flat vector: a 2-D array whose first axis happens to
            # equal embedding_dim would otherwise slip through the dimension
            # check below and corrupt the index.
            if embedding.ndim != 1:
                self.logger.log_action({
                    "action": "validate_embedding_wrong_dimension",
                    "expected": self.embeddings_manager.embedding_dim,
                    "actual": embedding.shape[0] if embedding.ndim > 0 else 0
                })
                return False
            # Dimension must match the index.
            if embedding.shape[0] != self.embeddings_manager.embedding_dim:
                self.logger.log_action({
                    "action": "validate_embedding_wrong_dimension",
                    "expected": self.embeddings_manager.embedding_dim,
                    "actual": embedding.shape[0]
                })
                return False
            # No NaN or Inf values.
            if np.isnan(embedding).any() or np.isinf(embedding).any():
                self.logger.log_action({
                    "action": "validate_embedding_nan_inf"
                })
                return False
            # Norm must be non-zero (zero vectors break similarity search).
            norm = np.linalg.norm(embedding)
            if norm == 0:
                self.logger.log_action({
                    "action": "validate_embedding_zero_norm"
                })
                return False
            return True
        except Exception as e:
            self.logger.log_action({
                "action": "validate_embedding_error",
                "error": str(e)
            })
            return False

    def get_stats(self) -> Dict[str, Any]:
        """
        Return statistics about the current state.

        Returns:
            Dict with task count, index size and profiles path info.
        """
        tasks = self.scan_tasks()
        return {
            "tasks_found": len(tasks),
            "index_size": self.embeddings_manager.faiss_index.ntotal,
            "profiles_path": str(self.profiles_path),
            "profiles_path_exists": self.profiles_path.exists()
        }