Files
rpa_vision_v3/tools/session_cleaner.py
Dom 4f61741420
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 14s
security-audit / pip-audit (CVE dépendances) (push) Successful in 10s
security-audit / Scan secrets (grep) (push) Successful in 8s
tests / Lint (ruff + black) (push) Successful in 13s
tests / Tests unitaires (sans GPU) (push) Failing after 14s
tests / Tests sécurité (critique) (push) Has been skipped
feat: journée 17 avril — tests E2E validés, dashboard fleet+audit, VWB bridge, cleaner C2
Pipeline E2E complet validé :
  Capture VM → streaming → serveur → cleaner → replay → audit trail
  Mode apprentissage supervisé fonctionne (Léa échoue → humain → reprise)

Dashboard :
  - Cleanup 14→10 onglets (RCE supprimée)
  - Fleet : enregistrer/révoquer agents, tokens, ZIP pré-configuré téléchargeable
  - Audit trail MVP (/audit) : filtres, tableau, export CSV, conformité AI Act/RGPD
  - Formulaire Fleet simplifié (nom + email, machine_id auto)

VWB bridge Léa→VWB :
  - Compound décomposés en N steps (saisie + raccourci visibles)
  - Layout serpentin 3 colonnes (plus colonne verticale)
  - Badge OS 🪟/🐧, filtre OS retiré (admin Linux voit Windows)
  - Fix import SQLite readonly

Cleaner intelligent :
  - Descriptions lisibles (UIA/C2) + détection doublons
  - Logique C2 : UIElement identifié = jamais parasite
  - Patterns parasites resserrés
  - Message Léa : "Je n'y arrive pas, montrez-moi comment faire"

Config agent (INC-1 à INC-7) :
  - SERVER_URL + SERVER_BASE unifiés
  - RPA_OLLAMA_HOST séparé
  - allow_redirects=False sur POST
  - Middleware réécriture URL serveur

CI Gitea : fix token + Flask-SocketIO + ruff propre
Fleet endpoints : /agents/enroll|uninstall|fleet + agent_registry SQLite
Backup : script quotidien workflows.db + audit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-17 17:46:40 +02:00

1631 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Session Cleaner -- Outil leger de nettoyage de sessions avant replay.
Petit serveur Flask standalone qui permet de :
- Lister les sessions enregistrees recentes
- Visualiser chaque session avec ses screenshots (crop + full)
- Marquer les clics parasites a supprimer (auto-detection des toasts,
clics droit, fenetres Lea/systray, derniers 3 evenements)
- Re-construire un replay nettoye et l'injecter dans la queue
Option A du rapport audit VWB.
Port : 5006
"""
import json
import logging
import math
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from flask import (
Flask,
redirect,
render_template_string,
request,
send_from_directory,
url_for,
)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
STREAMING_SERVER = os.environ.get("RPA_STREAMING_SERVER", "http://localhost:5005")
LIVE_SESSIONS_DIR = os.environ.get(
"RPA_LIVE_SESSIONS_DIR",
os.path.join(os.path.dirname(__file__), "..", "data", "training", "live_sessions"),
)
PORT = int(os.environ.get("SESSION_CLEANER_PORT", "5006"))
# Charger le token API depuis l'environnement ou .env.local
API_TOKEN = os.environ.get("RPA_API_TOKEN", "")
if not API_TOKEN:
env_local = os.path.join(os.path.dirname(__file__), "..", ".env.local")
if os.path.isfile(env_local):
try:
with open(env_local, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("RPA_API_TOKEN="):
API_TOKEN = line.split("=", 1)[1].strip().strip('"').strip("'")
break
except OSError:
pass
# ---------------------------------------------------------------------------
# Import optionnel de build_replay_from_raw_events
# ---------------------------------------------------------------------------
_build_replay_fn = None
try:
from agent_v0.server_v1.stream_processor import build_replay_from_raw_events
_build_replay_fn = build_replay_from_raw_events
except ImportError:
pass
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("session_cleaner")
# ---------------------------------------------------------------------------
# Application Flask
# ---------------------------------------------------------------------------
app = Flask(__name__)
# ---------------------------------------------------------------------------
# Utilitaires
# ---------------------------------------------------------------------------
# Fenetres considerees comme parasites
# ATTENTION : ces patterns sont compares en lowercase via `in` — ils doivent
# etre suffisamment specifiques pour ne pas attraper de faux positifs dans
# les logiciels metier (DPI, codage, facturation, etc.).
# "program manager" retire volontairement : le bureau Windows est souvent
# le point de depart d'un workflow (clic taskbar, icone, etc.).
# "assistant" retire : trop large (ex: "Assistant de codage PMSI").
# "lea"/"léa" remplace par des patterns specifiques a l'outil Lea RPA.
_PARASITIC_WINDOW_PATTERNS = [
"fenetre de depassement",
"fenêtre de dépassement",
"léa - rpa",
"lea - rpa",
"activer windows",
]
# Types d'evenements exploitables (affiches a l'utilisateur)
_ACTIONABLE_TYPES = frozenset({"mouse_click", "text_input", "key_combo", "key_press", "type"})
def _resolve_sessions_dir() -> Path:
"""Resoudre le repertoire racine des live_sessions."""
return Path(LIVE_SESSIONS_DIR).resolve()
def _discover_sessions(limit: int = 50) -> List[Dict[str, Any]]:
"""Decouvrir les sessions recentes.
Parcourt deux niveaux :
- <base>/<machine_id>/sess_* (format actuel)
- <base>/sess_* (ancien format, sessions au niveau racine)
"""
base = _resolve_sessions_dir()
if not base.is_dir():
logger.warning("Repertoire live_sessions introuvable : %s", base)
return []
sessions: List[Dict[str, Any]] = []
for item in base.iterdir():
if not item.is_dir():
continue
# Sessions directement a la racine (ancien format)
if item.name.startswith("sess_"):
jsonl = item / "live_events.jsonl"
if jsonl.is_file():
sessions.append(_build_session_info("(racine)", item.name, item, jsonl))
continue
# Ignorer les dossiers systeme
if item.name.startswith(".") or item.name in ("embeddings", "streaming_sessions", "workflows", "test_gpu"):
continue
# Sous-dossiers machine_id
for sub in item.iterdir():
if sub.is_dir() and sub.name.startswith("sess_"):
jsonl = sub / "live_events.jsonl"
if jsonl.is_file():
sessions.append(_build_session_info(item.name, sub.name, sub, jsonl))
# Tri par date decroissante (mtime du JSONL)
sessions.sort(key=lambda s: s["mtime"], reverse=True)
return sessions[:limit]
def _build_session_info(machine_id: str, session_id: str, session_dir: Path, jsonl_path: Path) -> Dict[str, Any]:
"""Construire les metadonnees d'une session."""
mtime = jsonl_path.stat().st_mtime
event_count = 0
try:
with open(jsonl_path, encoding="utf-8") as f:
for line in f:
if line.strip():
event_count += 1
except OSError:
pass
# Extraire la date depuis le nom de session (sess_YYYYMMDDTHHMMSS_...)
date_str = ""
try:
parts = session_id.split("_")
if len(parts) >= 2:
raw = parts[1] # 20260410T222352
dt = datetime.strptime(raw, "%Y%m%dT%H%M%S")
date_str = dt.strftime("%d/%m/%Y %H:%M:%S")
except (ValueError, IndexError):
date_str = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S")
return {
"machine_id": machine_id,
"session_id": session_id,
"session_dir": str(session_dir),
"date_str": date_str,
"event_count": event_count,
"mtime": mtime,
}
def _load_events(session_dir: Path) -> List[Dict[str, Any]]:
"""Charger les evenements depuis live_events.jsonl."""
jsonl = session_dir / "live_events.jsonl"
events: List[Dict[str, Any]] = []
if not jsonl.is_file():
return events
try:
with open(jsonl, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
except OSError as e:
logger.error("Erreur lecture %s : %s", jsonl, e)
return events
def _get_app_name(event: Dict[str, Any]) -> str:
"""Extraire le nom de l'application depuis l'evenement.
Cherche dans event.event.window.app_name (format actuel).
"""
inner = event.get("event", {})
window = inner.get("window") or {}
if isinstance(window, dict):
return window.get("app_name", "") or ""
return ""
def _has_identified_ui_element(event: Dict[str, Any]) -> bool:
"""Verifier si l'evenement cible un element UI identifie par C2/UIA.
Un clic sur un element UI nomme (bouton, champ, onglet) est tres
probablement un acte metier reel. Les donnees proviennent de :
- uia_snapshot.name : nom de l'element via UI Automation (lea_uia.exe)
- ui_elements : liste d'elements detectes par le pipeline C2 (vision)
- vision_info.ui_elements : idem, format alternatif
ATTENTION : cette fonction ne garantit pas que le clic n'est pas
parasite — un clic systray a aussi un uia_name. C'est un indice
positif, pas une preuve absolue. Utiliser en conjonction avec les
filtres negatifs (systray, clic droit, etc.).
"""
inner = event.get("event", {})
# UIA snapshot — l'element a un nom identifie
uia = inner.get("uia_snapshot") or {}
if uia.get("name"):
return True
# Pipeline C2 — elements visuels detectes
if inner.get("ui_elements"):
return True
vision_info = inner.get("vision_info") or {}
if vision_info.get("ui_elements"):
return True
return False
def _get_window_title(event: Dict[str, Any]) -> str:
"""Extraire le titre de fenetre d'un evenement.
Les evenements plus recents stockent la fenetre dans event.window.title,
les anciens dans event.active_window_title.
"""
inner = event.get("event", {})
# Format actuel : inner.window.title
window = inner.get("window") or {}
if isinstance(window, dict) and window.get("title"):
return window["title"]
# Ancien format
return inner.get("active_window_title", "")
def _get_shot_filename(click_index: int, session_dir: Path) -> Optional[str]:
"""Trouver le fichier screenshot pour un clic donne.
Essaie dans l'ordre :
1. shot_XXXX_crop.png (ancien format)
2. shot_XXXX_full.png (ancien format)
3. res_shot_XXXX.png (format recent — resultat post-action)
``click_index`` est 1-based (premier clic = 1).
"""
shots_dir = session_dir / "shots"
if not shots_dir.is_dir():
return None
shot_id = f"shot_{click_index:04d}"
# Priorite au crop (plus informatif en thumbnail)
for pattern in [f"{shot_id}_crop.png", f"{shot_id}_full.png", f"res_{shot_id}.png"]:
if (shots_dir / pattern).is_file():
return pattern
return None
def _is_parasitic(event: Dict[str, Any], index: int, total: int) -> bool:
"""Determiner si un evenement est probablement parasite.
Logique en 3 couches :
1. Signaux durs (toujours parasites) : types non-exploitables, clics droit
2. Signaux positifs C2/UIA (jamais parasites) : element UI identifie par
nom dans une app metier, ou app connue non-systray
3. Patterns de fenetre (parasites si aucun signal positif)
L'ancienne regle « 3 derniers = parasites » a ete supprimee car elle
generait trop de faux positifs sur les logiciels metier (le dernier
clic est souvent Valider/Sauvegarder/Confirmer).
La detection de l'arret d'enregistrement est maintenant faite par
_is_stop_recording_event() dans _parse_actions().
"""
inner = event.get("event", {})
etype = inner.get("type", "")
# --- Couche 1 : signaux durs, toujours parasites ---
# Types toujours parasites
if etype in ("heartbeat", "focus_change", "window_focus_change", "action_result",
"screenshot", "status", "ping", "pong"):
return True
# Clics droit
if etype == "mouse_click" and inner.get("button") == "right":
return True
# --- Couche 2 : signaux positifs C2/UIA ---
# Si on a un element UI identifie ET que ce n'est pas la systray,
# c'est un vrai clic metier — on le preserve quoi qu'il arrive.
is_systray = _is_systray_interaction(event)
if not is_systray and _has_identified_ui_element(event):
# Un clic sur un bouton/champ/onglet identifie par UIA ou C2
# dans une fenetre qui n'est pas la systray = acte metier reel
return False
# Si l'app n'est pas le bureau/systray, c'est une vraie application
# metier — les clics dedans sont legitimes meme sans info UIA/C2
app_name = _get_app_name(event).lower()
_NON_BUSINESS_APPS = frozenset({
"", "explorer.exe", "pythonw.exe",
})
if app_name and app_name not in _NON_BUSINESS_APPS:
return False
# --- Couche 3 : patterns de fenetre ---
win_title = _get_window_title(event).lower()
if win_title:
for pattern in _PARASITIC_WINDOW_PATTERNS:
if pattern in win_title:
return True
return False
def _is_stop_recording_event(event: Dict[str, Any], is_last_actionable: bool) -> bool:
"""Detecter si un evenement est un arret d'enregistrement Lea.
Plutot que de marquer aveuglement les 3 derniers evenements comme
parasites (ce qui supprime des clics metier importants comme
Valider/Sauvegarder), on detecte finement les patterns d'arret :
- Le dernier evenement exploitable est un key_combo Ctrl+Shift+L
(raccourci explicite d'arret)
- Le dernier evenement est un clic sur une fenetre Lea/systray
(l'utilisateur clique sur l'icone systray pour arreter)
- Clic sur la zone systray (barre des taches, icones cachees, etc.)
identifie par le uia_snapshot
"""
if not is_last_actionable:
return False
inner = event.get("event", {})
etype = inner.get("type", "")
# Raccourci Ctrl+Shift+L → arret explicite
if etype == "key_combo":
keys = inner.get("keys", [])
if isinstance(keys, list):
keys_lower = [str(k).lower() for k in keys]
# Ctrl+Shift+L (le \x0c ou 'l' selon l'encoding)
if "ctrl" in keys_lower and "shift" in keys_lower:
return True
# Clic sur fenetre Lea (pythonw.exe = agent Lea)
if etype == "mouse_click":
window = inner.get("window", {})
if isinstance(window, dict):
app_name = (window.get("app_name", "") or "").lower()
if app_name == "pythonw.exe":
return True
return False
def _is_systray_interaction(event: Dict[str, Any]) -> bool:
"""Detecter si un evenement est une interaction avec la systray.
La systray (zone de notification) est identifiee par :
- Le uia_snapshot contenant 'Afficher les icônes cachées',
'Barre des tâches', etc.
- Le parent_path contenant 'Fenêtre de dépassement'
"""
inner = event.get("event", {})
uia = inner.get("uia_snapshot", {})
if not uia:
return False
uia_name = (uia.get("name", "") or "").lower()
# "afficher les icônes cachées" = bouton systray Windows
if "icônes cachées" in uia_name or "icones cachees" in uia_name:
return True
# Verifier le parent_path pour la systray
parent_path = uia.get("parent_path", [])
if isinstance(parent_path, list):
for parent in parent_path:
if isinstance(parent, dict):
parent_name = (parent.get("name", "") or "").lower()
if "dépassement" in parent_name or "depassement" in parent_name:
return True
return False
# ---------------------------------------------------------------------------
# Alias de raccourcis clavier courants
# ---------------------------------------------------------------------------
_KEY_COMBO_ALIASES: Dict[str, str] = {
"ctrl+s": "Sauvegarde",
"ctrl+z": "Annuler",
"ctrl+y": "Rétablir",
"ctrl+c": "Copier",
"ctrl+v": "Coller",
"ctrl+x": "Couper",
"ctrl+a": "Tout sélectionner",
"ctrl+n": "Nouveau",
"ctrl+o": "Ouvrir",
"ctrl+p": "Imprimer",
"ctrl+f": "Rechercher",
"ctrl+w": "Fermer l'onglet",
"ctrl+shift+l": "Arrêt enregistrement Léa",
"alt+f4": "Fermer la fenêtre",
}
def _normalize_keys_for_alias(keys_raw) -> str:
"""Normaliser les touches pour la recherche d'alias.
Gère les caractères de contrôle (ex: \\x13 = Ctrl+S) et uniformise
en lowercase avec '+' comme séparateur.
Tri : modifiers (ctrl, shift, alt) en premier, puis la touche finale.
"""
if isinstance(keys_raw, str):
keys_list = [keys_raw]
elif isinstance(keys_raw, (list, tuple)):
keys_list = [str(k) for k in keys_raw]
else:
return ""
_MODIFIERS = {"ctrl", "shift", "alt", "meta", "super", "win"}
modifiers = []
others = []
for k in keys_list:
k_clean = k.strip().lower()
# Caractères de contrôle : \x01=a, \x03=c, \x04=d, ..., \x13=s, \x16=v, \x1a=z
if len(k_clean) == 1 and ord(k_clean) < 32:
k_clean = chr(ord(k_clean) + ord('a') - 1)
if k_clean in _MODIFIERS:
modifiers.append(k_clean)
else:
others.append(k_clean)
return "+".join(sorted(modifiers) + sorted(others))
def _generate_description(event: Dict[str, Any]) -> str:
"""Generer une description lisible en français pour un evenement.
Utilise les donnees UIA/C2 quand disponibles, sinon position + fenetre.
"""
inner = event.get("event", {})
etype = inner.get("type", "")
if etype == "mouse_click":
uia = inner.get("uia_snapshot") or {}
uia_name = uia.get("name", "") if uia else ""
uia_ct = uia.get("control_type", "") if uia else ""
if uia_name:
ct_label = f" ({uia_ct})" if uia_ct else ""
return f'Clic sur « {uia_name} »{ct_label}'
else:
pos = inner.get("pos", [])
win_title = _get_window_title(event)
pos_str = f"({pos[0]}, {pos[1]})" if pos and len(pos) >= 2 else "(?)"
if win_title and win_title != "unknown_window":
return f'Clic à {pos_str} dans « {win_title} »'
return f'Clic à {pos_str}'
elif etype in ("text_input", "type"):
text = inner.get("text", "")
if text:
# Tronquer si trop long
display = text if len(text) <= 40 else text[:37] + "..."
return f'Saisie : « {display} »'
return "Saisie (vide)"
elif etype == "key_combo":
keys_raw = inner.get("keys", [])
keys_display = " + ".join(str(k) for k in keys_raw) if isinstance(keys_raw, list) else str(keys_raw)
# Normaliser pour chercher l'alias
norm = _normalize_keys_for_alias(keys_raw)
alias = _KEY_COMBO_ALIASES.get(norm, "")
# Affichage propre des noms de touches
keys_pretty = _pretty_keys(keys_raw)
if alias:
return f"Raccourci : {keys_pretty} ({alias})"
return f"Raccourci : {keys_pretty}"
elif etype == "key_press":
key = inner.get("key", "")
return f"Touche : {_pretty_key(str(key))}"
return etype
def _pretty_keys(keys_raw) -> str:
"""Formater une liste de touches pour l'affichage."""
if isinstance(keys_raw, (list, tuple)):
return "+".join(_pretty_key(str(k)) for k in keys_raw)
return _pretty_key(str(keys_raw))
def _pretty_key(key: str) -> str:
"""Formater une touche individuelle pour l'affichage."""
k = key.strip().lower()
# Caractères de contrôle
if len(k) == 1 and ord(k) < 32:
return chr(ord(k) + ord('A') - 1)
mapping = {
"ctrl": "Ctrl",
"shift": "Shift",
"alt": "Alt",
"enter": "Entrée",
"return": "Entrée",
"tab": "Tab",
"escape": "Échap",
"esc": "Échap",
"space": "Espace",
"backspace": "Retour arrière",
"delete": "Suppr",
"up": "",
"down": "",
"left": "",
"right": "",
}
if k in mapping:
return mapping[k]
# Touches de fonction : f1-f12
if k.startswith("f") and k[1:].isdigit():
return k.upper()
return key.capitalize() if len(key) == 1 else key
# ---------------------------------------------------------------------------
# Detection des doublons
# ---------------------------------------------------------------------------
def _detect_duplicates(actions: List[Dict[str, Any]], events: List[Dict[str, Any]]) -> None:
"""Detecter les actions dupliquees et marquer is_duplicate=True.
Criteres (tous requis) :
- Meme type (mouse_click)
- Meme position (distance euclidienne < 10px)
- Meme fenetre
- Ecart temporel < 1s
Le SECOND evenement du doublon est marque (le premier est preserve).
Ne supprime rien — signale seulement.
"""
for j in range(1, len(actions)):
a_prev = actions[j - 1]
a_curr = actions[j]
# Seulement les mouse_click
if a_prev["type"] != "mouse_click" or a_curr["type"] != "mouse_click":
continue
# Verifier la fenetre
if a_prev["window_title"] != a_curr["window_title"]:
# Autoriser unknown_window comme equivalent
win_a = a_prev["window_title"]
win_b = a_curr["window_title"]
if win_a and win_b and win_a != "unknown_window" and win_b != "unknown_window":
continue
# Verifier la position (distance euclidienne < 10px)
ev_prev = events[a_prev["global_index"]].get("event", {})
ev_curr = events[a_curr["global_index"]].get("event", {})
pos_prev = ev_prev.get("pos", [])
pos_curr = ev_curr.get("pos", [])
if not pos_prev or not pos_curr or len(pos_prev) < 2 or len(pos_curr) < 2:
continue
dx = pos_prev[0] - pos_curr[0]
dy = pos_prev[1] - pos_curr[1]
dist = math.sqrt(dx * dx + dy * dy)
if dist >= 10:
continue
# Verifier l'ecart temporel < 1s
ts_prev = ev_prev.get("timestamp", 0)
ts_curr = ev_curr.get("timestamp", 0)
dt = abs(ts_curr - ts_prev)
if dt >= 1.0:
continue
# C'est un doublon
a_curr["is_duplicate"] = True
a_curr["duplicate_info"] = (
f"Doublon (< {dt:.0f}s, {dist:.0f}px de distance)"
if dist > 0
else f"Doublon (< {dt:.1f}s, même position)"
)
def _parse_actions(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict[str, Any]]:
"""Convertir les evenements bruts en liste d'actions affichables.
Retourne une liste de dicts avec : index_global, type, position, fenetre,
texte, touches, shot_file, is_parasitic, description, is_duplicate, etc.
"""
actions: List[Dict[str, Any]] = []
click_count = 0
total_events = len(events)
# Pre-calculer le dernier indice d'evenement exploitable
# pour la detection fine de l'arret d'enregistrement
actionable_indices = [
i for i, ev in enumerate(events)
if ev.get("event", {}).get("type", "") in _ACTIONABLE_TYPES
]
last_actionable_idx = actionable_indices[-1] if actionable_indices else -1
for i, event in enumerate(events):
inner = event.get("event", {})
etype = inner.get("type", "")
# Ne montrer que les evenements exploitables
if etype not in _ACTIONABLE_TYPES:
continue
action: Dict[str, Any] = {
"global_index": i,
"type": etype,
"position": "",
"window_title": _get_window_title(event),
"text": "",
"keys": "",
"shot_file": None,
"is_parasitic": False,
"description": _generate_description(event),
"is_duplicate": False,
"duplicate_info": "",
}
# Position (pour les clics)
pos = inner.get("pos")
if pos and isinstance(pos, (list, tuple)) and len(pos) >= 2:
action["position"] = f"({pos[0]}, {pos[1]})"
# Bouton de clic
if etype == "mouse_click":
action["button"] = inner.get("button", "left")
click_count += 1
action["shot_file"] = _get_shot_filename(click_count, session_dir)
action["click_number"] = click_count
# Texte tape
if etype in ("text_input", "type"):
action["text"] = inner.get("text", "")
# Touches pour key_combo / key_press
if etype in ("key_combo", "key_press"):
keys = inner.get("keys", [])
if isinstance(keys, list):
action["keys"] = " + ".join(str(k) for k in keys)
else:
action["keys"] = str(inner.get("key", keys))
# Detection parasite — logique centralisee + extensions
parasitic = _is_parasitic(event, i, total_events)
# Extensions specifiques a _parse_actions :
# - interaction systray (icones cachees, fenetre depassement)
if not parasitic and _is_systray_interaction(event):
parasitic = True
# - arret d'enregistrement (dernier evenement exploitable uniquement)
is_last = (i == last_actionable_idx)
if not parasitic and _is_stop_recording_event(event, is_last):
parasitic = True
action["is_parasitic"] = parasitic
actions.append(action)
# Detection des doublons (marque is_duplicate sur le 2eme du pair)
_detect_duplicates(actions, events)
return actions
# ---------------------------------------------------------------------------
# Templates HTML
# ---------------------------------------------------------------------------
_BASE_CSS = """
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
margin: 0; padding: 20px; background: #f5f5f5; color: #333; }
h1 { color: #2c3e50; border-bottom: 2px solid #3498db; padding-bottom: 10px; }
h2 { color: #34495e; }
a { color: #2980b9; text-decoration: none; }
a:hover { text-decoration: underline; }
table { border-collapse: collapse; width: 100%; background: white; border-radius: 6px;
overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,0.12); }
th { background: #2c3e50; color: white; padding: 12px 15px; text-align: left; }
td { padding: 10px 15px; border-bottom: 1px solid #eee; }
tr:hover { background: #f0f7ff; }
.btn { display: inline-block; padding: 10px 20px; background: #e74c3c; color: white;
border: none; border-radius: 4px; cursor: pointer; font-size: 14px; }
.btn:hover { background: #c0392b; }
.btn-secondary { background: #3498db; }
.btn-secondary:hover { background: #2980b9; }
.info-box { background: #eaf4ff; border: 1px solid #b8d4f0; border-radius: 6px;
padding: 15px; margin: 15px 0; }
.warning-box { background: #fff3cd; border: 1px solid #ffc107; border-radius: 6px;
padding: 15px; margin: 15px 0; }
.success-box { background: #d4edda; border: 1px solid #28a745; border-radius: 6px;
padding: 15px; margin: 15px 0; }
.error-box { background: #f8d7da; border: 1px solid #dc3545; border-radius: 6px;
padding: 15px; margin: 15px 0; }
.parasitic { background: #ffe0e0; }
.normal { background: #e0ffe0; }
.duplicate { background: #f0f0f0; color: #999; }
.duplicate td { color: #999; }
.desc { font-size: 13px; color: #555; max-width: 300px; }
.badge-dup { display: inline-block; background: #ddd; color: #888; font-size: 11px;
padding: 2px 6px; border-radius: 3px; margin-left: 4px; cursor: help; }
.counter { font-size: 18px; font-weight: bold; margin: 15px 0; }
.counter .remove { color: #e74c3c; }
.counter .total { color: #2c3e50; }
img.thumb { max-height: 80px; border: 1px solid #ccc; border-radius: 4px; cursor: pointer; }
img.thumb:hover { box-shadow: 0 2px 8px rgba(0,0,0,0.3); }
.nav { margin-bottom: 20px; }
.mono { font-family: 'Fira Code', 'Consolas', monospace; font-size: 13px; }
label { cursor: pointer; }
"""
_INDEX_TEMPLATE = """<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Session Cleaner -- Lea</title>
<style>{{ css }}</style>
</head>
<body>
<h1>Session Cleaner</h1>
<p>Outil de nettoyage des sessions avant replay. Selectionnez une session pour voir ses actions.</p>
{% if sessions %}
<table>
<thead>
<tr>
<th>Date</th>
<th>Machine</th>
<th>Session ID</th>
<th>Evenements</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for s in sessions %}
<tr>
<td>{{ s.date_str }}</td>
<td class="mono">{{ s.machine_id }}</td>
<td class="mono">{{ s.session_id }}</td>
<td>{{ s.event_count }}</td>
<td><a href="{{ url_for('view_session', machine_id=s.machine_id, session_id=s.session_id) }}"
class="btn btn-secondary">Voir</a></td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="warning-box">
<p>Aucune session trouvee dans <code>{{ sessions_dir }}</code>.</p>
<p>Lancez un enregistrement depuis l'Agent V1 pour creer des sessions.</p>
</div>
{% endif %}
</body>
</html>"""
_SESSION_TEMPLATE = """<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Session {{ session_id }} -- Session Cleaner</title>
<style>{{ css }}</style>
<style>
.overlay { display: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%;
background: rgba(0,0,0,0.8); z-index: 1000; justify-content: center;
align-items: center; }
.overlay.active { display: flex; }
.overlay img { max-width: 90%; max-height: 90%; border-radius: 8px; }
</style>
</head>
<body>
<div class="nav">
<a href="{{ url_for('index') }}">&larr; Retour a la liste</a>
</div>
<h1>Session : <span class="mono">{{ session_id }}</span></h1>
<div class="info-box">
<strong>Machine :</strong> {{ machine_id }} |
<strong>Date :</strong> {{ date_str }} |
<strong>Evenements bruts :</strong> {{ total_events }}
</div>
{% if actions %}
<div class="counter" id="counter">
<span class="remove" id="remove-count">{{ parasitic_count }}</span> actions a supprimer /
<span class="total">{{ actions|length }}</span> total
{% if duplicate_count > 0 %}
| <span style="color:#999">{{ duplicate_count }} doublon{{ 's' if duplicate_count > 1 else '' }}</span>
{% endif %}
</div>
<form method="POST" action="{{ url_for('clean_and_replay') }}" id="clean-form">
<input type="hidden" name="session_id" value="{{ session_id }}">
<input type="hidden" name="machine_id" value="{{ machine_id }}">
<table>
<thead>
<tr>
<th>Supprimer</th>
<th>#</th>
<th>Type</th>
<th>Description</th>
<th>Position</th>
<th>Fenetre</th>
<th>Texte / Touches</th>
<th>Screenshot</th>
</tr>
</thead>
<tbody>
{% for a in actions %}
<tr class="{{ 'parasitic' if a.is_parasitic else ('duplicate' if a.is_duplicate else 'normal') }}">
<td>
<label>
<input type="checkbox" name="remove_indices"
value="{{ a.global_index }}"
{{ 'checked' if a.is_parasitic else '' }}
onchange="updateCounter()">
</label>
</td>
<td>{{ loop.index }}</td>
<td class="mono">
{{ a.type }}
{% if a.button is defined and a.button == 'right' %}
<span style="color:#e74c3c">(droit)</span>
{% endif %}
{% if a.is_duplicate %}
<span class="badge-dup" title="{{ a.duplicate_info }}">doublon</span>
{% endif %}
</td>
<td class="desc">{{ a.description }}</td>
<td class="mono">{{ a.position }}</td>
<td>{{ a.window_title|truncate(40) }}</td>
<td class="mono">
{% if a.text %}{{ a.text|truncate(60) }}{% endif %}
{% if a.keys %}{{ a.keys }}{% endif %}
</td>
<td>
{% if a.shot_file %}
<img src="{{ url_for('serve_shot', machine_id=machine_id, session_id=session_id, filename=a.shot_file) }}"
class="thumb"
alt="Screenshot action {{ loop.index }}"
onclick="showOverlay(this.src)"
loading="lazy">
{% else %}
<span style="color:#aaa">--</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
<div style="margin-top: 20px; display: flex; gap: 10px; align-items: center;">
<button type="submit" class="btn" id="btn-replay">
Nettoyer et relancer le replay
</button>
<button type="button" class="btn btn-secondary" onclick="selectAll(true)">
Tout cocher
</button>
<button type="button" class="btn btn-secondary" onclick="selectAll(false)">
Tout decocher
</button>
<button type="button" class="btn btn-secondary" onclick="resetParasitic()">
Reinitialiser (auto-detection)
</button>
</div>
</form>
{% else %}
<div class="warning-box">
Aucune action exploitable dans cette session.
</div>
{% endif %}
<!-- Overlay pour zoom screenshot -->
<div class="overlay" id="overlay" onclick="this.classList.remove('active')">
<img id="overlay-img" src="" alt="Screenshot agrandi">
</div>
<script>
// Indices des evenements auto-detectes comme parasites (pour reset)
var parasiticIndices = {{ parasitic_indices|tojson }};
function updateCounter() {
var checked = document.querySelectorAll('input[name="remove_indices"]:checked').length;
document.getElementById('remove-count').textContent = checked;
// Mettre a jour les couleurs des lignes
document.querySelectorAll('input[name="remove_indices"]').forEach(function(cb) {
var tr = cb.closest('tr');
tr.className = cb.checked ? 'parasitic' : 'normal';
});
}
function selectAll(state) {
document.querySelectorAll('input[name="remove_indices"]').forEach(function(cb) {
cb.checked = state;
});
updateCounter();
}
function resetParasitic() {
document.querySelectorAll('input[name="remove_indices"]').forEach(function(cb) {
cb.checked = parasiticIndices.indexOf(parseInt(cb.value)) !== -1;
});
updateCounter();
}
function showOverlay(src) {
document.getElementById('overlay-img').src = src;
document.getElementById('overlay').classList.add('active');
}
</script>
</body>
</html>"""
_RESULT_TEMPLATE = """<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Replay lance -- Session Cleaner</title>
<style>{{ css }}</style>
</head>
<body>
<div class="nav">
<a href="{{ url_for('index') }}">&larr; Retour a la liste</a>
</div>
<h1>Replay lance</h1>
{% if success %}
<div class="success-box">
<p><strong>Replay demarre avec succes.</strong></p>
<p><strong>Replay ID :</strong> <code>{{ replay_id }}</code></p>
<p><strong>Session :</strong> <code>{{ session_id }}</code></p>
<p><strong>Machine cible :</strong> <code>{{ machine_id }}</code></p>
<p><strong>Actions injectees :</strong> {{ action_count }}</p>
<p><strong>Actions supprimees :</strong> {{ removed_count }}</p>
</div>
{% else %}
<div class="error-box">
<p><strong>Erreur lors du lancement du replay.</strong></p>
<p>{{ error_message }}</p>
</div>
{% endif %}
</body>
</html>"""
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def index():
"""Page d'accueil : liste des sessions recentes."""
sessions = _discover_sessions(limit=50)
return render_template_string(
_INDEX_TEMPLATE,
sessions=sessions,
sessions_dir=str(_resolve_sessions_dir()),
css=_BASE_CSS,
)
@app.route("/session/<machine_id>/<session_id>")
def view_session(machine_id: str, session_id: str):
"""Vue detaillee d'une session avec ses actions."""
session_dir = _find_session_dir(machine_id, session_id)
if session_dir is None:
return render_template_string(
"""<!DOCTYPE html><html lang="fr"><head><meta charset="utf-8">
<title>Session introuvable</title><style>{{ css }}</style></head>
<body><div class="error-box"><p>Session <code>{{ sid }}</code>
introuvable pour la machine <code>{{ mid }}</code>.</p></div>
<a href="{{ url_for('index') }}">Retour</a></body></html>""",
sid=session_id, mid=machine_id, css=_BASE_CSS,
), 404
events = _load_events(session_dir)
actions = _parse_actions(events, session_dir)
# Compter les parasites, doublons et collecter les indices globaux
parasitic_count = sum(1 for a in actions if a["is_parasitic"])
parasitic_indices = [a["global_index"] for a in actions if a["is_parasitic"]]
duplicate_count = sum(1 for a in actions if a.get("is_duplicate"))
# Date depuis le nom de session
date_str = ""
try:
parts = session_id.split("_")
if len(parts) >= 2:
dt = datetime.strptime(parts[1], "%Y%m%dT%H%M%S")
date_str = dt.strftime("%d/%m/%Y %H:%M:%S")
except (ValueError, IndexError):
date_str = "?"
return render_template_string(
_SESSION_TEMPLATE,
session_id=session_id,
machine_id=machine_id,
date_str=date_str,
total_events=len(events),
actions=actions,
parasitic_count=parasitic_count,
parasitic_indices=parasitic_indices,
duplicate_count=duplicate_count,
css=_BASE_CSS,
)
@app.route("/shots/<machine_id>/<session_id>/<filename>")
def serve_shot(machine_id: str, session_id: str, filename: str):
"""Servir un fichier screenshot."""
session_dir = _find_session_dir(machine_id, session_id)
if session_dir is None:
return "Session introuvable", 404
shots_dir = session_dir / "shots"
if not shots_dir.is_dir():
return "Repertoire shots introuvable", 404
# Securite : empecher la traversee de repertoire
safe_name = Path(filename).name
if safe_name != filename:
return "Nom de fichier invalide", 400
target = shots_dir / safe_name
if not target.is_file():
return "Fichier introuvable", 404
return send_from_directory(str(shots_dir), safe_name, mimetype="image/png")
@app.route("/clean-and-replay", methods=["POST"])
def clean_and_replay():
"""Nettoyer les evenements et lancer un replay."""
session_id = request.form.get("session_id", "")
machine_id = request.form.get("machine_id", "")
remove_indices_raw = request.form.getlist("remove_indices")
# Convertir les indices en entiers
remove_indices = set()
for idx_str in remove_indices_raw:
try:
remove_indices.add(int(idx_str))
except ValueError:
continue
# Trouver le repertoire de session
session_dir = _find_session_dir(machine_id, session_id)
if session_dir is None:
return render_template_string(
_RESULT_TEMPLATE,
success=False,
error_message=f"Session {session_id} introuvable pour la machine {machine_id}.",
replay_id="", session_id=session_id, machine_id=machine_id,
action_count=0, removed_count=0, css=_BASE_CSS,
)
# Charger les evenements et filtrer
all_events = _load_events(session_dir)
cleaned_events = [
ev for i, ev in enumerate(all_events)
if i not in remove_indices
]
removed_count = len(all_events) - len(cleaned_events)
logger.info(
"Nettoyage session %s : %d evenements -> %d (suppression de %d)",
session_id, len(all_events), len(cleaned_events), removed_count,
)
# Construire les actions de replay
replay_actions = None
error_message = ""
# Pour le session_cleaner, on utilise TOUJOURS le fallback simple.
# build_replay_from_raw_events transforme les events (réordonne,
# injecte du setup "ouvrir l'app", fusionne des actions) — ce qui
# décale les clics par rapport à l'enregistrement original.
# Le fallback simple reproduit les events 1:1 en coords brutes,
# ce qui est exactement ce qu'on veut pour "nettoyer et rejouer".
if not replay_actions:
# Fallback : filtrage simple et conversion directe.
# Se declenche si build_replay_from_raw_events a crashe OU
# retourne une liste vide OU n'est pas disponible.
try:
replay_actions = _simple_build_replay(cleaned_events, session_dir)
logger.info("Fallback simple_build_replay a produit %d actions", len(replay_actions))
error_message = "" # le fallback a reussi, on efface l'erreur precedente
except Exception as e:
logger.error("Erreur fallback simple_build_replay : %s", e)
error_message = f"Erreur lors de la construction du replay (fallback) : {e}"
if not replay_actions:
if not error_message:
error_message = "Aucune action exploitable apres nettoyage."
return render_template_string(
_RESULT_TEMPLATE,
success=False, error_message=error_message,
replay_id="", session_id=session_id, machine_id=machine_id,
action_count=0, removed_count=removed_count, css=_BASE_CSS,
)
# Envoyer au streaming server
replay_id = f"replay_clean_{uuid.uuid4().hex[:8]}"
try:
import requests as _requests
headers = {"Content-Type": "application/json"}
if API_TOKEN:
headers["Authorization"] = f"Bearer {API_TOKEN}"
payload = {
"session_id": session_id,
"actions": replay_actions,
"machine_id": machine_id if machine_id != "(racine)" else "",
"task_description": f"Replay nettoye de {session_id} ({removed_count} actions supprimees)",
}
resp = _requests.post(
f"{STREAMING_SERVER}/api/v1/traces/stream/replay/raw",
json=payload,
headers=headers,
timeout=30,
)
if resp.status_code == 200:
data = resp.json()
replay_id = data.get("replay_id", replay_id)
logger.info("Replay lance : %s (%d actions)", replay_id, len(replay_actions))
return render_template_string(
_RESULT_TEMPLATE,
success=True, replay_id=replay_id,
session_id=session_id, machine_id=machine_id,
action_count=len(replay_actions), removed_count=removed_count,
error_message="", css=_BASE_CSS,
)
else:
error_message = f"Serveur streaming a repondu {resp.status_code} : {resp.text[:300]}"
logger.error("Erreur POST replay : %s", error_message)
except ImportError:
error_message = (
"Module 'requests' non disponible. "
"Installez-le avec : pip install requests"
)
except Exception as e:
error_message = f"Erreur de connexion au serveur streaming ({STREAMING_SERVER}) : {e}"
logger.error("Erreur connexion streaming : %s", e)
return render_template_string(
_RESULT_TEMPLATE,
success=False, error_message=error_message,
replay_id="", session_id=session_id, machine_id=machine_id,
action_count=0, removed_count=removed_count, css=_BASE_CSS,
)
# ---------------------------------------------------------------------------
# Helpers internes
# ---------------------------------------------------------------------------
def _find_session_dir(machine_id: str, session_id: str) -> Optional[Path]:
"""Trouver le repertoire d'une session.
Cherche dans :
1. <base>/<machine_id>/<session_id>/
2. <base>/<session_id>/ (ancien format, racine)
"""
base = _resolve_sessions_dir()
# Sous machine_id
if machine_id and machine_id != "(racine)":
candidate = base / machine_id / session_id
if candidate.is_dir():
return candidate
# Directement a la racine
candidate = base / session_id
if candidate.is_dir():
return candidate
# Recherche exhaustive (au cas ou le machine_id a change)
for item in base.iterdir():
if item.is_dir() and not item.name.startswith("."):
candidate = item / session_id
if candidate.is_dir():
return candidate
return None
def _load_crop_as_base64(session_dir: Path, screenshot_id: str) -> str:
"""Charger un crop screenshot et le retourner en base64.
Le crop (80x80 autour du clic) sert d'ancre pour le template matching —
le GroundingEngine compare cette vignette a l'ecran actuel via OpenCV.
"""
if not screenshot_id:
return ""
crop_path = session_dir / "shots" / f"{screenshot_id}_crop.png"
if not crop_path.is_file():
return ""
try:
import base64
data = crop_path.read_bytes()
return base64.b64encode(data).decode("ascii")
except Exception:
return ""
def _build_vlm_description(
uia_snapshot: Dict[str, Any], window_info: Dict[str, Any],
) -> str:
"""Construire une description naturelle pour le VLM.
Le VLM recoit cette phrase + le screenshot actuel et doit localiser
l'element decrit. Plus la description est precise, meilleur le grounding.
"""
name = uia_snapshot.get("name", "")
control_type = uia_snapshot.get("control_type", "")
window_title = window_info.get("title", "") if window_info else ""
parts = []
if control_type:
parts.append(f"le {control_type}")
if name:
parts.append(f"'{name}'")
if window_title and window_title != "unknown_window":
parts.append(f"dans la fenetre '{window_title}'")
if parts:
return " ".join(parts)
return ""
def _build_full_target_spec(
event: Dict[str, Any], session_dir: Path,
) -> Dict[str, Any]:
"""Construire un target_spec complet pour la cascade de resolution visuelle.
Exploite TOUTES les donnees capturees pendant l'enregistrement :
- uia_snapshot → resolution UIA locale (lea_uia.exe, 10-20ms)
- crop screenshot → template matching OpenCV (~100ms)
- nom UIA + window_title → OCR docTR + VLM grounding (1-5s)
La cascade : UIA → template → serveur (docTR+VLM) → VLM local.
Si tout echoue → pause supervisee (pas de clic aveugle).
"""
uia_snapshot = event.get("uia_snapshot", {})
window_info = event.get("window", {})
vision_info = event.get("vision_info", {})
screenshot_id = event.get("screenshot_id", "")
name = uia_snapshot.get("name", "") if uia_snapshot else ""
control_type = uia_snapshot.get("control_type", "") if uia_snapshot else ""
automation_id = uia_snapshot.get("automation_id", "") if uia_snapshot else ""
parent_path = uia_snapshot.get("parent_path", []) if uia_snapshot else []
window_title = window_info.get("title", "") if window_info else ""
# Cascade de resolution — UIA d'abord (rapide), puis vision
resolve_order = []
# UIA : disponible si on a un nom ou automation_id
has_uia = bool(name or automation_id)
if has_uia:
resolve_order.append("uia")
# Template matching : disponible si on a un crop
anchor_b64 = _load_crop_as_base64(session_dir, screenshot_id)
if anchor_b64:
resolve_order.append("template")
# Serveur (docTR OCR + SomEngine + VLM) : toujours en fallback
resolve_order.append("server")
# VLM local : dernier recours
resolve_order.append("vlm_local")
if not resolve_order:
return {}
target_spec: Dict[str, Any] = {
"resolve_order": resolve_order,
"window_title": window_title,
}
# UIA target
if has_uia:
target_spec["uia_target"] = {
"name": name,
"control_type": control_type,
"automation_id": automation_id,
"parent_path": parent_path,
}
# Anchor pour template matching
if anchor_b64:
target_spec["anchor_image_base64"] = anchor_b64
# Texte pour OCR (docTR sur le serveur)
if name:
target_spec["by_text"] = name
# Description VLM
vlm_desc = _build_vlm_description(uia_snapshot or {}, window_info or {})
if vlm_desc:
target_spec["vlm_description"] = vlm_desc
return target_spec
def _build_desktop_cleanup_actions(screen_w: int, screen_h: int) -> List[Dict[str, Any]]:
"""Construire les actions de nettoyage bureau AVANT le replay.
Sur Windows 11, un clic sur l'extreme droite de la barre des taches
(le pixel invisible 'Afficher le bureau') minimise toutes les fenetres.
C'est exactement ce qu'un humain ferait avant de commencer un travail :
repartir d'un bureau propre.
100% visuel — pas de raccourci clavier injecte (cf feedback_100pct_visual).
"""
# Le bouton 'Afficher le bureau' est au pixel tout en bas a droite
# de la taskbar. Sur Win11, c'est une fine bande cliquable.
x_pct = round((screen_w - 2) / screen_w, 6) # avant-dernier pixel
y_pct = round((screen_h - 2) / screen_h, 6) # idem vertical
return [
{
"action_id": f"act_setup_desktop_{uuid.uuid4().hex[:6]}",
"type": "click",
"x_pct": x_pct,
"y_pct": y_pct,
"button": "left",
"visual_mode": False, # position fixe, pas besoin de grounding
"wait_before": 0.3,
"_setup_action": True, # marqueur pour le distinguer des vrais clics
},
{
"action_id": f"act_setup_wait_{uuid.uuid4().hex[:6]}",
"type": "wait",
"duration_ms": 1000,
"wait_before": 0,
"_setup_action": True,
},
]
def _simple_build_replay(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict[str, Any]]:
"""Construire un replay visuel depuis les evenements bruts.
Chaque clic est enrichi avec un target_spec complet qui alimente
la cascade de resolution du GroundingEngine :
UIA local (10ms) → template matching (100ms) → serveur docTR/VLM (2-5s)
Les coordonnees x_pct/y_pct sont incluses comme hint de derniere chance.
Lea ne clique pas en aveugle — elle VOIT l'ecran et CHERCHE l'element.
Le replay commence par un nettoyage du bureau (clic 'Afficher le bureau')
pour partir d'un etat propre — exactement comme un humain.
"""
actions: List[Dict[str, Any]] = []
click_count = 0
# Essayer d'extraire la resolution d'ecran
screen_w, screen_h = 1920, 1080
for ev in events:
inner = ev.get("event", {})
meta = inner.get("screen_metadata", {})
res = meta.get("screen_resolution")
if res and isinstance(res, (list, tuple)) and len(res) >= 2:
screen_w, screen_h = int(res[0]), int(res[1])
break
# ── Étape 0 : nettoyer le bureau ──
actions.extend(_build_desktop_cleanup_actions(screen_w, screen_h))
for ev in events:
inner = ev.get("event", {})
etype = inner.get("type", "")
if etype not in _ACTIONABLE_TYPES:
continue
action_id = f"act_clean_{uuid.uuid4().hex[:6]}"
if etype == "mouse_click":
pos = inner.get("pos", [0, 0])
click_count += 1
x_pct = round(pos[0] / screen_w, 6) if screen_w else 0.0
y_pct = round(pos[1] / screen_h, 6) if screen_h else 0.0
action = {
"action_id": action_id,
"type": "click",
"x_pct": x_pct,
"y_pct": y_pct,
"button": inner.get("button", "left"),
"wait_before": 0.5,
}
# Enrichir avec la cascade visuelle complete
target_spec = _build_full_target_spec(inner, session_dir)
if target_spec:
action["visual_mode"] = True
action["target_spec"] = target_spec
uia_name = inner.get("uia_snapshot", {}).get("name", "?")
methods = target_spec.get("resolve_order", [])
logger.info(
"Action %s enrichie [%s] : '%s' (%s)",
action_id, "+".join(methods), uia_name,
inner.get("uia_snapshot", {}).get("control_type", "?"),
)
else:
# Pas de donnee visuelle du tout → coords brutes en dernier recours
action["visual_mode"] = False
logger.warning("Action %s : aucune donnee visuelle, coords brutes", action_id)
actions.append(action)
elif etype in ("text_input", "type"):
text = inner.get("text", "")
if text:
action = {
"action_id": action_id,
"type": "type",
"text": text,
"wait_before": 0.3,
}
actions.append(action)
elif etype in ("key_combo", "key_press"):
keys = inner.get("keys", [])
if isinstance(keys, str):
keys = [keys]
key_single = inner.get("key", "")
if not keys and key_single:
keys = [key_single]
if keys:
action = {
"action_id": action_id,
"type": "key_combo",
"keys": keys,
"wait_before": 0.3,
}
actions.append(action)
# ── Étape finale : détecter les blocs conditionnels (dialogues) ──
# Quand le window_title change entre deux actions, les actions dans
# la nouvelle fenêtre sont conditionnelles : elles ne s'exécutent que
# si le dialogue apparaît effectivement au replay.
# Ex: Ctrl+S → "Enregistrer sous" (conditionnel) → retour app
actions = _mark_conditional_blocks(actions, events)
return actions
def _mark_conditional_blocks(
actions: List[Dict[str, Any]], events: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Marquer les actions qui appartiennent a un dialogue conditionnel.
Detecte les dialogues systeme transitoires (Enregistrer sous, Ouvrir,
Confirmer, etc.) qui n'apparaissent que dans certains contextes.
Au replay, si le dialogue n'est pas present → skip tout le bloc.
Methode : un dialogue systeme est une fenetre qui :
1. N'a PAS de separateur " " ou " - " (pas une app)
2. N'apparait que pour 1-3 actions consecutives
3. Est encadree par des actions dans une vraie app
"""
# Extraire le window_title de chaque evenement actionnable
event_windows: List[str] = []
for ev in events:
inner = ev.get("event", {})
etype = inner.get("type", "")
if etype not in _ACTIONABLE_TYPES:
continue
win = inner.get("window", {}).get("title", "")
event_windows.append(win)
def _is_app_window(title):
"""True si le titre ressemble a une fenetre d'application (pas un dialogue)."""
if not title or title == "unknown_window":
return False
# Les apps ont un separateur : "fichier.txt Bloc-notes"
return any(sep in title for sep in [" ", " - ", ""])
def _is_known_dialog(title):
"""True si le titre est un dialogue systeme connu."""
if not title:
return False
title_lower = title.lower().strip()
dialog_patterns = (
"enregistrer sous", "save as",
"ouvrir", "open",
"imprimer", "print",
"confirmer", "confirmation", "confirm",
"voulez-vous", "do you want",
"avertissement", "warning",
"erreur", "error",
"propriétés", "properties",
)
return any(p in title_lower for p in dialog_patterns)
# Parcourir les actions et marquer les dialogues
action_idx = 0
n_setup = sum(1 for a in actions if a.get("_setup_action"))
for i, action in enumerate(actions):
if action.get("_setup_action"):
continue
if action_idx >= len(event_windows):
break
win = event_windows[action_idx]
action_idx += 1
if not win or win == "unknown_window":
continue
# Marquer si c'est un dialogue connu OU une fenetre sans separateur app
# entouree de fenetres d'app (transitoire)
if _is_known_dialog(win):
action["conditional_on_window"] = win
logger.debug(
"Action %s conditionnelle (dialogue connu) : '%s'",
action.get("action_id", "?"), win,
)
# Log resume
n_conditional = sum(1 for a in actions if a.get("conditional_on_window"))
if n_conditional:
logger.info(
"Blocs conditionnels : %d actions sur %d marquees comme dialogues",
n_conditional, len(actions) - n_setup,
)
return actions
# ---------------------------------------------------------------------------
# Point d'entree
# ---------------------------------------------------------------------------
def main():
"""Demarrer le serveur Session Cleaner."""
import argparse
parser = argparse.ArgumentParser(
description="Session Cleaner -- Nettoyage de sessions avant replay",
)
parser.add_argument(
"--port", type=int, default=PORT,
help=f"Port du serveur (defaut: {PORT})",
)
parser.add_argument(
"--host", default="0.0.0.0",
help="Adresse d'ecoute (defaut: 0.0.0.0)",
)
parser.add_argument(
"--debug", action="store_true",
help="Mode debug Flask",
)
args = parser.parse_args()
logger.info("Session Cleaner demarre sur http://%s:%d", args.host, args.port)
logger.info("Repertoire sessions : %s", _resolve_sessions_dir())
logger.info("Serveur streaming : %s", STREAMING_SERVER)
logger.info("Token API : %s", "configure" if API_TOKEN else "non configure")
app.run(host=args.host, port=args.port, debug=args.debug)
if __name__ == "__main__":
main()