Files
rpa_vision_v3/agent_v0/server_v1/live_session_manager.py
Dom ae65be2555 chore: ajouter agent_v0/ au tracking git (était un repo embarqué)
Suppression du .git embarqué dans agent_v0/ — le code est maintenant
tracké normalement dans le repo principal.
Inclut : agent_v1 (client), server_v1 (streaming), lea_ui (chat client)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 11:12:23 +01:00

307 lines
12 KiB
Python

"""
LiveSessionManager — Gestion d'état des sessions de streaming avec persistance disque.
Accumule les événements et screenshots reçus de l'Agent V1 en temps réel.
Persiste les sessions sur disque (JSON) pour survivre aux redémarrages serveur.
Fournit la conversion vers RawSession pour le traitement batch (GraphBuilder).
"""
import json
import logging
import threading
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)


@dataclass
class LiveSessionState:
    """In-memory state of one streaming session.

    ``to_dict`` / ``from_dict`` convert to and from a JSON-compatible dict
    (datetimes as ISO-8601 strings) used for on-disk persistence.
    """

    session_id: str
    machine_id: str = "default"  # machine identifier (multi-machine setups)
    events: List[Dict[str, Any]] = field(default_factory=list)
    shot_paths: Dict[str, str] = field(default_factory=dict)  # shot_id -> file_path
    last_window_info: Dict[str, str] = field(
        default_factory=lambda: {"title": "Unknown", "app_name": "unknown"}
    )
    created_at: datetime = field(default_factory=datetime.now)
    last_activity: datetime = field(default_factory=datetime.now)
    finalized: bool = False
    # Occurrence counters of the window titles / app names seen in events,
    # used to contextualise (name) the session automatically.
    window_titles_seen: Dict[str, int] = field(default_factory=dict)
    app_names_seen: Dict[str, int] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this state (containers are not copied)."""
        return {
            "session_id": self.session_id,
            "machine_id": self.machine_id,
            "events": self.events,
            "shot_paths": self.shot_paths,
            "last_window_info": self.last_window_info,
            "created_at": self.created_at.isoformat(),
            "last_activity": self.last_activity.isoformat(),
            "finalized": self.finalized,
            "window_titles_seen": self.window_titles_seen,
            "app_names_seen": self.app_names_seen,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'LiveSessionState':
        """Rebuild a state from a dict produced by ``to_dict``.

        Only ``session_id`` is required. Missing or ``null`` fields fall back
        to the same defaults as a fresh instance, so a partially written or
        hand-edited file cannot leave ``None`` where a list/dict is expected
        (which would crash on the next event or screenshot).

        Raises:
            KeyError: if ``session_id`` is missing.
            ValueError: if a timestamp is not valid ISO-8601.
        """
        created_raw = data.get("created_at")
        activity_raw = data.get("last_activity")
        return cls(
            session_id=data["session_id"],
            machine_id=data.get("machine_id") or "default",
            events=data.get("events") or [],
            shot_paths=data.get("shot_paths") or {},
            last_window_info=data.get("last_window_info") or {"title": "Unknown", "app_name": "unknown"},
            created_at=datetime.fromisoformat(created_raw) if created_raw else datetime.now(),
            last_activity=datetime.fromisoformat(activity_raw) if activity_raw else datetime.now(),
            finalized=bool(data.get("finalized", False)),
            window_titles_seen=data.get("window_titles_seen") or {},
            app_names_seen=data.get("app_names_seen") or {},
        )
class LiveSessionManager:
    """Manage live streaming sessions in memory, persisted to disk as JSON.

    Every public method takes ``_lock`` (a non-reentrant ``threading.Lock``)
    around its access to the session table; private helpers documented as
    "lock held" expect the caller to already own it. Each session is stored
    as ``<persist_dir>/<session_id>.json``.
    """

    def __init__(self, persist_dir: str = "data/streaming_sessions"):
        self._sessions: Dict[str, LiveSessionState] = {}
        self._lock = threading.Lock()
        self._persist_dir = Path(persist_dir)
        self._persist_dir.mkdir(parents=True, exist_ok=True)
        self._dirty: set = set()  # ids of sessions modified since their last save
        self._persist_counter = 0  # modifications since the last batched save
        self._persist_interval = 10  # save dirty sessions every N modifications
        # Restore the sessions saved by a previous run.
        self._load_persisted_sessions()

    def _session_path(self, session_id: str) -> Optional[Path]:
        """Return ``<persist_dir>/<session_id>.json``, or None if that path is unsafe.

        ``session_id`` is supplied by the remote agent. An id containing a path
        separator (e.g. ``../x``) or an absolute/drive prefix would make the
        path escape ``persist_dir`` and allow writing or deleting arbitrary
        ``.json`` files, so such ids are refused.
        """
        filename = f"{session_id}.json"
        filepath = self._persist_dir / filename
        if filepath.parent != self._persist_dir or filepath.name != filename:
            logger.warning(f"session_id non sûr, persistance ignorée: {session_id!r}")
            return None
        return filepath

    def _load_persisted_sessions(self) -> None:
        """Load the sessions saved on disk into memory (called from ``__init__``)."""
        count = 0
        # NOTE(review): only ``sess_*.json`` files are restored, while
        # _persist_session writes ``<session_id>.json``. A session whose id does
        # not start with "sess_" is saved but never reloaded — confirm that all
        # agent session ids carry this prefix.
        for session_file in sorted(self._persist_dir.glob("sess_*.json")):
            try:
                with open(session_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                session = LiveSessionState.from_dict(data)
            except Exception as e:
                # One corrupt file must not prevent the server from starting.
                logger.warning(f"Impossible de charger la session {session_file.name}: {e}")
                continue
            self._sessions[session.session_id] = session
            count += 1
        if count:
            logger.info(f"{count} session(s) restaurée(s) depuis {self._persist_dir}")

    def _persist_session(self, session_id: str) -> None:
        """Write one session to its JSON file (lock held).

        The JSON is written to a ``.tmp`` sibling and then renamed over the
        target, so a crash of the server mid-write never leaves a truncated
        file (which would be skipped, and the session lost, at next startup).
        Errors are logged, not raised.
        """
        session = self._sessions.get(session_id)
        if session is None:
            return
        filepath = self._session_path(session_id)
        if filepath is None:
            return
        tmp_path = filepath.with_name(filepath.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(session.to_dict(), f, ensure_ascii=False)
            tmp_path.replace(filepath)
        except Exception as e:
            logger.warning(f"Erreur persistance session {session_id}: {e}")
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass  # best-effort cleanup; the real error is already logged

    def _persist_dirty(self) -> None:
        """Save every dirty session, then clear the dirty set (lock held)."""
        for sid in list(self._dirty):
            self._persist_session(sid)
        self._dirty.clear()

    def _maybe_persist(self, session_id: str) -> None:
        """Mark *session_id* dirty; save all dirty sessions every N calls (lock held)."""
        self._dirty.add(session_id)
        self._persist_counter += 1
        if self._persist_counter >= self._persist_interval:
            self._persist_counter = 0
            self._persist_dirty()

    def flush(self) -> None:
        """Force immediate persistence of all dirty sessions."""
        with self._lock:
            self._persist_dirty()

    def register_session(self, session_id: str, machine_id: str = "default") -> LiveSessionState:
        """Register a session (idempotent) and return its state.

        A new session is persisted immediately. For an already known session
        (e.g. re-register after an agent restart) only a non-default
        ``machine_id`` is applied; it is saved with the next batched
        save or ``flush``.
        """
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                session = LiveSessionState(
                    session_id=session_id,
                    machine_id=machine_id,
                )
                self._sessions[session_id] = session
                logger.info(f"Session enregistrée: {session_id} (machine={machine_id})")
                self._persist_session(session_id)
            elif machine_id != "default" and session.machine_id != machine_id:
                session.machine_id = machine_id
                self._dirty.add(session_id)
            return session

    def get_session(self, session_id: str) -> Optional[LiveSessionState]:
        """Return the state of *session_id*, or None if unknown."""
        with self._lock:
            return self._sessions.get(session_id)

    def _get_or_create_locked(self, session_id: str, machine_id: str = "default") -> LiveSessionState:
        """Body of ``get_or_create``, for callers that already hold the lock."""
        session = self._sessions.get(session_id)
        if session is None:
            session = LiveSessionState(
                session_id=session_id,
                machine_id=machine_id,
            )
            self._sessions[session_id] = session
        elif machine_id != "default":
            session.machine_id = machine_id
        return session

    def get_or_create(self, session_id: str, machine_id: str = "default") -> LiveSessionState:
        """Return the state of *session_id*, creating it (not persisted yet) if unknown.

        A non-default ``machine_id`` overwrites the one of an existing session.
        """
        with self._lock:
            return self._get_or_create_locked(session_id, machine_id)

    def add_event(self, session_id: str, event_data: Dict[str, Any]) -> None:
        """Append an event to a session (created on the fly if unknown).

        If the event carries a non-empty ``window`` dict, it becomes the
        session's ``last_window_info`` and its title / app name are counted
        for automatic naming.
        """
        with self._lock:
            # Lookup and mutation under a single lock acquisition, so the
            # session cannot be removed in between (which would drop the event).
            session = self._get_or_create_locked(session_id)
            session.events.append(event_data)
            session.last_activity = datetime.now()
            window = event_data.get("window")
            if isinstance(window, dict) and window:
                session.last_window_info = window
                # ``or ""`` guards against explicit None values in the payload.
                title = str(window.get("title") or "").strip()
                app_name = str(window.get("app_name") or "").strip()
                if title and title != "Unknown":
                    session.window_titles_seen[title] = session.window_titles_seen.get(title, 0) + 1
                if app_name and app_name != "unknown":
                    session.app_names_seen[app_name] = session.app_names_seen.get(app_name, 0) + 1
            self._maybe_persist(session_id)

    def add_screenshot(self, session_id: str, shot_id: str, file_path: str) -> None:
        """Record the file path of screenshot *shot_id* for a session (created if unknown)."""
        with self._lock:
            session = self._get_or_create_locked(session_id)
            session.shot_paths[shot_id] = file_path
            session.last_activity = datetime.now()
            self._maybe_persist(session_id)

    def finalize(self, session_id: str) -> Optional[LiveSessionState]:
        """Mark a session finalized and persist it now; return it (None if unknown)."""
        with self._lock:
            session = self._sessions.get(session_id)
            if session is not None:
                session.finalized = True
                self._persist_session(session_id)
                self._dirty.discard(session_id)  # just written in full
            return session

    def remove_session(self, session_id: str) -> None:
        """Forget a session in memory and delete its persisted JSON file."""
        with self._lock:
            self._sessions.pop(session_id, None)
            self._dirty.discard(session_id)
            filepath = self._session_path(session_id)
            if filepath is not None:
                filepath.unlink(missing_ok=True)

    def to_raw_session(self, session_id: str) -> Optional[dict]:
        """Convert a live session into a RawSession-compatible dict (``rawsession_v1``).

        Returns None if the session is unknown. Each event's window info is
        taken, in order of preference, from top-level ``window_title`` /
        ``app_name`` keys, then the event's ``window`` dict, then the
        session's last known window. Crop screenshots (``_crop`` in the id)
        are excluded.
        """
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                return None
            # Snapshot under the lock: add_event/add_screenshot may run
            # concurrently, and iterating a growing dict raises RuntimeError.
            raw_events = list(session.events)
            shot_paths = dict(session.shot_paths)
            last_window = dict(session.last_window_info)
            machine_id = session.machine_id
            created_at = session.created_at
            sid = session.session_id

        import platform
        import socket

        default_title = last_window.get("title", "")
        default_app = last_window.get("app_name", "unknown")

        # Events in RawSession format.
        events = []
        for evt in raw_events:
            evt_window = evt.get("window")
            if not isinstance(evt_window, dict):
                evt_window = {}
            window_info = {
                "title": evt.get("window_title", evt_window.get("title", default_title)),
                "app_name": evt.get("app_name", evt_window.get("app_name", default_app)),
            }
            events.append({
                "t": evt.get("timestamp", 0),
                "type": evt.get("type", "unknown"),
                "window": window_info,
                "screenshot_id": evt.get("screenshot_id"),
            })

        # Screenshots in RawSession format; only full screenshots are kept for
        # the GraphBuilder. NOTE: the real capture time is not tracked, so
        # "captured_at" is the conversion time.
        now_iso = datetime.now().isoformat()
        screenshots = [
            {
                "screenshot_id": shot_id,
                "relative_path": path,
                "captured_at": now_iso,
            }
            for shot_id, path in sorted(shot_paths.items())
            if "_crop" not in shot_id
        ]

        return {
            "schema_version": "rawsession_v1",
            "session_id": sid,
            "agent_version": "agent_v1_stream",
            # NOTE(review): os/hostname describe the host running this code
            # (the server), and the resolution is hard-coded — confirm intended.
            "environment": {
                "os": platform.system().lower(),
                "hostname": socket.gethostname(),
                "machine_id": machine_id,
                "screen": {"primary_resolution": [1920, 1080]},
            },
            "user": {"id": "remote_agent"},
            "context": {
                "workflow": last_window.get("title", ""),
                "tags": "streaming,agent_v1",
                "machine_id": machine_id,
            },
            "started_at": created_at.isoformat(),
            "ended_at": now_iso,
            "events": events,
            "screenshots": screenshots,
        }

    @property
    def active_session_count(self) -> int:
        """Number of sessions in memory that are not finalized."""
        with self._lock:
            return sum(1 for s in self._sessions.values() if not s.finalized)

    @property
    def session_ids(self) -> List[str]:
        """Ids of all sessions currently in memory."""
        with self._lock:
            return list(self._sessions.keys())

    def get_sessions_by_machine(self, machine_id: str) -> List[LiveSessionState]:
        """Return all in-memory sessions of the given machine."""
        with self._lock:
            return [
                s for s in self._sessions.values()
                if s.machine_id == machine_id
            ]

    def cleanup_old_sessions(self, max_age_hours: int = 24) -> int:
        """Drop from memory the finalized sessions idle for more than *max_age_hours*.

        Files on disk are NOT deleted (RAM only); pending unsaved changes of a
        dropped session are written first, since the file is then the only
        copy. Non-finalized (active) sessions are never dropped.

        Args:
            max_age_hours: maximum idle time, in hours, before cleanup (default 24).

        Returns:
            Number of sessions dropped.
        """
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(hours=max_age_hours)
        with self._lock:
            to_remove = [
                sid for sid, session in self._sessions.items()
                if session.finalized and session.last_activity < cutoff
            ]
            for sid in to_remove:
                if sid in self._dirty:
                    self._persist_session(sid)
                    self._dirty.discard(sid)
                del self._sessions[sid]
        if to_remove:
            logger.info(
                f"Nettoyage mémoire : {len(to_remove)} session(s) finalisée(s) "
                f"supprimée(s) (> {max_age_hours}h) — fichiers conservés sur disque"
            )
        return len(to_remove)

    def get_machine_ids(self) -> List[str]:
        """Return the distinct machine ids of in-memory sessions, sorted."""
        with self._lock:
            return sorted({s.machine_id for s in self._sessions.values()})