"""
FAISS index builder and rebuilder.

Scans existing tasks and rebuilds the index from the saved signatures.
"""

import json
import pickle
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime

from .logger import Logger
from .embeddings_manager import EmbeddingsManager
from .models import TaskProfile


class FAISSIndexBuilder:
    """Build and rebuild the FAISS index from saved task profiles.

    Addresses the critical issue of an empty index despite 19+ existing
    tasks: embeddings persisted in each task's ``signatures.pkl`` are
    re-ingested into the manager's FAISS index.
    """

    def __init__(
        self,
        embeddings_manager: "EmbeddingsManager",
        logger: "Logger",
        profiles_path: str = "data/user_profiles"
    ):
        """Initialize the index builder.

        Args:
            embeddings_manager: Embeddings / FAISS index manager. Must
                expose ``faiss_index.ntotal``, ``embedding_dim``,
                ``add_to_index`` and ``save_index``.
            logger: Logger used to trace every operation.
            profiles_path: Directory holding one sub-directory per task.
        """
        self.embeddings_manager = embeddings_manager
        self.logger = logger
        self.profiles_path = Path(profiles_path)

        self.logger.log_action({
            "action": "faiss_index_builder_initialized",
            "profiles_path": str(self.profiles_path)
        })

    def scan_tasks(self) -> List[Dict[str, Any]]:
        """Scan all task sub-directories for existing tasks.

        A directory counts as a task only if it contains a
        ``metadata.json`` file.

        Returns:
            List of dicts with ``task_id``, ``path`` and ``metadata_file``.
            Empty when the profiles directory does not exist.
        """
        tasks: List[Dict[str, Any]] = []

        if not self.profiles_path.exists():
            self.logger.log_action({
                "action": "scan_tasks_no_directory",
                "path": str(self.profiles_path)
            })
            return tasks

        # Walk every entry under the profiles root; non-directories are skipped.
        for task_dir in self.profiles_path.iterdir():
            if not task_dir.is_dir():
                continue

            # A metadata.json file is what qualifies a directory as a task.
            metadata_file = task_dir / "metadata.json"
            if not metadata_file.exists():
                self.logger.log_action({
                    "action": "scan_tasks_no_metadata",
                    "task_dir": str(task_dir)
                })
                continue

            tasks.append({
                "task_id": task_dir.name,
                "path": task_dir,
                "metadata_file": metadata_file
            })

        self.logger.log_action({
            "action": "scan_tasks_completed",
            "tasks_found": len(tasks)
        })

        return tasks

    def load_task_embeddings(self, task_info: Dict[str, Any]) -> Tuple[List[np.ndarray], Dict[str, Any]]:
        """Load a task's embeddings from its on-disk files.

        Args:
            task_info: Dict with ``task_id``, ``path`` and ``metadata_file``
                (as produced by :meth:`scan_tasks`).

        Returns:
            Tuple ``(embeddings, metadata)``. Both may be empty on failure;
            errors are logged, never raised (best-effort loading so one
            corrupt task cannot abort a full rebuild).
        """
        task_id = task_info["task_id"]
        task_path = task_info["path"]

        embeddings: List[np.ndarray] = []
        metadata: Dict[str, Any] = {}

        try:
            # Load the task metadata (explicit encoding: JSON is UTF-8).
            with open(task_info["metadata_file"], 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            # Load the signatures, which carry the embeddings.
            # SECURITY: pickle.load executes arbitrary code on malicious
            # input — only load signature files produced by this system.
            signatures_file = task_path / "signatures.pkl"
            if signatures_file.exists():
                with open(signatures_file, 'rb') as f:
                    signatures = pickle.load(f)

                # Pull the embedding out of each well-formed signature dict.
                for sig in signatures:
                    if isinstance(sig, dict) and "embedding" in sig:
                        emb = sig["embedding"]
                        if emb is not None:
                            # Normalize to a float32 numpy array if needed.
                            if not isinstance(emb, np.ndarray):
                                emb = np.array(emb, dtype=np.float32)
                            embeddings.append(emb)

                self.logger.log_action({
                    "action": "load_task_embeddings_success",
                    "task_id": task_id,
                    "embeddings_count": len(embeddings)
                })
            else:
                self.logger.log_action({
                    "action": "load_task_embeddings_no_signatures",
                    "task_id": task_id
                })

        except Exception as e:
            # Swallow-and-log on purpose: caller treats an empty list as failure.
            self.logger.log_action({
                "action": "load_task_embeddings_error",
                "task_id": task_id,
                "error": str(e)
            })

        return embeddings, metadata

    def rebuild_index(self, force: bool = False) -> Dict[str, Any]:
        """Rebuild the full FAISS index from every saved task.

        NOTE(review): with ``force=True`` the existing index is NOT
        cleared before re-adding, so embeddings already present may be
        duplicated — confirm whether the manager should be reset first.

        Args:
            force: If True, rebuild even when the index is not empty.

        Returns:
            Rebuild report: ``success``, per-task counters, ``errors``,
            ``duration_seconds`` and ``final_index_size`` (or an early
            ``{"success": False, "reason": ...}`` when skipped).
        """
        start_time = datetime.now()

        self.logger.log_action({
            "action": "rebuild_index_started",
            "force": force,
            "current_index_size": self.embeddings_manager.faiss_index.ntotal
        })

        # Skip when the index already holds data, unless explicitly forced.
        if not force and self.embeddings_manager.faiss_index.ntotal > 0:
            self.logger.log_action({
                "action": "rebuild_index_skipped",
                "reason": "index_not_empty"
            })
            return {
                "success": False,
                "reason": "index_not_empty",
                "current_size": self.embeddings_manager.faiss_index.ntotal
            }

        # Discover the tasks saved on disk.
        tasks = self.scan_tasks()

        if not tasks:
            self.logger.log_action({
                "action": "rebuild_index_no_tasks",
                "reason": "no_tasks_found"
            })
            return {
                "success": False,
                "reason": "no_tasks_found",
                "tasks_scanned": 0
            }

        # Rebuild statistics accumulated across all tasks.
        stats: Dict[str, Any] = {
            "tasks_scanned": len(tasks),
            "tasks_processed": 0,
            "tasks_failed": 0,
            "embeddings_added": 0,
            "errors": []
        }

        # Process each task independently so one failure cannot abort the rebuild.
        for task_info in tasks:
            task_id = task_info["task_id"]

            try:
                # Load this task's stored embeddings (metadata is unused here).
                embeddings, _ = self.load_task_embeddings(task_info)

                if not embeddings:
                    self.logger.log_action({
                        "action": "rebuild_index_task_no_embeddings",
                        "task_id": task_id
                    })
                    stats["tasks_failed"] += 1
                    continue

                # Add each embedding to the index, skipping malformed vectors
                # instead of poisoning the index with them.
                for i, embedding in enumerate(embeddings):
                    try:
                        if not self._validate_embedding(embedding):
                            self.logger.log_action({
                                "action": "rebuild_index_invalid_embedding",
                                "task_id": task_id,
                                "embedding_index": i
                            })
                            continue

                        # Attach provenance metadata alongside the vector.
                        self.embeddings_manager.add_to_index(
                            embedding,
                            {
                                "task_id": task_id,
                                "embedding_index": i,
                                "timestamp": datetime.now().isoformat(),
                                "from_rebuild": True
                            }
                        )
                        stats["embeddings_added"] += 1

                    except Exception as e:
                        self.logger.log_action({
                            "action": "rebuild_index_embedding_error",
                            "task_id": task_id,
                            "embedding_index": i,
                            "error": str(e)
                        })
                        stats["errors"].append({
                            "task_id": task_id,
                            "embedding_index": i,
                            "error": str(e)
                        })

                stats["tasks_processed"] += 1

            except Exception as e:
                self.logger.log_action({
                    "action": "rebuild_index_task_error",
                    "task_id": task_id,
                    "error": str(e)
                })
                stats["tasks_failed"] += 1
                stats["errors"].append({
                    "task_id": task_id,
                    "error": str(e)
                })

        # Persist the rebuilt index; a save failure is reported, not raised.
        try:
            self.embeddings_manager.save_index()
            self.logger.log_action({
                "action": "rebuild_index_saved",
                "embeddings_count": stats["embeddings_added"]
            })
        except Exception as e:
            self.logger.log_action({
                "action": "rebuild_index_save_error",
                "error": str(e)
            })
            stats["errors"].append({
                "error": f"Failed to save index: {str(e)}"
            })

        # Finalize the report.
        duration = (datetime.now() - start_time).total_seconds()

        stats["success"] = stats["embeddings_added"] > 0
        stats["duration_seconds"] = duration
        stats["final_index_size"] = self.embeddings_manager.faiss_index.ntotal

        self.logger.log_action({
            "action": "rebuild_index_completed",
            "stats": stats
        })

        return stats

    def verify_index_integrity(self) -> Dict[str, Any]:
        """Check consistency between saved tasks and the FAISS index.

        Returns:
            Verification report with detected inconsistencies and a
            ``needs_rebuild`` recommendation.
        """
        self.logger.log_action({
            "action": "verify_index_integrity_started"
        })

        # Scan the tasks on disk.
        tasks = self.scan_tasks()

        # Count the embeddings the index should contain.
        expected_embeddings = 0
        tasks_with_embeddings = 0

        for task_info in tasks:
            embeddings, _ = self.load_task_embeddings(task_info)
            if embeddings:
                expected_embeddings += len(embeddings)
                tasks_with_embeddings += 1

        # Compare against the live index.
        actual_embeddings = self.embeddings_manager.faiss_index.ntotal

        # 10% tolerance: a few invalid embeddings may legitimately be
        # skipped during a rebuild.
        is_consistent = (actual_embeddings >= expected_embeddings * 0.9)

        report = {
            "is_consistent": is_consistent,
            "tasks_scanned": len(tasks),
            "tasks_with_embeddings": tasks_with_embeddings,
            "expected_embeddings": expected_embeddings,
            "actual_embeddings": actual_embeddings,
            "missing_embeddings": max(0, expected_embeddings - actual_embeddings),
            "needs_rebuild": not is_consistent
        }

        self.logger.log_action({
            "action": "verify_index_integrity_completed",
            "report": report
        })

        return report

    def _validate_embedding(self, embedding: np.ndarray) -> bool:
        """Validate that an embedding is usable by the index.

        Args:
            embedding: Candidate vector.

        Returns:
            True when the embedding is a finite, non-zero 1-D vector of
            the manager's expected dimension; False otherwise.
        """
        try:
            # Must be a numpy array.
            if not isinstance(embedding, np.ndarray):
                return False

            # Must be a 1-D vector of the expected dimension (a 2-D array
            # whose first axis matched the dim used to slip through here).
            if embedding.ndim != 1 or embedding.shape[0] != self.embeddings_manager.embedding_dim:
                self.logger.log_action({
                    "action": "validate_embedding_wrong_dimension",
                    "expected": self.embeddings_manager.embedding_dim,
                    "actual": embedding.shape[0]
                })
                return False

            # Reject NaN / Inf components.
            if np.isnan(embedding).any() or np.isinf(embedding).any():
                self.logger.log_action({
                    "action": "validate_embedding_nan_inf"
                })
                return False

            # Reject the zero vector (cannot be normalized / matched).
            norm = np.linalg.norm(embedding)
            if norm == 0:
                self.logger.log_action({
                    "action": "validate_embedding_zero_norm"
                })
                return False

            return True

        except Exception as e:
            self.logger.log_action({
                "action": "validate_embedding_error",
                "error": str(e)
            })
            return False

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics on the current on-disk / in-index state.

        Returns:
            Dict with task count, index size and profiles-path status.
        """
        tasks = self.scan_tasks()

        return {
            "tasks_found": len(tasks),
            "index_size": self.embeddings_manager.faiss_index.ntotal,
            "profiles_path": str(self.profiles_path),
            "profiles_path_exists": self.profiles_path.exists()
        }
|