feat: journée 17 avril — tests E2E validés, dashboard fleet+audit, VWB bridge, cleaner C2
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 14s
security-audit / pip-audit (CVE dépendances) (push) Successful in 10s
security-audit / Scan secrets (grep) (push) Successful in 8s
tests / Lint (ruff + black) (push) Successful in 13s
tests / Tests unitaires (sans GPU) (push) Failing after 14s
tests / Tests sécurité (critique) (push) Has been skipped

Pipeline E2E complet validé :
  Capture VM → streaming → serveur → cleaner → replay → audit trail
  Mode apprentissage supervisé fonctionne (Léa échoue → humain → reprise)

Dashboard :
  - Cleanup 14→10 onglets (RCE supprimée)
  - Fleet : enregistrer/révoquer agents, tokens, ZIP pré-configuré téléchargeable
  - Audit trail MVP (/audit) : filtres, tableau, export CSV, conformité AI Act/RGPD
  - Formulaire Fleet simplifié (nom + email, machine_id auto)

VWB bridge Léa→VWB :
  - Compound décomposés en N steps (saisie + raccourci visibles)
  - Layout serpentin 3 colonnes (plus colonne verticale)
  - Badge OS 🪟/🐧, filtre OS retiré (admin Linux voit Windows)
  - Fix import SQLite readonly

Cleaner intelligent :
  - Descriptions lisibles (UIA/C2) + détection doublons
  - Logique C2 : UIElement identifié = jamais parasite
  - Patterns parasites resserrés
  - Message Léa : "Je n'y arrive pas, montrez-moi comment faire"

Config agent (INC-1 à INC-7) :
  - SERVER_URL + SERVER_BASE unifiés
  - RPA_OLLAMA_HOST séparé
  - allow_redirects=False sur POST
  - Middleware réécriture URL serveur

CI Gitea : fix token + Flask-SocketIO + ruff propre
Fleet endpoints : /agents/enroll|uninstall|fleet + agent_registry SQLite
Backup : script quotidien workflows.db + audit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-17 17:46:40 +02:00
parent 2fa864b5c7
commit 4f61741420
27 changed files with 5088 additions and 1543 deletions

View File

@@ -15,6 +15,7 @@ Port : 5006
import json
import logging
import math
import os
import uuid
from datetime import datetime
@@ -86,13 +87,18 @@ app = Flask(__name__)
# ---------------------------------------------------------------------------
# Fenetres considerees comme parasites
# ATTENTION : ces patterns sont compares en lowercase via `in` — ils doivent
# etre suffisamment specifiques pour ne pas attraper de faux positifs dans
# les logiciels metier (DPI, codage, facturation, etc.).
# "program manager" retire volontairement : le bureau Windows est souvent
# le point de depart d'un workflow (clic taskbar, icone, etc.).
# "assistant" retire : trop large (ex: "Assistant de codage PMSI").
# "lea"/"léa" remplace par des patterns specifiques a l'outil Lea RPA.
_PARASITIC_WINDOW_PATTERNS = [
"program manager",
"fenetre de depassement",
"fenêtre de dépassement",
"léa",
"lea",
"assistant",
"léa - rpa",
"lea - rpa",
"activer windows",
]
@@ -199,6 +205,50 @@ def _load_events(session_dir: Path) -> List[Dict[str, Any]]:
return events
def _get_app_name(event: Dict[str, Any]) -> str:
"""Extraire le nom de l'application depuis l'evenement.
Cherche dans event.event.window.app_name (format actuel).
"""
inner = event.get("event", {})
window = inner.get("window") or {}
if isinstance(window, dict):
return window.get("app_name", "") or ""
return ""
def _has_identified_ui_element(event: Dict[str, Any]) -> bool:
"""Verifier si l'evenement cible un element UI identifie par C2/UIA.
Un clic sur un element UI nomme (bouton, champ, onglet) est tres
probablement un acte metier reel. Les donnees proviennent de :
- uia_snapshot.name : nom de l'element via UI Automation (lea_uia.exe)
- ui_elements : liste d'elements detectes par le pipeline C2 (vision)
- vision_info.ui_elements : idem, format alternatif
ATTENTION : cette fonction ne garantit pas que le clic n'est pas
parasite — un clic systray a aussi un uia_name. C'est un indice
positif, pas une preuve absolue. Utiliser en conjonction avec les
filtres negatifs (systray, clic droit, etc.).
"""
inner = event.get("event", {})
# UIA snapshot — l'element a un nom identifie
uia = inner.get("uia_snapshot") or {}
if uia.get("name"):
return True
# Pipeline C2 — elements visuels detectes
if inner.get("ui_elements"):
return True
vision_info = inner.get("vision_info") or {}
if vision_info.get("ui_elements"):
return True
return False
def _get_window_title(event: Dict[str, Any]) -> str:
"""Extraire le titre de fenetre d'un evenement.
@@ -241,15 +291,23 @@ def _get_shot_filename(click_index: int, session_dir: Path) -> Optional[str]:
def _is_parasitic(event: Dict[str, Any], index: int, total: int) -> bool:
"""Determiner si un evenement est probablement parasite.
Criteres :
- Fenetre contenant un pattern parasite (systray, Program Manager, Lea, etc.)
- Clic droit
- Types non-exploitables (heartbeat, focus_change, action_result)
- Parmi les 3 derniers evenements (souvent = arret enregistrement)
Logique en 3 couches :
1. Signaux durs (toujours parasites) : types non-exploitables, clics droit
2. Signaux positifs C2/UIA (jamais parasites) : element UI identifie par
nom dans une app metier, ou app connue non-systray
3. Patterns de fenetre (parasites si aucun signal positif)
L'ancienne regle « 3 derniers = parasites » a ete supprimee car elle
generait trop de faux positifs sur les logiciels metier (le dernier
clic est souvent Valider/Sauvegarder/Confirmer).
La detection de l'arret d'enregistrement est maintenant faite par
_is_stop_recording_event() dans _parse_actions().
"""
inner = event.get("event", {})
etype = inner.get("type", "")
# --- Couche 1 : signaux durs, toujours parasites ---
# Types toujours parasites
if etype in ("heartbeat", "focus_change", "window_focus_change", "action_result",
"screenshot", "status", "ping", "pong"):
@@ -259,37 +317,333 @@ def _is_parasitic(event: Dict[str, Any], index: int, total: int) -> bool:
if etype == "mouse_click" and inner.get("button") == "right":
return True
# Fenetre parasite
# --- Couche 2 : signaux positifs C2/UIA ---
# Si on a un element UI identifie ET que ce n'est pas la systray,
# c'est un vrai clic metier — on le preserve quoi qu'il arrive.
is_systray = _is_systray_interaction(event)
if not is_systray and _has_identified_ui_element(event):
# Un clic sur un bouton/champ/onglet identifie par UIA ou C2
# dans une fenetre qui n'est pas la systray = acte metier reel
return False
# Si l'app n'est pas le bureau/systray, c'est une vraie application
# metier — les clics dedans sont legitimes meme sans info UIA/C2
app_name = _get_app_name(event).lower()
_NON_BUSINESS_APPS = frozenset({
"", "explorer.exe", "pythonw.exe",
})
if app_name and app_name not in _NON_BUSINESS_APPS:
return False
# --- Couche 3 : patterns de fenetre ---
win_title = _get_window_title(event).lower()
if win_title:
for pattern in _PARASITIC_WINDOW_PATTERNS:
if pattern in win_title:
return True
# Derniers 3 evenements exploitables de la session
# (on les marque UNIQUEMENT si c'est un evenement exploitable, pas un heartbeat)
if etype in _ACTIONABLE_TYPES and index >= total - 3:
return True
return False
def _is_stop_recording_event(event: Dict[str, Any], is_last_actionable: bool) -> bool:
"""Detecter si un evenement est un arret d'enregistrement Lea.
Plutot que de marquer aveuglement les 3 derniers evenements comme
parasites (ce qui supprime des clics metier importants comme
Valider/Sauvegarder), on detecte finement les patterns d'arret :
- Le dernier evenement exploitable est un key_combo Ctrl+Shift+L
(raccourci explicite d'arret)
- Le dernier evenement est un clic sur une fenetre Lea/systray
(l'utilisateur clique sur l'icone systray pour arreter)
- Clic sur la zone systray (barre des taches, icones cachees, etc.)
identifie par le uia_snapshot
"""
if not is_last_actionable:
return False
inner = event.get("event", {})
etype = inner.get("type", "")
# Raccourci Ctrl+Shift+L → arret explicite
if etype == "key_combo":
keys = inner.get("keys", [])
if isinstance(keys, list):
keys_lower = [str(k).lower() for k in keys]
# Ctrl+Shift+L (le \x0c ou 'l' selon l'encoding)
if "ctrl" in keys_lower and "shift" in keys_lower:
return True
# Clic sur fenetre Lea (pythonw.exe = agent Lea)
if etype == "mouse_click":
window = inner.get("window", {})
if isinstance(window, dict):
app_name = (window.get("app_name", "") or "").lower()
if app_name == "pythonw.exe":
return True
return False
def _is_systray_interaction(event: Dict[str, Any]) -> bool:
"""Detecter si un evenement est une interaction avec la systray.
La systray (zone de notification) est identifiee par :
- Le uia_snapshot contenant 'Afficher les icônes cachées',
'Barre des tâches', etc.
- Le parent_path contenant 'Fenêtre de dépassement'
"""
inner = event.get("event", {})
uia = inner.get("uia_snapshot", {})
if not uia:
return False
uia_name = (uia.get("name", "") or "").lower()
# "afficher les icônes cachées" = bouton systray Windows
if "icônes cachées" in uia_name or "icones cachees" in uia_name:
return True
# Verifier le parent_path pour la systray
parent_path = uia.get("parent_path", [])
if isinstance(parent_path, list):
for parent in parent_path:
if isinstance(parent, dict):
parent_name = (parent.get("name", "") or "").lower()
if "dépassement" in parent_name or "depassement" in parent_name:
return True
return False
# ---------------------------------------------------------------------------
# Alias de raccourcis clavier courants
# ---------------------------------------------------------------------------
_KEY_COMBO_ALIASES: Dict[str, str] = {
"ctrl+s": "Sauvegarde",
"ctrl+z": "Annuler",
"ctrl+y": "Rétablir",
"ctrl+c": "Copier",
"ctrl+v": "Coller",
"ctrl+x": "Couper",
"ctrl+a": "Tout sélectionner",
"ctrl+n": "Nouveau",
"ctrl+o": "Ouvrir",
"ctrl+p": "Imprimer",
"ctrl+f": "Rechercher",
"ctrl+w": "Fermer l'onglet",
"ctrl+shift+l": "Arrêt enregistrement Léa",
"alt+f4": "Fermer la fenêtre",
}
def _normalize_keys_for_alias(keys_raw) -> str:
"""Normaliser les touches pour la recherche d'alias.
Gère les caractères de contrôle (ex: \\x13 = Ctrl+S) et uniformise
en lowercase avec '+' comme séparateur.
Tri : modifiers (ctrl, shift, alt) en premier, puis la touche finale.
"""
if isinstance(keys_raw, str):
keys_list = [keys_raw]
elif isinstance(keys_raw, (list, tuple)):
keys_list = [str(k) for k in keys_raw]
else:
return ""
_MODIFIERS = {"ctrl", "shift", "alt", "meta", "super", "win"}
modifiers = []
others = []
for k in keys_list:
k_clean = k.strip().lower()
# Caractères de contrôle : \x01=a, \x03=c, \x04=d, ..., \x13=s, \x16=v, \x1a=z
if len(k_clean) == 1 and ord(k_clean) < 32:
k_clean = chr(ord(k_clean) + ord('a') - 1)
if k_clean in _MODIFIERS:
modifiers.append(k_clean)
else:
others.append(k_clean)
return "+".join(sorted(modifiers) + sorted(others))
def _generate_description(event: Dict[str, Any]) -> str:
"""Generer une description lisible en français pour un evenement.
Utilise les donnees UIA/C2 quand disponibles, sinon position + fenetre.
"""
inner = event.get("event", {})
etype = inner.get("type", "")
if etype == "mouse_click":
uia = inner.get("uia_snapshot") or {}
uia_name = uia.get("name", "") if uia else ""
uia_ct = uia.get("control_type", "") if uia else ""
if uia_name:
ct_label = f" ({uia_ct})" if uia_ct else ""
return f'Clic sur « {uia_name} »{ct_label}'
else:
pos = inner.get("pos", [])
win_title = _get_window_title(event)
pos_str = f"({pos[0]}, {pos[1]})" if pos and len(pos) >= 2 else "(?)"
if win_title and win_title != "unknown_window":
return f'Clic à {pos_str} dans « {win_title} »'
return f'Clic à {pos_str}'
elif etype in ("text_input", "type"):
text = inner.get("text", "")
if text:
# Tronquer si trop long
display = text if len(text) <= 40 else text[:37] + "..."
return f'Saisie : « {display} »'
return "Saisie (vide)"
elif etype == "key_combo":
keys_raw = inner.get("keys", [])
keys_display = " + ".join(str(k) for k in keys_raw) if isinstance(keys_raw, list) else str(keys_raw)
# Normaliser pour chercher l'alias
norm = _normalize_keys_for_alias(keys_raw)
alias = _KEY_COMBO_ALIASES.get(norm, "")
# Affichage propre des noms de touches
keys_pretty = _pretty_keys(keys_raw)
if alias:
return f"Raccourci : {keys_pretty} ({alias})"
return f"Raccourci : {keys_pretty}"
elif etype == "key_press":
key = inner.get("key", "")
return f"Touche : {_pretty_key(str(key))}"
return etype
def _pretty_keys(keys_raw) -> str:
"""Formater une liste de touches pour l'affichage."""
if isinstance(keys_raw, (list, tuple)):
return "+".join(_pretty_key(str(k)) for k in keys_raw)
return _pretty_key(str(keys_raw))
def _pretty_key(key: str) -> str:
"""Formater une touche individuelle pour l'affichage."""
k = key.strip().lower()
# Caractères de contrôle
if len(k) == 1 and ord(k) < 32:
return chr(ord(k) + ord('A') - 1)
mapping = {
"ctrl": "Ctrl",
"shift": "Shift",
"alt": "Alt",
"enter": "Entrée",
"return": "Entrée",
"tab": "Tab",
"escape": "Échap",
"esc": "Échap",
"space": "Espace",
"backspace": "Retour arrière",
"delete": "Suppr",
"up": "",
"down": "",
"left": "",
"right": "",
}
if k in mapping:
return mapping[k]
# Touches de fonction : f1-f12
if k.startswith("f") and k[1:].isdigit():
return k.upper()
return key.capitalize() if len(key) == 1 else key
# ---------------------------------------------------------------------------
# Detection des doublons
# ---------------------------------------------------------------------------
def _detect_duplicates(actions: List[Dict[str, Any]], events: List[Dict[str, Any]]) -> None:
"""Detecter les actions dupliquees et marquer is_duplicate=True.
Criteres (tous requis) :
- Meme type (mouse_click)
- Meme position (distance euclidienne < 10px)
- Meme fenetre
- Ecart temporel < 1s
Le SECOND evenement du doublon est marque (le premier est preserve).
Ne supprime rien — signale seulement.
"""
for j in range(1, len(actions)):
a_prev = actions[j - 1]
a_curr = actions[j]
# Seulement les mouse_click
if a_prev["type"] != "mouse_click" or a_curr["type"] != "mouse_click":
continue
# Verifier la fenetre
if a_prev["window_title"] != a_curr["window_title"]:
# Autoriser unknown_window comme equivalent
win_a = a_prev["window_title"]
win_b = a_curr["window_title"]
if win_a and win_b and win_a != "unknown_window" and win_b != "unknown_window":
continue
# Verifier la position (distance euclidienne < 10px)
ev_prev = events[a_prev["global_index"]].get("event", {})
ev_curr = events[a_curr["global_index"]].get("event", {})
pos_prev = ev_prev.get("pos", [])
pos_curr = ev_curr.get("pos", [])
if not pos_prev or not pos_curr or len(pos_prev) < 2 or len(pos_curr) < 2:
continue
dx = pos_prev[0] - pos_curr[0]
dy = pos_prev[1] - pos_curr[1]
dist = math.sqrt(dx * dx + dy * dy)
if dist >= 10:
continue
# Verifier l'ecart temporel < 1s
ts_prev = ev_prev.get("timestamp", 0)
ts_curr = ev_curr.get("timestamp", 0)
dt = abs(ts_curr - ts_prev)
if dt >= 1.0:
continue
# C'est un doublon
a_curr["is_duplicate"] = True
a_curr["duplicate_info"] = (
f"Doublon (< {dt:.0f}s, {dist:.0f}px de distance)"
if dist > 0
else f"Doublon (< {dt:.1f}s, même position)"
)
def _parse_actions(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict[str, Any]]:
"""Convertir les evenements bruts en liste d'actions affichables.
Retourne une liste de dicts avec : index_global, type, position, fenetre,
texte, touches, shot_file, is_parasitic, etc.
texte, touches, shot_file, is_parasitic, description, is_duplicate, etc.
"""
actions: List[Dict[str, Any]] = []
click_count = 0
total_events = len(events)
# Pre-calculer les 3 derniers indices d'evenements exploitables
# Pre-calculer le dernier indice d'evenement exploitable
# pour la detection fine de l'arret d'enregistrement
actionable_indices = [
i for i, ev in enumerate(events)
if ev.get("event", {}).get("type", "") in _ACTIONABLE_TYPES
]
last_3_actionable = set(actionable_indices[-3:]) if len(actionable_indices) >= 3 else set(actionable_indices)
last_actionable_idx = actionable_indices[-1] if actionable_indices else -1
for i, event in enumerate(events):
inner = event.get("event", {})
@@ -308,6 +662,9 @@ def _parse_actions(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict
"keys": "",
"shot_file": None,
"is_parasitic": False,
"description": _generate_description(event),
"is_duplicate": False,
"duplicate_info": "",
}
# Position (pour les clics)
@@ -334,30 +691,25 @@ def _parse_actions(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict
else:
action["keys"] = str(inner.get("key", keys))
# Detection parasite
# Utiliser les 3 derniers indices exploitables (pas les indices globaux)
parasitic = False
inner_type = etype
# Detection parasite — logique centralisee + extensions
parasitic = _is_parasitic(event, i, total_events)
# Clic droit
if inner_type == "mouse_click" and inner.get("button") == "right":
# Extensions specifiques a _parse_actions :
# - interaction systray (icones cachees, fenetre depassement)
if not parasitic and _is_systray_interaction(event):
parasitic = True
# Fenetre parasite
win_lower = action["window_title"].lower()
if win_lower:
for pattern in _PARASITIC_WINDOW_PATTERNS:
if pattern in win_lower:
parasitic = True
break
# Derniers 3 evenements exploitables
if i in last_3_actionable:
# - arret d'enregistrement (dernier evenement exploitable uniquement)
is_last = (i == last_actionable_idx)
if not parasitic and _is_stop_recording_event(event, is_last):
parasitic = True
action["is_parasitic"] = parasitic
actions.append(action)
# Detection des doublons (marque is_duplicate sur le 2eme du pair)
_detect_duplicates(actions, events)
return actions
@@ -392,6 +744,11 @@ tr:hover { background: #f0f7ff; }
padding: 15px; margin: 15px 0; }
.parasitic { background: #ffe0e0; }
.normal { background: #e0ffe0; }
.duplicate { background: #f0f0f0; color: #999; }
.duplicate td { color: #999; }
.desc { font-size: 13px; color: #555; max-width: 300px; }
.badge-dup { display: inline-block; background: #ddd; color: #888; font-size: 11px;
padding: 2px 6px; border-radius: 3px; margin-left: 4px; cursor: help; }
.counter { font-size: 18px; font-weight: bold; margin: 15px 0; }
.counter .remove { color: #e74c3c; }
.counter .total { color: #2c3e50; }
@@ -478,6 +835,9 @@ _SESSION_TEMPLATE = """<!DOCTYPE html>
<div class="counter" id="counter">
<span class="remove" id="remove-count">{{ parasitic_count }}</span> actions a supprimer /
<span class="total">{{ actions|length }}</span> total
{% if duplicate_count > 0 %}
| <span style="color:#999">{{ duplicate_count }} doublon{{ 's' if duplicate_count > 1 else '' }}</span>
{% endif %}
</div>
<form method="POST" action="{{ url_for('clean_and_replay') }}" id="clean-form">
@@ -490,6 +850,7 @@ _SESSION_TEMPLATE = """<!DOCTYPE html>
<th>Supprimer</th>
<th>#</th>
<th>Type</th>
<th>Description</th>
<th>Position</th>
<th>Fenetre</th>
<th>Texte / Touches</th>
@@ -498,7 +859,7 @@ _SESSION_TEMPLATE = """<!DOCTYPE html>
</thead>
<tbody>
{% for a in actions %}
<tr class="{{ 'parasitic' if a.is_parasitic else 'normal' }}">
<tr class="{{ 'parasitic' if a.is_parasitic else ('duplicate' if a.is_duplicate else 'normal') }}">
<td>
<label>
<input type="checkbox" name="remove_indices"
@@ -513,7 +874,11 @@ _SESSION_TEMPLATE = """<!DOCTYPE html>
{% if a.button is defined and a.button == 'right' %}
<span style="color:#e74c3c">(droit)</span>
{% endif %}
{% if a.is_duplicate %}
<span class="badge-dup" title="{{ a.duplicate_info }}">doublon</span>
{% endif %}
</td>
<td class="desc">{{ a.description }}</td>
<td class="mono">{{ a.position }}</td>
<td>{{ a.window_title|truncate(40) }}</td>
<td class="mono">
@@ -665,9 +1030,10 @@ def view_session(machine_id: str, session_id: str):
events = _load_events(session_dir)
actions = _parse_actions(events, session_dir)
# Compter les parasites et collecter leurs indices globaux
# Compter les parasites, doublons et collecter les indices globaux
parasitic_count = sum(1 for a in actions if a["is_parasitic"])
parasitic_indices = [a["global_index"] for a in actions if a["is_parasitic"]]
duplicate_count = sum(1 for a in actions if a.get("is_duplicate"))
# Date depuis le nom de session
date_str = ""
@@ -688,6 +1054,7 @@ def view_session(machine_id: str, session_id: str):
actions=actions,
parasitic_count=parasitic_count,
parasitic_indices=parasitic_indices,
duplicate_count=duplicate_count,
css=_BASE_CSS,
)