"""
Screen-state manager for the RPA Vision V2 system.

Handles creation and persistence of EnrichedScreenState objects in light mode.

Phase 1 - Light mode: full backward compatibility with the existing system.
"""

import json
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List

import numpy as np

from .ui_element_models import (
    EnrichedScreenState,
    WindowInfo,
    RawData,
    PerceptionData,
    StateEmbedding,
    ContextData,
    UIElement
)
from .logger import Logger


class ScreenStateManager:
    """
    Manager for enriched screen states.

    In light mode (Phase 1):
    - Creates EnrichedScreenState objects with an empty ui_elements list
    - Uses only the image embedding (no multi-modal fusion)
    - Keeps backward compatibility with the existing system
    """

    def __init__(
        self,
        logger: Logger,
        data_dir: str = "data",
        mode: str = "light"
    ):
        """
        Initialise the screen-state manager.

        Creates the ``screens``, ``embeddings/screens`` and ``screen_states``
        sub-directories under ``data_dir`` if they do not exist yet, then logs
        the initialisation.

        Args:
            logger: Logger used for action journaling
            data_dir: Root data directory
            mode: Processing mode ("light", "enriched", "complete")
        """
        self.logger = logger
        self.data_dir = Path(data_dir)
        self.mode = mode

        # Make sure every storage directory exists before anything is written
        self.screens_dir = self.data_dir / "screens"
        self.embeddings_dir = self.data_dir / "embeddings" / "screens"
        self.states_dir = self.data_dir / "screen_states"

        self.screens_dir.mkdir(parents=True, exist_ok=True)
        self.embeddings_dir.mkdir(parents=True, exist_ok=True)
        self.states_dir.mkdir(parents=True, exist_ok=True)

        self.logger.log_action({
            "action": "screen_state_manager_initialized",
            "mode": self.mode,
            "data_dir": str(self.data_dir)
        })

    def create_screen_state(
        self,
        session_id: str,
        window_title: str,
        app_name: str,
        screenshot_path: str,
        screen_resolution: tuple,
        embedding_provider: str = "openclip_ViT-B-32",
        embedding_vector_id: Optional[str] = None,
        detected_text: Optional[list] = None,
        context_tags: Optional[list] = None,
        workflow_candidate: Optional[str] = None
    ) -> EnrichedScreenState:
        """
        Create an EnrichedScreenState in light mode.

        The state is only built in memory and logged; nothing is written to
        disk here (see ``save_screen_state``).

        Args:
            session_id: Session ID
            window_title: Window title
            app_name: Application name
            screenshot_path: Path to the screenshot
            screen_resolution: Screen resolution (width, height)
            embedding_provider: Embedding provider name
            embedding_vector_id: Embedding vector ID; when None, a ``.npy``
                path inside the embeddings directory is generated
            detected_text: Detected text (optional)
            context_tags: Context tags (optional)
            workflow_candidate: Candidate workflow (optional)

        Returns:
            The newly created EnrichedScreenState
        """
        # The ID is derived from the creation time, down to the microsecond
        timestamp = datetime.now()
        screen_state_id = f"screen_{timestamp.strftime('%Y%m%d_%H%M%S_%f')}"

        # Default the vector ID to a file path next to the other embeddings
        if embedding_vector_id is None:
            embedding_vector_id = str(self.embeddings_dir / f"{screen_state_id}.npy")

        window = WindowInfo(
            app_name=app_name,
            window_title=window_title,
            screen_resolution=screen_resolution
        )

        perception = PerceptionData(
            detected_text=detected_text or [],
            ocr_results=None
        )

        # Light mode: image embedding only, no per-component breakdown
        state_embedding = StateEmbedding(
            provider=embedding_provider,
            vector_id=embedding_vector_id,
            components=None
        )

        context = ContextData(
            current_workflow_candidate=workflow_candidate,
            tags=context_tags or [],
            metadata={}
        )

        screen_state = EnrichedScreenState(
            screen_state_id=screen_state_id,
            timestamp=timestamp,
            session_id=session_id,
            window=window,
            raw=RawData(screenshot_path=screenshot_path),
            perception=perception,
            ui_elements=[],  # Always empty in light mode
            state_embedding=state_embedding,
            context=context,
            mode=self.mode
        )

        self.logger.log_action({
            "action": "screen_state_created",
            "screen_state_id": screen_state_id,
            "mode": self.mode,
            "session_id": session_id,
            "app_name": app_name
        })

        return screen_state

    def save_screen_state(
        self,
        screen_state: EnrichedScreenState,
        save_embedding: bool = False,
        embedding_vector: Optional[np.ndarray] = None
    ) -> Path:
        """
        Save an EnrichedScreenState to disk as JSON.

        Args:
            screen_state: Screen state to save
            save_embedding: If True, also save the embedding vector
            embedding_vector: Embedding vector to save; it is written only
                when ``save_embedding`` is True and this is not None

        Returns:
            Path of the JSON file that was written

        Raises:
            Exception: Any error raised while serialising or writing is
                logged as "screen_state_save_failed" and re-raised unchanged.
        """
        state_file = self.states_dir / f"{screen_state.screen_state_id}.json"

        try:
            json_str = screen_state.to_json()

            with open(state_file, 'w', encoding='utf-8') as f:
                f.write(json_str)

            # Track what was really written so the log entry below is truthful
            # even when save_embedding=True was passed without a vector.
            embedding_written = False
            if save_embedding and embedding_vector is not None:
                embedding_path = Path(screen_state.state_embedding.vector_id)
                embedding_path.parent.mkdir(parents=True, exist_ok=True)
                np.save(embedding_path, embedding_vector)
                embedding_written = True

            self.logger.log_action({
                "action": "screen_state_saved",
                "screen_state_id": screen_state.screen_state_id,
                "file": str(state_file),
                "embedding_saved": embedding_written
            })

            return state_file

        except Exception as e:
            self.logger.log_action({
                "action": "screen_state_save_failed",
                "screen_state_id": screen_state.screen_state_id,
                "error": str(e)
            })
            raise

    def load_screen_state(self, screen_state_id: str) -> Optional[EnrichedScreenState]:
        """
        Load an EnrichedScreenState from disk.

        Args:
            screen_state_id: ID of the screen state to load

        Returns:
            The loaded EnrichedScreenState, or None when the file does not
            exist or cannot be read/deserialised (the failure is logged).
        """
        state_file = self.states_dir / f"{screen_state_id}.json"

        if not state_file.exists():
            self.logger.log_action({
                "action": "screen_state_not_found",
                "screen_state_id": screen_state_id
            })
            return None

        try:
            with open(state_file, 'r', encoding='utf-8') as f:
                json_str = f.read()

            screen_state = EnrichedScreenState.from_json(json_str)

            self.logger.log_action({
                "action": "screen_state_loaded",
                "screen_state_id": screen_state_id,
                "mode": screen_state.mode
            })

            return screen_state

        except Exception as e:
            # Best-effort load: report the problem and let the caller see None
            self.logger.log_action({
                "action": "screen_state_load_failed",
                "screen_state_id": screen_state_id,
                "error": str(e)
            })
            return None

    def load_embedding(self, vector_id: str) -> Optional[np.ndarray]:
        """
        Load an embedding vector from disk.

        Args:
            vector_id: Path to the .npy file. If that exact path does not
                exist and does not end with ".npy", ``<vector_id>.npy`` is
                tried as well.

        Returns:
            The numpy array, or None when the file is missing or cannot be
            loaded (the failure is logged).
        """
        embedding_path = Path(vector_id)

        # np.save() appends ".npy" to a path that lacks it, so a vector saved
        # under a suffix-less ID actually lives at "<id>.npy".
        if (
            not embedding_path.exists()
            and embedding_path.name
            and not str(embedding_path).endswith(".npy")
        ):
            suffixed_path = embedding_path.with_name(embedding_path.name + ".npy")
            if suffixed_path.exists():
                embedding_path = suffixed_path

        if not embedding_path.exists():
            self.logger.log_action({
                "action": "embedding_not_found",
                "vector_id": vector_id
            })
            return None

        try:
            return np.load(embedding_path)
        except Exception as e:
            self.logger.log_action({
                "action": "embedding_load_failed",
                "vector_id": vector_id,
                "error": str(e)
            })
            return None

    def list_screen_states(
        self,
        session_id: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[str]:
        """
        List the available screen states.

        State files are visited in reverse lexicographic order of their file
        names.

        Args:
            session_id: Only keep states of this session (optional; a falsy
                value disables the filter)
            limit: Maximum number of results (optional; a falsy value,
                including 0, means no limit)

        Returns:
            List of screen_state_id values
        """
        state_files = sorted(self.states_dir.glob("*.json"), reverse=True)

        screen_state_ids = []
        for state_file in state_files:
            if limit and len(screen_state_ids) >= limit:
                break

            if not session_id:
                screen_state_ids.append(state_file.stem)
                continue

            # Filtering by session requires reading the file's session_id.
            try:
                with open(state_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError):
                # Unreadable or malformed file (JSON and Unicode decode errors
                # are ValueError subclasses): skip it, keep listing the rest.
                continue

            # A JSON document that is not an object cannot carry a session_id
            if isinstance(data, dict) and data.get("session_id") == session_id:
                screen_state_ids.append(state_file.stem)

        return screen_state_ids


if __name__ == "__main__":
    # Basic smoke test; run it as a module (python -m ...) because this file
    # uses relative imports.
    import shutil

    print("Test du ScreenStateManager")
    print("=" * 50)

    try:
        # Test logger
        logger = Logger(log_dir="test_logs")

        manager = ScreenStateManager(
            logger=logger,
            data_dir="test_data",
            mode="light"
        )

        print("\n1. Test création d'un screen state:")
        screen_state = manager.create_screen_state(
            session_id="test_session_001",
            window_title="Test Window",
            app_name="test_app",
            screenshot_path="test_data/screens/test_001.png",
            screen_resolution=(1920, 1080),
            detected_text=["Test", "Button"],
            context_tags=["test"]
        )
        print(f" Screen State ID: {screen_state.screen_state_id}")
        print(f" Mode: {screen_state.mode}")
        print(f" Session ID: {screen_state.session_id}")
        print(f" UI Elements: {len(screen_state.ui_elements)}")

        print("\n2. Test sauvegarde:")
        # Random test embedding
        test_embedding = np.random.rand(512)
        state_file = manager.save_screen_state(
            screen_state,
            save_embedding=True,
            embedding_vector=test_embedding
        )
        print(f" Saved to: {state_file}")

        print("\n3. Test chargement:")
        loaded_state = manager.load_screen_state(screen_state.screen_state_id)
        if loaded_state:
            print(f" Loaded screen_state_id: {loaded_state.screen_state_id}")
            print(f" Loaded mode: {loaded_state.mode}")
            print(f" Loaded session_id: {loaded_state.session_id}")

        print("\n4. Test chargement d'embedding:")
        loaded_embedding = manager.load_embedding(screen_state.state_embedding.vector_id)
        if loaded_embedding is not None:
            print(f" Loaded embedding shape: {loaded_embedding.shape}")
            print(f" Embeddings match: {np.allclose(test_embedding, loaded_embedding)}")

        print("\n5. Test listage:")
        state_ids = manager.list_screen_states(session_id="test_session_001")
        print(f" Found {len(state_ids)} screen states")

        print("\n✓ Tous les tests ScreenStateManager réussis!")
    finally:
        # Always remove the scratch directories, even when a step above failed
        for scratch_dir in ("test_data", "test_logs"):
            if Path(scratch_dir).exists():
                shutil.rmtree(scratch_dir)