# File: Geniusia_v2/geniusia2/core/screen_state_manager.py
# 2026-03-05 00:20:25 +01:00
# 389 lines, 13 KiB, Python

"""
Gestionnaire d'états d'écran pour le système RPA Vision V2.
Gère la création et la persistence des EnrichedScreenState en mode light.
Phase 1 - Mode Light: Compatibilité arrière complète avec le système existant.
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
import numpy as np
import json
from .ui_element_models import (
EnrichedScreenState,
WindowInfo,
RawData,
PerceptionData,
StateEmbedding,
ContextData,
UIElement
)
from .logger import Logger
class ScreenStateManager:
    """
    Manager for enriched screen states.

    In light mode (Phase 1):
    - Creates EnrichedScreenState objects with an empty ``ui_elements`` list
    - Uses only the image embedding (no multi-modal fusion)
    - Keeps backward compatibility with the existing system
    """

    def __init__(
        self,
        logger: "Logger",
        data_dir: str = "data",
        mode: str = "light"
    ):
        """
        Initialise the screen state manager and its on-disk layout.

        Args:
            logger: Logger used for action journaling (must expose ``log_action``)
            data_dir: Root data directory
            mode: Processing mode ("light", "enriched", "complete")
        """
        self.logger = logger
        self.data_dir = Path(data_dir)
        self.mode = mode

        # Directories used by this manager, created eagerly so later
        # save/list calls never have to deal with a missing directory.
        self.screens_dir = self.data_dir / "screens"
        self.embeddings_dir = self.data_dir / "embeddings" / "screens"
        self.states_dir = self.data_dir / "screen_states"
        for directory in (self.screens_dir, self.embeddings_dir, self.states_dir):
            directory.mkdir(parents=True, exist_ok=True)

        self.logger.log_action({
            "action": "screen_state_manager_initialized",
            "mode": self.mode,
            "data_dir": str(self.data_dir)
        })

    def create_screen_state(
        self,
        session_id: str,
        window_title: str,
        app_name: str,
        screenshot_path: str,
        screen_resolution: tuple,
        embedding_provider: str = "openclip_ViT-B-32",
        embedding_vector_id: Optional[str] = None,
        detected_text: Optional[list] = None,
        context_tags: Optional[list] = None,
        workflow_candidate: Optional[str] = None
    ) -> "EnrichedScreenState":
        """
        Create an EnrichedScreenState in light mode.

        The state is only built in memory; nothing is written to disk here.

        Args:
            session_id: Session ID
            window_title: Window title
            app_name: Application name
            screenshot_path: Path to the screenshot
            screen_resolution: Screen resolution (width, height)
            embedding_provider: Embedding provider name
            embedding_vector_id: Embedding vector ID (generated if None)
            detected_text: Detected text (optional)
            context_tags: Context tags (optional)
            workflow_candidate: Candidate workflow (optional)

        Returns:
            The newly created EnrichedScreenState
        """
        # The ID is derived from the local wall-clock time, down to the
        # microsecond.
        timestamp = datetime.now()
        screen_state_id = f"screen_{timestamp.strftime('%Y%m%d_%H%M%S_%f')}"

        # By default the vector ID is the path of the .npy file the embedding
        # would be stored in.
        if embedding_vector_id is None:
            embedding_vector_id = str(self.embeddings_dir / f"{screen_state_id}.npy")

        window = WindowInfo(
            app_name=app_name,
            window_title=window_title,
            screen_resolution=screen_resolution
        )

        perception = PerceptionData(
            detected_text=detected_text or [],
            ocr_results=None
        )

        # Light mode: image embedding only, no per-component breakdown.
        state_embedding = StateEmbedding(
            provider=embedding_provider,
            vector_id=embedding_vector_id,
            components=None
        )

        context = ContextData(
            current_workflow_candidate=workflow_candidate,
            tags=context_tags or [],
            metadata={}
        )

        screen_state = EnrichedScreenState(
            screen_state_id=screen_state_id,
            timestamp=timestamp,
            session_id=session_id,
            window=window,
            raw=RawData(screenshot_path=screenshot_path),
            perception=perception,
            ui_elements=[],  # Empty in light mode
            state_embedding=state_embedding,
            context=context,
            mode=self.mode
        )

        self.logger.log_action({
            "action": "screen_state_created",
            "screen_state_id": screen_state_id,
            "mode": self.mode,
            "session_id": session_id,
            "app_name": app_name
        })
        return screen_state

    def save_screen_state(
        self,
        screen_state: "EnrichedScreenState",
        save_embedding: bool = False,
        embedding_vector: Optional[np.ndarray] = None
    ) -> Path:
        """
        Save an EnrichedScreenState to disk as JSON.

        Args:
            screen_state: Screen state to save
            save_embedding: If True, also save the embedding vector
            embedding_vector: Embedding vector to save (only used when
                save_embedding is True; if it is None no embedding is written)

        Returns:
            Path of the JSON file that was written

        Raises:
            Exception: Any error raised while serialising or writing is logged
                as "screen_state_save_failed" and then re-raised unchanged.
        """
        state_file = self.states_dir / f"{screen_state.screen_state_id}.json"
        # Tracks what actually happened so the log entry is truthful even when
        # save_embedding=True was requested without a vector.
        embedding_saved = False
        try:
            json_str = screen_state.to_json()
            with open(state_file, 'w', encoding='utf-8') as f:
                f.write(json_str)

            if save_embedding and embedding_vector is not None:
                embedding_path = Path(screen_state.state_embedding.vector_id)
                embedding_path.parent.mkdir(parents=True, exist_ok=True)
                # NOTE: np.save appends ".npy" when the path lacks that suffix,
                # so a vector_id without it will not round-trip through
                # load_embedding(), which opens the path verbatim.
                np.save(embedding_path, embedding_vector)
                embedding_saved = True

            self.logger.log_action({
                "action": "screen_state_saved",
                "screen_state_id": screen_state.screen_state_id,
                "file": str(state_file),
                "embedding_saved": embedding_saved
            })
            return state_file
        except Exception as e:
            self.logger.log_action({
                "action": "screen_state_save_failed",
                "screen_state_id": screen_state.screen_state_id,
                "error": str(e)
            })
            raise

    def load_screen_state(self, screen_state_id: str) -> Optional["EnrichedScreenState"]:
        """
        Load an EnrichedScreenState from disk.

        Args:
            screen_state_id: ID of the screen state to load

        Returns:
            The loaded EnrichedScreenState, or None if the file does not exist
            or cannot be read/deserialised (the failure is logged, not raised)
        """
        state_file = self.states_dir / f"{screen_state_id}.json"
        if not state_file.exists():
            self.logger.log_action({
                "action": "screen_state_not_found",
                "screen_state_id": screen_state_id
            })
            return None

        try:
            with open(state_file, 'r', encoding='utf-8') as f:
                json_str = f.read()
            screen_state = EnrichedScreenState.from_json(json_str)
            self.logger.log_action({
                "action": "screen_state_loaded",
                "screen_state_id": screen_state_id,
                "mode": screen_state.mode
            })
            return screen_state
        except Exception as e:
            # Deliberately best-effort: callers get None instead of an error.
            self.logger.log_action({
                "action": "screen_state_load_failed",
                "screen_state_id": screen_state_id,
                "error": str(e)
            })
            return None

    def load_embedding(self, vector_id: str) -> Optional[np.ndarray]:
        """
        Load an embedding vector from disk.

        Args:
            vector_id: Path to the .npy file

        Returns:
            The numpy array, or None if the file does not exist or cannot be
            loaded (the failure is logged, not raised)
        """
        embedding_path = Path(vector_id)
        if not embedding_path.exists():
            self.logger.log_action({
                "action": "embedding_not_found",
                "vector_id": vector_id
            })
            return None

        try:
            return np.load(embedding_path)
        except Exception as e:
            self.logger.log_action({
                "action": "embedding_load_failed",
                "vector_id": vector_id,
                "error": str(e)
            })
            return None

    @staticmethod
    def _read_session_id(state_file: Path) -> Optional[Any]:
        """Return the "session_id" stored in a state file, or None if unreadable."""
        try:
            with open(state_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError):
            # Unreadable, badly encoded or malformed JSON: treat as "no session".
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
            return None
        if not isinstance(data, dict):
            return None
        return data.get("session_id")

    def list_screen_states(
        self,
        session_id: Optional[str] = None,
        limit: Optional[int] = None
    ) -> list:
        """
        List the available screen states.

        IDs are returned in reverse lexicographic order of their file names.
        Files that cannot be read or parsed are silently skipped when filtering
        by session.

        Args:
            session_id: Only keep states belonging to this session (optional;
                a falsy value disables the filter)
            limit: Maximum number of results (optional; None or 0 means no limit)

        Returns:
            List of screen_state_id strings
        """
        state_files = sorted(self.states_dir.glob("*.json"), reverse=True)
        screen_state_ids = []
        for state_file in state_files:
            if limit and len(screen_state_ids) >= limit:
                break
            # Filtering requires opening each file to read its session ID.
            if session_id and self._read_session_id(state_file) != session_id:
                continue
            screen_state_ids.append(state_file.stem)
        return screen_state_ids
if __name__ == "__main__":
    # Manual smoke test: creates, saves, reloads and lists one screen state
    # using scratch directories under the current working directory.
    # Logger is already imported at module level, so no local re-import is
    # needed here.
    import shutil

    print("Test du ScreenStateManager")
    print("=" * 50)

    try:
        # Test logger
        logger = Logger(log_dir="test_logs")

        # Manager under test
        manager = ScreenStateManager(
            logger=logger,
            data_dir="test_data",
            mode="light"
        )

        print("\n1. Test création d'un screen state:")
        screen_state = manager.create_screen_state(
            session_id="test_session_001",
            window_title="Test Window",
            app_name="test_app",
            screenshot_path="test_data/screens/test_001.png",
            screen_resolution=(1920, 1080),
            detected_text=["Test", "Button"],
            context_tags=["test"]
        )
        print(f" Screen State ID: {screen_state.screen_state_id}")
        print(f" Mode: {screen_state.mode}")
        print(f" Session ID: {screen_state.session_id}")
        print(f" UI Elements: {len(screen_state.ui_elements)}")

        print("\n2. Test sauvegarde:")
        # Random test embedding
        test_embedding = np.random.rand(512)
        state_file = manager.save_screen_state(
            screen_state,
            save_embedding=True,
            embedding_vector=test_embedding
        )
        print(f" Saved to: {state_file}")

        print("\n3. Test chargement:")
        loaded_state = manager.load_screen_state(screen_state.screen_state_id)
        # load_screen_state reports failures by returning None; without this
        # check the success banner below would be printed regardless.
        if loaded_state is None:
            raise RuntimeError("screen state could not be reloaded after saving")
        print(f" Loaded screen_state_id: {loaded_state.screen_state_id}")
        print(f" Loaded mode: {loaded_state.mode}")
        print(f" Loaded session_id: {loaded_state.session_id}")

        print("\n4. Test chargement d'embedding:")
        loaded_embedding = manager.load_embedding(screen_state.state_embedding.vector_id)
        if loaded_embedding is None:
            raise RuntimeError("embedding could not be reloaded after saving")
        print(f" Loaded embedding shape: {loaded_embedding.shape}")
        print(f" Embeddings match: {np.allclose(test_embedding, loaded_embedding)}")

        print("\n5. Test listage:")
        state_ids = manager.list_screen_states(session_id="test_session_001")
        print(f" Found {len(state_ids)} screen states")

        print("\n✓ Tous les tests ScreenStateManager réussis!")
    finally:
        # Cleanup runs even when a step above fails, so scratch directories
        # are not left behind.
        for scratch_dir in ("test_data", "test_logs"):
            if Path(scratch_dir).exists():
                shutil.rmtree(scratch_dir)