#!/usr/bin/env python3 """ Script utilitaire FAISS Rebuild Propre Auteur : Dom, Alice Kiro - 22 décembre 2025 Script pour déclencher un rebuild complet de l'index FAISS depuis les prototypes stockés dans les workflows. Utilise la stratégie "clear + reindex complet". Usage: python3 rebuild_faiss_simple.py [options] Options: --dry-run Afficher ce qui serait fait sans exécuter --verbose Affichage détaillé --index-type Type d'index FAISS (Flat, IVF) [défaut: Flat] --data-dir Répertoire de données [défaut: data] --help Afficher cette aide """ import sys import argparse import logging from pathlib import Path from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import json # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def setup_logging(verbose: bool = False): """Configurer le niveau de logging""" level = logging.DEBUG if verbose else logging.INFO logging.getLogger().setLevel(level) def load_workflows_from_directory(workflows_dir: Path) -> List[Dict[str, Any]]: """ Charger tous les workflows depuis un répertoire. Args: workflows_dir: Répertoire contenant les fichiers .json de workflows Returns: Liste des workflows chargés avec métadonnées """ workflows = [] if not workflows_dir.exists(): logger.warning(f"Répertoire workflows non trouvé: {workflows_dir}") return workflows for workflow_file in workflows_dir.glob("*.json"): try: with open(workflow_file, 'r', encoding='utf-8') as f: workflow_data = json.load(f) workflows.append({ "file_path": workflow_file, "workflow_id": workflow_data.get("workflow_id", workflow_file.stem), "name": workflow_data.get("name", "Unknown"), "nodes_count": len(workflow_data.get("nodes", [])), "data": workflow_data }) logger.debug(f"Chargé workflow: {workflow_data.get('name', 'Unknown')} ({len(workflow_data.get('nodes', []))} nodes)") except Exception as e: logger.error(f"Erreur chargement workflow {workflow_file}: {e}") continue logger.info(f"Chargé {len(workflows)} workflows depuis {workflows_dir}") return workflows def extract_prototypes_from_workflows(workflows: List[Dict[str, Any]]) -> List[Tuple[str, Any, Dict[str, Any]]]: """ Extraire tous les prototypes de vecteurs depuis les workflows. Args: workflows: Liste des workflows chargés Returns: Liste de tuples (embedding_id, vector, metadata) """ import numpy as np prototypes = [] for workflow in workflows: workflow_id = workflow["workflow_id"] workflow_name = workflow["name"] nodes = workflow["data"].get("nodes", []) logger.debug(f"Extraction prototypes workflow {workflow_name} ({len(nodes)} nodes)") for node in nodes: node_id = node.get("node_id", "unknown") node_name = node.get("name", "") # Essayer différents formats de stockage de prototypes vector = None # Format v1: template.embedding_prototype (liste) template = node.get("template") if template and isinstance(template, dict): embedding_prototype = template.get("embedding_prototype") if isinstance(embedding_prototype, list): try: vector = np.array(embedding_prototype, dtype=np.float32) logger.debug(f"Prototype v1 trouvé pour {node_id}: {len(vector)} dimensions") except Exception as e: logger.debug(f"Erreur conversion prototype v1 {node_id}: {e}") # Format v2: template.embedding.vector_id (fichier) if vector is None: embedding = template.get("embedding") if embedding and isinstance(embedding, dict): vector_id = embedding.get("vector_id") if vector_id and Path(vector_id).exists(): try: vector = np.load(vector_id).astype(np.float32) logger.debug(f"Prototype v2 trouvé pour {node_id}: {len(vector)} dimensions") except Exception as e: logger.debug(f"Erreur chargement prototype v2 {node_id}: {e}") # Format legacy: screen_template.embedding_prototype_path if vector is None: screen_template = node.get("screen_template") if screen_template and isinstance(screen_template, dict): prototype_path = screen_template.get("embedding_prototype_path") if prototype_path and Path(prototype_path).exists(): try: vector = np.load(prototype_path).astype(np.float32) logger.debug(f"Prototype legacy trouvé pour {node_id}: {len(vector)} dimensions") except Exception as e: logger.debug(f"Erreur chargement prototype legacy {node_id}: {e}") # Ajouter à la liste si vecteur trouvé if vector is not None: prototypes.append(( node_id, vector, { "workflow_id": workflow_id, "workflow_name": workflow_name, "node_id": node_id, "node_name": node_name, "vector_dimensions": len(vector) } )) else: logger.debug(f"Aucun prototype trouvé pour node {node_id} (workflow {workflow_name})") logger.info(f"Extrait {len(prototypes)} prototypes depuis {len(workflows)} workflows") return prototypes def rebuild_faiss_index( prototypes: List[Tuple[str, Any, Dict[str, Any]]], index_type: str = "Flat", dimensions: Optional[int] = None, dry_run: bool = False ) -> Dict[str, Any]: """ Reconstruire l'index FAISS avec les prototypes. Args: prototypes: Liste des prototypes à indexer index_type: Type d'index FAISS dimensions: Nombre de dimensions (auto-détecté si None) dry_run: Mode simulation Returns: Résultats du rebuild """ if not prototypes: return { "success": False, "message": "Aucun prototype à indexer", "count": 0 } # Auto-détecter dimensions if dimensions is None: first_vector = prototypes[0][1] dimensions = len(first_vector) logger.info(f"Dimensions auto-détectées: {dimensions}") # Vérifier cohérence des dimensions for embedding_id, vector, metadata in prototypes: if len(vector) != dimensions: logger.warning(f"Dimension incohérente pour {embedding_id}: {len(vector)} != {dimensions}") if dry_run: logger.info("=== MODE DRY-RUN ===") logger.info(f"Créerait index FAISS {index_type} avec {dimensions} dimensions") logger.info(f"Indexerait {len(prototypes)} prototypes:") for embedding_id, vector, metadata in prototypes[:5]: # Afficher les 5 premiers logger.info(f" - {embedding_id}: {metadata.get('workflow_name', 'Unknown')} / {metadata.get('node_name', 'Unknown')}") if len(prototypes) > 5: logger.info(f" ... et {len(prototypes) - 5} autres") return { "success": True, "message": "Simulation réussie", "count": len(prototypes), "dry_run": True } # Rebuild réel try: from core.embedding.faiss_manager import FAISSManager logger.info(f"Création index FAISS {index_type} avec {dimensions} dimensions") manager = FAISSManager( dimensions=dimensions, index_type=index_type, metric="cosine" ) logger.info(f"Rebuild FAISS avec {len(prototypes)} prototypes...") start_time = datetime.now() count = manager.reindex(prototypes, force_train_ivf=True) duration = (datetime.now() - start_time).total_seconds() logger.info(f"Rebuild terminé: {count} prototypes indexés en {duration:.2f}s") # Statistiques finales stats = manager.get_stats() logger.info(f"Index final: {stats['total_vectors']} vecteurs, trained={stats['is_trained']}") return { "success": True, "message": f"Rebuild réussi: {count} prototypes indexés", "count": count, "duration_seconds": duration, "stats": stats } except ImportError as e: return { "success": False, "message": f"FAISS non disponible: {e}", "count": 0 } except Exception as e: logger.error(f"Erreur rebuild FAISS: {e}", exc_info=True) return { "success": False, "message": f"Erreur rebuild: {e}", "count": 0 } def main(): """Point d'entrée principal""" parser = argparse.ArgumentParser( description="Script utilitaire FAISS Rebuild Propre", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Exemples: python3 rebuild_faiss_simple.py --dry-run python3 rebuild_faiss_simple.py --verbose --index-type IVF python3 rebuild_faiss_simple.py --data-dir /path/to/data """ ) parser.add_argument( "--dry-run", action="store_true", help="Mode simulation - afficher ce qui serait fait sans exécuter" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Affichage détaillé" ) parser.add_argument( "--index-type", choices=["Flat", "IVF"], default="Flat", help="Type d'index FAISS (défaut: Flat)" ) parser.add_argument( "--data-dir", type=Path, default=Path("data"), help="Répertoire de données (défaut: data)" ) args = parser.parse_args() # Configuration setup_logging(args.verbose) logger.info("🔧 FAISS Rebuild Propre - Script utilitaire") logger.info("=" * 60) logger.info(f"Mode: {'DRY-RUN' if args.dry_run else 'EXECUTION'}") logger.info(f"Index type: {args.index_type}") logger.info(f"Data dir: {args.data_dir}") # Étape 1: Charger workflows logger.info("\n1. Chargement des workflows...") workflows_dir = args.data_dir / "workflows" workflows = load_workflows_from_directory(workflows_dir) if not workflows: logger.error("Aucun workflow trouvé. Vérifiez le répertoire de données.") return 1 # Étape 2: Extraire prototypes logger.info("\n2. Extraction des prototypes...") prototypes = extract_prototypes_from_workflows(workflows) if not prototypes: logger.error("Aucun prototype trouvé dans les workflows.") return 1 # Étape 3: Rebuild FAISS logger.info("\n3. Rebuild index FAISS...") result = rebuild_faiss_index( prototypes=prototypes, index_type=args.index_type, dry_run=args.dry_run ) # Résultats logger.info("\n" + "=" * 60) if result["success"]: logger.info(f"✅ {result['message']}") if not args.dry_run: logger.info(f"📊 Statistiques: {result.get('stats', {})}") else: logger.error(f"❌ {result['message']}") return 1 logger.info("🎉 FAISS Rebuild Propre terminé avec succès") return 0 if __name__ == "__main__": sys.exit(main())