#!/usr/bin/env python3
"""
Agent V0 session processing pipeline.

Transforms RawSession → ScreenStates → Embeddings → Workflow.

Steps:
    1. Load the RawSession
    2. Build ScreenStates (one per event that has a screenshot)
    3. Generate embeddings (CLIP)
    4. Index them in FAISS
    5. Detect UI elements (optional, if models are available)
    6. Build the workflow
"""

import logging
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional

import numpy as np

# Make the project root importable when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.models import RawSession
from core.models.screen_state import (
    ScreenState,
    WindowContext as ScreenWindowContext,
    RawLevel,
    PerceptionLevel,
    ContextLevel,
    EmbeddingRef,
)
from core.models.raw_session import WindowContext
from core.persistence import StorageManager

# Optional imports (the heavy ML components may not be installed).
try:
    from core.embedding import CLIPEmbedder, FusionEngine, FAISSManager
    EMBEDDINGS_AVAILABLE = True
except ImportError:
    logging.warning("Modules embedding non disponibles")
    EMBEDDINGS_AVAILABLE = False

try:
    from core.detection import UIDetector
    UI_DETECTION_AVAILABLE = True
except ImportError:
    logging.warning("Module UI detection non disponible")
    UI_DETECTION_AVAILABLE = False

try:
    from core.graph import GraphBuilder
    GRAPH_AVAILABLE = True
except ImportError:
    logging.warning("Module graph non disponible")
    GRAPH_AVAILABLE = False

logger = logging.getLogger("processing_pipeline")


class ProcessingPipeline:
    """
    Processing pipeline for agent sessions.

    Turns raw captured data into the structures consumed by RPA Vision V3
    (screen states, embeddings, workflows). Each optional component
    (embeddings, UI detection, graph building) degrades gracefully when
    its module or model is unavailable.
    """

    def __init__(self, base_path: str = "data/training"):
        """
        Initialize the pipeline.

        Args:
            base_path: Base directory used for storage.
        """
        self.storage = StorageManager(base_path=base_path)
        self.base_path = Path(base_path)

        # Flags describing which optional components are usable.
        self.embeddings_available = EMBEDDINGS_AVAILABLE
        self.ui_detection_available = UI_DETECTION_AVAILABLE
        self.graph_available = GRAPH_AVAILABLE

        # Instantiate each optional component; on failure, clear its flag so
        # the pipeline keeps running in degraded mode instead of crashing.
        if self.embeddings_available:
            try:
                self.clip = CLIPEmbedder()
                self.fusion = FusionEngine()
                self.faiss = FAISSManager(dimension=512)
                logger.info("Embeddings initialisés")
            except Exception as e:
                logger.warning(f"Erreur init embeddings: {e}")
                self.embeddings_available = False

        if self.ui_detection_available:
            try:
                self.ui_detector = UIDetector()
                logger.info("UI Detector initialisé")
            except Exception as e:
                logger.warning(f"Erreur init UI detector: {e}")
                self.ui_detection_available = False

        if self.graph_available:
            try:
                self.graph_builder = GraphBuilder()
                logger.info("Graph Builder initialisé")
            except Exception as e:
                logger.warning(f"Erreur init graph builder: {e}")
                self.graph_available = False

    def process_session(self, session_id: str) -> dict:
        """
        Process a complete session.

        Args:
            session_id: ID of the session to process.

        Returns:
            Dictionary of processing statistics (counts, status, errors).
        """
        logger.info(f"Début traitement session: {session_id}")

        stats = {
            "session_id": session_id,
            "started_at": datetime.now().isoformat(),
            "screen_states_created": 0,
            "embeddings_generated": 0,
            "ui_elements_detected": 0,
            "workflow_created": False,
            "errors": []
        }

        try:
            # 1. Load the RawSession.
            session = self._load_session(session_id)
            logger.info(f"Session chargée: {len(session.events)} events, {len(session.screenshots)} screenshots")

            # 2. Build the ScreenStates.
            screen_states = self._build_screen_states(session, stats)
            logger.info(f"ScreenStates créés: {len(screen_states)}")

            # 3. Generate embeddings.
            if self.embeddings_available:
                self._generate_embeddings(screen_states, session_id, stats)
                logger.info(f"Embeddings générés: {stats['embeddings_generated']}")
            else:
                logger.warning("Embeddings non disponibles, étape sautée")

            # 4. Detect UI elements (optional).
            if self.ui_detection_available:
                self._detect_ui_elements(screen_states, session_id, stats)
                logger.info(f"UI éléments détectés: {stats['ui_elements_detected']}")
            else:
                logger.warning("UI detection non disponible, étape sautée")

            # 5. Build the workflow (optional).
            if self.graph_available and len(screen_states) > 0:
                self._build_workflow(screen_states, session, stats)
                logger.info(f"Workflow créé: {stats['workflow_created']}")
            else:
                logger.warning("Graph builder non disponible ou pas de states, étape sautée")

            stats["completed_at"] = datetime.now().isoformat()
            stats["status"] = "success"

            # 6. Clean raw files after a successful run — but only when
            # screen states were actually created (processed data saved).
            if stats["screen_states_created"] > 0:
                self._cleanup_raw_files(session_id, stats)
                logger.info("Fichiers bruts nettoyés (embeddings, screen_states, workflows conservés)")
            else:
                logger.warning("Aucun screen_state créé, nettoyage annulé pour préserver les données")

            logger.info(f"Traitement terminé: {session_id}")
            return stats

        except Exception as e:
            logger.exception(f"Erreur traitement session {session_id}: {e}")
            stats["status"] = "error"
            stats["errors"].append(str(e))
            stats["completed_at"] = datetime.now().isoformat()
            return stats

    def _cleanup_raw_files(self, session_id: str, stats: dict) -> None:
        """
        Delete raw files after a successful run.

        Removes:
        - the uploaded .enc/.zip file in uploads/
        - the extracted session directory in sessions/ (raw screenshots)

        Processed data (embeddings, workflows) is kept. Failures are logged
        into stats["cleanup_error"] but never propagate — cleanup is
        best-effort by design.
        """
        import shutil

        uploads_dir = self.base_path / "uploads"
        sessions_dir = self.base_path / "sessions"
        cleaned_files = []

        try:
            # Remove the uploaded archive (.enc or .zip).
            for ext in ('.enc', '.zip'):
                upload_file = uploads_dir / f"{session_id}{ext}"
                if upload_file.exists():
                    upload_file.unlink()
                    cleaned_files.append(str(upload_file))
                    logger.info(f"Fichier uploadé supprimé: {upload_file}")

            # Remove the extracted session directory (raw screenshots).
            session_dir = sessions_dir / session_id
            if session_dir.exists() and session_dir.is_dir():
                shutil.rmtree(session_dir)
                cleaned_files.append(str(session_dir))
                logger.info(f"Dossier session supprimé: {session_dir}")

            stats["cleaned_files"] = cleaned_files
            logger.info(f"Nettoyage terminé: {len(cleaned_files)} éléments supprimés")
        except Exception as e:
            logger.warning(f"Erreur lors du nettoyage: {e}")
            stats["cleanup_error"] = str(e)

    def _load_session(self, session_id: str) -> RawSession:
        """Load the RawSession from storage.

        Raises:
            FileNotFoundError: if no JSON file exists for this session.
        """
        # Extracted archives nest one directory level, hence the */*.json glob.
        session_dir = self.base_path / "sessions" / session_id
        json_files = list(session_dir.glob("*/*.json"))
        if not json_files:
            raise FileNotFoundError(f"Aucun fichier JSON trouvé pour session {session_id}")
        return RawSession.load_from_file(json_files[0])

    def _build_screen_states(self, session: RawSession, stats: dict) -> List[ScreenState]:
        """Build and persist ScreenStates from the RawSession events."""
        screen_states = []

        # Index screenshots once instead of linearly scanning the list for
        # every event (was accidentally O(n * m)).
        screenshots_by_id = {s.screenshot_id: s for s in session.screenshots}

        for i, event in enumerate(session.events):
            if not event.screenshot_id:
                continue

            screenshot = screenshots_by_id.get(event.screenshot_id)
            if not screenshot:
                logger.warning(f"Screenshot {event.screenshot_id} non trouvé")
                continue

            try:
                state = self._create_screen_state(event, screenshot, session, i)
                self.storage.save_screen_state(state)
                screen_states.append(state)
                stats["screen_states_created"] += 1
            except Exception as e:
                # One bad event must not abort the whole session.
                logger.error(f"Erreur création ScreenState pour event {i}: {e}")
                stats["errors"].append(f"ScreenState {i}: {str(e)}")

        return screen_states

    def _create_screen_state(
        self,
        event,
        screenshot,
        session: RawSession,
        index: int
    ) -> ScreenState:
        """Create a ScreenState from an event and its screenshot.

        Args:
            event: Raw event; may carry a relative timestamp attribute `t`
                and a `window` that is either a WindowContext or a dict.
            screenshot: Screenshot record matching the event.
            session: Owning RawSession.
            index: Position of the event within the session.
        """
        # Timestamp: session start plus the event's relative offset when present.
        timestamp = session.started_at
        if hasattr(event, 't'):
            timestamp = session.started_at + timedelta(seconds=event.t)

        # Raw level.
        # Extracted sessions are nested one level:
        #   {base_path}/sessions/{session_id}/{session_id}/{relative_path}
        # Built from self.base_path (was hard-coded to "data/training",
        # which broke pipelines constructed with a different base path).
        screenshot_absolute_path = str(
            self.base_path / "sessions" / session.session_id
            / session.session_id / screenshot.relative_path
        )
        raw = RawLevel(
            screenshot_path=screenshot_absolute_path,
            capture_method="agent_v0",
            file_size_bytes=self._get_file_size(session.session_id, screenshot.relative_path)
        )

        # Perception level: placeholder embedding ref, filled later by the
        # embedding step.
        embedding_ref = EmbeddingRef(
            provider="pending",
            vector_id=f"emb_{screenshot.screenshot_id}",
            dimensions=512
        )
        perception = PerceptionLevel(
            embedding=embedding_ref,
            detected_text=[],
            text_detection_method="none",
            confidence_avg=1.0
        )

        # Context level.
        context = ContextLevel(
            user_id=session.user.get("id", "unknown"),
            tags=[session.context.get("training_label", "")],
            business_variables=session.context
        )

        # Window context: event.window may be a WindowContext or a plain dict.
        if isinstance(event.window, WindowContext):
            app_name = event.window.app_name
            window_title = event.window.title
        else:
            app_name = event.window.get("app_name", "unknown")
            window_title = event.window.get("title", "unknown")

        window = ScreenWindowContext(
            app_name=app_name,
            window_title=window_title,
            screen_resolution=session.environment.get("screen", {}).get("primary_resolution", [0, 0])
        )

        return ScreenState(
            screen_state_id=f"state_{session.session_id}_{index:04d}",
            timestamp=timestamp,
            session_id=session.session_id,
            window=window,
            raw=raw,
            perception=perception,
            context=context
        )

    def _get_file_size(self, session_id: str, relative_path: str) -> int:
        """Return the size in bytes of an extracted screenshot (0 if missing)."""
        try:
            # Sessions are extracted with a nested {session_id}/{session_id}/
            # layout (see _load_session's glob); the previous single-level
            # path never existed, so this always returned 0.
            file_path = (
                self.base_path / "sessions" / session_id / session_id / relative_path
            )
            if file_path.exists():
                return file_path.stat().st_size
        except Exception:
            # Best-effort metadata: any I/O error just yields 0.
            pass
        return 0

    def _generate_embeddings(self, screen_states: List[ScreenState], session_id: str, stats: dict):
        """Generate, persist and FAISS-index a CLIP embedding per ScreenState."""
        for state in screen_states:
            try:
                # state.raw.screenshot_path is already a full path (built in
                # _create_screen_state) — do not prepend base_path again.
                # The old code did, producing a doubled, non-existent path,
                # so every embedding was silently skipped.
                screenshot_path = Path(state.raw.screenshot_path)
                if not screenshot_path.exists():
                    logger.warning(f"Screenshot non trouvé: {screenshot_path}")
                    continue

                # Visual embedding via CLIP.
                visual_emb = self.clip.embed_image(str(screenshot_path))

                # For now only the visual embedding is used.
                # TODO: add OCR + text embedding and fuse them.
                final_emb = visual_emb

                emb_id = f"emb_{state.screen_state_id}"
                self.storage.save_embedding(
                    final_emb,
                    embedding_id=emb_id,
                    embedding_type="state",
                    metadata={
                        "session_id": session_id,
                        "screen_state_id": state.screen_state_id,
                        "provider": "openclip"
                    }
                )

                # Index in FAISS for later similarity search.
                self.faiss.add_vector(final_emb, state.screen_state_id)
                stats["embeddings_generated"] += 1

            except Exception as e:
                logger.error(f"Erreur génération embedding pour {state.screen_state_id}: {e}")
                stats["errors"].append(f"Embedding {state.screen_state_id}: {str(e)}")

    def _detect_ui_elements(self, screen_states: List[ScreenState], session_id: str, stats: dict):
        """Detect UI elements in each ScreenState's screenshot."""
        for state in screen_states:
            try:
                # Full path already stored on the state (see _create_screen_state);
                # prepending base_path again would double the prefix.
                screenshot_path = Path(state.raw.screenshot_path)
                if not screenshot_path.exists():
                    continue

                elements = self.ui_detector.detect(str(screenshot_path))
                stats["ui_elements_detected"] += len(elements)

                # TODO: attach the detected elements to the ScreenState.

            except Exception as e:
                logger.error(f"Erreur détection UI pour {state.screen_state_id}: {e}")
                stats["errors"].append(f"UI detection {state.screen_state_id}: {str(e)}")

    def _build_workflow(self, screen_states: List[ScreenState], session: RawSession, stats: dict):
        """Build and persist the workflow graph for the session."""
        try:
            workflow = self.graph_builder.build_from_session(session)
            self.storage.save_workflow(
                workflow,
                workflow_name=session.context.get("training_label", "workflow")
            )
            stats["workflow_created"] = True
        except Exception as e:
            logger.error(f"Erreur construction workflow: {e}")
            stats["errors"].append(f"Workflow: {str(e)}")


# Standalone entry point for use from the API layer.
def process_session_async(session_id: str, base_path: str = "data/training") -> dict:
    """
    Process a session (intended to be scheduled as a background task).

    Args:
        session_id: Session ID.
        base_path: Base storage directory.

    Returns:
        Processing statistics.
    """
    pipeline = ProcessingPipeline(base_path=base_path)
    return pipeline.process_session(session_id)


if __name__ == "__main__":
    # Manual pipeline test from the command line.
    logging.basicConfig(level=logging.INFO)

    if len(sys.argv) < 2:
        print("Usage: python processing_pipeline.py <session_id>")
        sys.exit(1)

    session_id = sys.argv[1]
    print(f"Traitement de la session: {session_id}")
    stats = process_session_async(session_id)

    print("\nRésultats:")
    print(f"  ScreenStates créés: {stats['screen_states_created']}")
    print(f"  Embeddings générés: {stats['embeddings_generated']}")
    print(f"  UI éléments détectés: {stats['ui_elements_detected']}")
    print(f"  Workflow créé: {stats['workflow_created']}")

    if stats["errors"]:
        print(f"\nErreurs ({len(stats['errors'])}):")
        for error in stats["errors"]:
            print(f"  - {error}")