"""LiveSessionManager — state management for streaming sessions with disk persistence.

Accumulates the events and screenshots received from Agent V1 in real time.
Persists sessions to disk (JSON) so they survive server restarts.
Provides the conversion to a RawSession-compatible dict for batch processing
(GraphBuilder).
"""

import json
import logging
import platform
import socket
import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Graphical-environment metadata keys copied from a "window_title"-style event
# into the session's last_window_info.
_ENV_META_KEYS = (
    "dpi_scale",
    "monitor_index",
    "window_bounds",
    "monitors",
    "os_theme",
    "os_language",
)

# Event keys that to_raw_session() either maps explicitly (type, timestamp,
# window, screenshot_id) or deliberately leaves out of the raw event.
_RAW_EVENT_SKIP_KEYS = frozenset({
    "type",
    "timestamp",
    "window",
    "window_title",
    "app_name",
    "screenshot_id",
    "machine_id",
    "screen_metadata",
    "vision_info",
})


def _default_window_info() -> Dict[str, Any]:
    """Return a fresh placeholder window context."""
    return {"title": "Unknown", "app_name": "unknown"}


def _clean_text(value: Any) -> str:
    """Return *value* stripped when it is a string, otherwise an empty string.

    Agents may send ``null`` or non-string values for titles / app names;
    treating them as "absent" avoids an AttributeError on ``.strip()``.
    """
    return value.strip() if isinstance(value, str) else ""


@dataclass
class LiveSessionState:
    """In-memory state of one live session."""

    session_id: str
    machine_id: str = "default"  # Machine identifier (multi-machine setups)
    events: List[Dict[str, Any]] = field(default_factory=list)
    shot_paths: Dict[str, str] = field(default_factory=dict)  # shot_id -> file_path
    # Besides title/app_name this dict may also carry screen_resolution and
    # the environment metadata listed in _ENV_META_KEYS.
    last_window_info: Dict[str, Any] = field(default_factory=_default_window_info)
    created_at: datetime = field(default_factory=datetime.now)
    last_activity: datetime = field(default_factory=datetime.now)
    finalized: bool = False
    # Occurrence counters of the window titles / app names seen so far
    # (used for automatic contextualisation / naming).
    window_titles_seen: Dict[str, int] = field(default_factory=dict)
    app_names_seen: Dict[str, int] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; datetimes become ISO-8601 strings.

        The containers are returned by reference (no deep copy).
        """
        return {
            "session_id": self.session_id,
            "machine_id": self.machine_id,
            "events": self.events,
            "shot_paths": self.shot_paths,
            "last_window_info": self.last_window_info,
            "created_at": self.created_at.isoformat(),
            "last_activity": self.last_activity.isoformat(),
            "finalized": self.finalized,
            "window_titles_seen": self.window_titles_seen,
            "app_names_seen": self.app_names_seen,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'LiveSessionState':
        """Rebuild a state from the dict produced by :meth:`to_dict`.

        Only ``session_id`` is mandatory; every other key falls back to the
        same default a brand-new session would get.

        Raises:
            KeyError: if ``session_id`` is missing.
            ValueError: if a stored timestamp is not valid ISO-8601.
        """
        created_raw = data.get("created_at")
        activity_raw = data.get("last_activity")
        return cls(
            session_id=data["session_id"],
            machine_id=data.get("machine_id", "default"),
            events=data.get("events", []),
            shot_paths=data.get("shot_paths", {}),
            last_window_info=data.get("last_window_info", _default_window_info()),
            created_at=datetime.fromisoformat(created_raw) if created_raw else datetime.now(),
            last_activity=datetime.fromisoformat(activity_raw) if activity_raw else datetime.now(),
            finalized=data.get("finalized", False),
            window_titles_seen=data.get("window_titles_seen", {}),
            app_names_seen=data.get("app_names_seen", {}),
        )


class LiveSessionManager:
    """Manage live sessions in memory on the server side, with disk persistence."""

    def __init__(self, persist_dir: str = "data/streaming_sessions",
                 live_sessions_dir: Optional[str] = None):
        """Create the manager and restore whatever is already on disk.

        Args:
            persist_dir: Directory holding one ``<session_id>.json`` state file
                per session (created if missing).
            live_sessions_dir: Optional directory of live sessions (JSONL +
                screenshots) scanned to rediscover sessions that have no
                state file.
        """
        self._sessions: Dict[str, LiveSessionState] = {}
        self._lock = threading.Lock()
        self._persist_dir = Path(persist_dir)
        self._persist_dir.mkdir(parents=True, exist_ok=True)
        self._dirty: set = set()  # Sessions modified since the last save
        self._persist_counter = 0  # Modification counter throttling persistence
        self._persist_interval = 10  # Persist every N modifications
        # Live sessions directory (JSONL + screenshots)
        self._live_sessions_dir = Path(live_sessions_dir) if live_sessions_dir else None
        # Load the persisted sessions at startup
        self._load_persisted_sessions()
        # Rebuild sessions from the live_events.jsonl files found on disk
        self._discover_sessions_from_disk()

    def _load_persisted_sessions(self):
        """Load the saved sessions at startup (JSON state files).

        A file that cannot be read or parsed is logged and skipped so one
        corrupt file never prevents the others from loading.
        """
        count = 0
        for session_file in sorted(self._persist_dir.glob("sess_*.json")):
            try:
                with open(session_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                session = LiveSessionState.from_dict(data)
                self._sessions[session.session_id] = session
                count += 1
            except Exception as e:
                logger.warning(f"Impossible de charger la session {session_file.name}: {e}")
        if count:
            logger.info(f"{count} session(s) restaurée(s) depuis {self._persist_dir}")

    def _discover_sessions_from_disk(self):
        """Discover sessions from the live_events.jsonl files on disk.

        Rebuilds the sessions missing from the manager by scanning:
          - live_sessions/sess_*/live_events.jsonl (root sessions)
          - live_sessions/{machine_id}/sess_*/live_events.jsonl (multi-machine)

        Sessions already loaded from the persisted JSON are left untouched.
        Discovered sessions carry no events; their ``shot_paths`` only holds
        one placeholder entry (empty path) per full screenshot found.
        """
        live_dir = self._live_sessions_dir
        if live_dir is None or not live_dir.exists():
            return

        discovered = 0
        for jsonl_file in sorted(live_dir.glob("**/live_events.jsonl")):
            session_dir = jsonl_file.parent
            session_id = session_dir.name
            if not session_id.startswith("sess_") or session_id in self._sessions:
                continue

            # Infer the machine id from the parent directory. Compare paths,
            # not names, so a machine folder that happens to be named like
            # the live directory is not mistaken for the root.
            if session_dir.parent == live_dir:
                machine_id = "default"
            else:
                machine_id = session_dir.parent.name

            # Count events and screenshots
            events_count = 0
            try:
                with open(jsonl_file, 'r', encoding='utf-8') as f:
                    events_count = sum(1 for _ in f)
            except (OSError, UnicodeError) as e:
                # Best effort: the session is still registered, but the
                # unreadable file is reported instead of silently ignored.
                logger.warning(f"Impossible de lire {jsonl_file}: {e}")

            shots_dir = session_dir / "shots"
            shots_count = (
                sum(1 for _ in shots_dir.glob("shot_*_full.png"))
                if shots_dir.exists() else 0
            )

            # Create the in-memory session
            session = LiveSessionState(
                session_id=session_id,
                machine_id=machine_id,
                finalized=False,
            )
            # Placeholder entries so the number of shots is reflected
            session.shot_paths = {f"shot_{i:04d}": "" for i in range(shots_count)}
            self._sessions[session_id] = session
            discovered += 1
            logger.debug(
                f"Session découverte: {session_id} (machine={machine_id}, "
                f"events={events_count}, shots={shots_count})"
            )

        if discovered:
            logger.info(
                f"{discovered} session(s) découverte(s) depuis {live_dir} "
                f"(total: {len(self._sessions)} sessions en mémoire)"
            )

    def _persist_session(self, session_id: str):
        """Save one session to disk. The caller must hold ``self._lock``.

        The JSON is written to a temporary sibling file and then moved over
        the target, so a failure (disk error, non-serialisable event, crash)
        never truncates the previously saved state. Errors are logged, not
        raised.
        """
        session = self._sessions.get(session_id)
        if session is None:
            return
        filepath = self._persist_dir / f"{session_id}.json"
        # ".json.tmp" is not matched by the "sess_*.json" glob used at startup.
        tmp_path = filepath.with_name(filepath.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(session.to_dict(), f, ensure_ascii=False)
            tmp_path.replace(filepath)
        except Exception as e:
            logger.warning(f"Erreur persistance session {session_id}: {e}")
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError as cleanup_err:
                logger.debug(f"Nettoyage impossible de {tmp_path}: {cleanup_err}")

    def _maybe_persist(self, session_id: str):
        """Mark the session dirty and persist all dirty sessions every N calls.

        The caller must hold ``self._lock``.
        """
        self._dirty.add(session_id)
        self._persist_counter += 1
        if self._persist_counter >= self._persist_interval:
            self._persist_counter = 0
            for sid in list(self._dirty):
                self._persist_session(sid)
            self._dirty.clear()

    def flush(self):
        """Force persistence of every dirty session."""
        with self._lock:
            for sid in list(self._dirty):
                self._persist_session(sid)
            self._dirty.clear()

    def _get_or_create_unlocked(self, session_id: str,
                                machine_id: str = "default") -> LiveSessionState:
        """Return the session, creating it if needed. Caller holds the lock."""
        session = self._sessions.get(session_id)
        if session is None:
            session = LiveSessionState(session_id=session_id, machine_id=machine_id)
            self._sessions[session_id] = session
        elif machine_id != "default":
            session.machine_id = machine_id
        return session

    def register_session(self, session_id: str, machine_id: str = "default") -> LiveSessionState:
        """Register a session, persisting it immediately when newly created.

        For an existing session only the machine id is updated, and only when
        a non-default one is given (re-register after an agent restart).
        """
        with self._lock:
            if session_id not in self._sessions:
                self._sessions[session_id] = LiveSessionState(
                    session_id=session_id,
                    machine_id=machine_id,
                )
                logger.info(f"Session enregistrée: {session_id} (machine={machine_id})")
                self._persist_session(session_id)
            elif machine_id != "default":
                self._sessions[session_id].machine_id = machine_id
            return self._sessions[session_id]

    def get_session(self, session_id: str) -> Optional[LiveSessionState]:
        """Return the session with this id, or None if unknown."""
        with self._lock:
            return self._sessions.get(session_id)

    def get_or_create(self, session_id: str, machine_id: str = "default") -> LiveSessionState:
        """Return the session, creating it in memory (not persisted) if needed.

        A non-default ``machine_id`` overrides the one of an existing session.
        """
        with self._lock:
            return self._get_or_create_unlocked(session_id, machine_id)

    @staticmethod
    def _update_window_context(session: LiveSessionState, event_data: Dict[str, Any]) -> None:
        """Refresh ``session.last_window_info`` from one event.

        Supported shapes:
          - Format 1: {"window": {"title": ..., "app_name": ...}} (Python agent)
          - Format 2: {"window_title": "...", "screen_resolution": [w, h]} (Rust agent)
        An optional "window_capture" dict then overrides title / app_name.
        """
        window = event_data.get("window")
        if window and isinstance(window, dict):
            # Copy: the event itself is stored in session.events and must not
            # be rewritten when last_window_info is enriched afterwards.
            session.last_window_info = dict(window)
        elif event_data.get("window_title"):
            # Rust agent format: take the title, keep the last known app name
            info: Dict[str, Any] = {
                "title": event_data["window_title"],
                "app_name": session.last_window_info.get("app_name", "unknown"),
            }
            # Propagate the resolution when the agent provides it
            screen_res = event_data.get("screen_resolution")
            if screen_res and isinstance(screen_res, list) and len(screen_res) == 2:
                info["screen_resolution"] = screen_res
            # Propagate the graphical-environment metadata
            for meta_key in _ENV_META_KEYS:
                meta_val = event_data.get(meta_key)
                if meta_val is not None:
                    info[meta_key] = meta_val
            session.last_window_info = info

        # window_capture (sent along with the window capture) carries the
        # precise title of the clicked window.
        window_capture = event_data.get("window_capture")
        if window_capture and isinstance(window_capture, dict):
            wc_title = _clean_text(window_capture.get("title"))
            wc_app = _clean_text(window_capture.get("app_name"))
            if wc_title:
                session.last_window_info["title"] = wc_title
            if wc_app:
                session.last_window_info["app_name"] = wc_app

    @staticmethod
    def _count_window_context(session: LiveSessionState) -> None:
        """Increment the seen-counters for the current title / app name."""
        title = _clean_text(session.last_window_info.get("title"))
        app_name = _clean_text(session.last_window_info.get("app_name"))
        if title and title != "Unknown":
            session.window_titles_seen[title] = session.window_titles_seen.get(title, 0) + 1
        if app_name and app_name != "unknown":
            session.app_names_seen[app_name] = session.app_names_seen.get(app_name, 0) + 1

    def add_event(self, session_id: str, event_data: Dict[str, Any]) -> None:
        """Append an event to the session (created on demand).

        Also updates the last activity time, the window context and the
        title / app counters, then triggers throttled persistence.
        """
        # Lookup and mutation share one critical section so the session
        # cannot be removed in between.
        with self._lock:
            session = self._get_or_create_unlocked(session_id)
            session.events.append(event_data)
            session.last_activity = datetime.now()
            self._update_window_context(session, event_data)
            self._count_window_context(session)
            self._maybe_persist(session_id)

    def add_screenshot(self, session_id: str, shot_id: str, file_path: str) -> None:
        """Record the file path of a screenshot for the session (created on demand)."""
        with self._lock:
            session = self._get_or_create_unlocked(session_id)
            session.shot_paths[shot_id] = file_path
            session.last_activity = datetime.now()
            self._maybe_persist(session_id)

    def finalize(self, session_id: str) -> Optional[LiveSessionState]:
        """Mark the session finalized and persist it; return it (None if unknown)."""
        with self._lock:
            session = self._sessions.get(session_id)
            if session:
                session.finalized = True
                self._persist_session(session_id)
            return session

    def remove_session(self, session_id: str) -> None:
        """Drop the session from memory and delete its persisted state file."""
        with self._lock:
            self._sessions.pop(session_id, None)
            self._dirty.discard(session_id)
            # Also delete the persisted file
            filepath = self._persist_dir / f"{session_id}.json"
            filepath.unlink(missing_ok=True)

    @staticmethod
    def _build_raw_event(evt: Dict[str, Any], last_window_info: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one stored event to the RawSession event layout."""
        fallback_title = last_window_info.get("title", "")
        fallback_app = last_window_info.get("app_name", "unknown")

        # Window info can come in several shapes
        window_raw = evt.get("window")
        if isinstance(window_raw, dict):
            window_info = {
                "title": window_raw.get("title", fallback_title),
                "app_name": window_raw.get("app_name", fallback_app),
            }
        else:
            window_info = {
                "title": evt.get("window_title", fallback_title),
                "app_name": evt.get("app_name", fallback_app),
            }

        raw_event = {
            "t": evt.get("timestamp", 0),
            "type": evt.get("type", "unknown"),
            "window": window_info,
            "screenshot_id": evt.get("screenshot_id"),
        }
        # Copy the type-specific data (pos, button, text, keys, ...) —
        # required for replay. Keys already set above are never overwritten.
        for key, value in evt.items():
            if key not in _RAW_EVENT_SKIP_KEYS and key not in raw_event:
                raw_event[key] = value
        return raw_event

    def to_raw_session(self, session_id: str) -> Optional[dict]:
        """Convert a live session to a RawSession-compatible dict.

        Returns:
            The RawSession dict, or None when the session is unknown.
        """
        # Snapshot under the lock: concurrent add_event / add_screenshot calls
        # mutate these containers, and iterating a dict that changes size
        # raises RuntimeError.
        with self._lock:
            session = self._sessions.get(session_id)
            if not session:
                return None
            stored_events = list(session.events)
            shot_paths = dict(session.shot_paths)
            last_window_info = dict(session.last_window_info)
            machine_id = session.machine_id
            created_at = session.created_at
            raw_session_id = session.session_id

        now_iso = datetime.now().isoformat()

        # Build the events in RawSession format.
        # Important: ALL event data (pos, text, keys, button...) is copied,
        # because Event.from_dict() puts everything except
        # t/type/window/screenshot_id into event.data, and the GraphBuilder
        # uses event.data to build the actions.
        events = [self._build_raw_event(evt, last_window_info) for evt in stored_events]

        # Build the screenshots in RawSession format; only full screenshots
        # are kept for the GraphBuilder.
        screenshots = [
            {
                "screenshot_id": shot_id,
                "relative_path": path,
                "captured_at": now_iso,
            }
            for shot_id, path in sorted(shot_paths.items())
            if "_crop" not in shot_id
        ]

        # Real resolution from the events (sent by the Rust/Python agent),
        # falling back to 1920x1080 when unavailable.
        screen_info: Dict[str, Any] = {
            "primary_resolution": last_window_info.get("screen_resolution", [1920, 1080]),
        }
        # Dynamic graphical-environment metadata
        for key in ("dpi_scale", "monitors", "monitor_index"):
            value = last_window_info.get(key)
            if value is not None:
                screen_info[key] = value

        env_info: Dict[str, Any] = {
            "os": platform.system().lower(),
            "hostname": socket.gethostname(),
            "machine_id": machine_id,
            "screen": screen_info,
        }
        # Propagate os_theme / os_language when available
        for key in ("os_theme", "os_language"):
            value = last_window_info.get(key)
            if value is not None:
                env_info[key] = value

        return {
            "schema_version": "rawsession_v1",
            "session_id": raw_session_id,
            "agent_version": "agent_v1_stream",
            "environment": env_info,
            "user": {"id": "remote_agent"},
            "context": {
                "workflow": last_window_info.get("title", ""),
                "tags": "streaming,agent_v1",
                "machine_id": machine_id,
            },
            "started_at": created_at.isoformat(),
            "ended_at": now_iso,
            "events": events,
            "screenshots": screenshots,
        }

    @property
    def active_session_count(self) -> int:
        """Number of sessions that are not finalized."""
        with self._lock:
            return sum(1 for s in self._sessions.values() if not s.finalized)

    @property
    def session_ids(self) -> List[str]:
        """Ids of all sessions currently held in memory."""
        with self._lock:
            return list(self._sessions.keys())

    def get_sessions_by_machine(self, machine_id: str) -> List[LiveSessionState]:
        """Return every session belonging to the given machine."""
        with self._lock:
            return [
                s for s in self._sessions.values()
                if s.machine_id == machine_id
            ]

    def cleanup_old_sessions(self, max_age_hours: int = 24) -> int:
        """Drop from memory the finalized sessions older than max_age_hours.

        Files on disk are NOT deleted (RAM only). Non-finalized (active)
        sessions are never cleaned up.

        Args:
            max_age_hours: Maximum age in hours before cleanup (default: 24h)

        Returns:
            Number of sessions cleaned up
        """
        cutoff = datetime.now() - timedelta(hours=max_age_hours)
        with self._lock:
            to_remove = [
                sid for sid, session in self._sessions.items()
                if session.finalized and session.last_activity < cutoff
            ]
            for sid in to_remove:
                del self._sessions[sid]
                self._dirty.discard(sid)

        if to_remove:
            logger.info(
                f"Nettoyage mémoire : {len(to_remove)} session(s) finalisée(s) "
                f"supprimée(s) (> {max_age_hours}h) — fichiers conservés sur disque"
            )
        return len(to_remove)

    def get_machine_ids(self) -> List[str]:
        """Return the unique machine identifiers, in a deterministic order."""
        with self._lock:
            # key=str keeps the sort safe even if a restored state file
            # carried a non-string machine id.
            return sorted({s.machine_id for s in self._sessions.values()}, key=str)