Files
rpa_vision_v3/tests/unit/test_audit_trail.py
Dom 99041f0117 feat: pipeline complet MACRO/MÉSO/MICRO — Critic, Observer, Policy, Recovery, Learning, Audit Trail, TaskPlanner
Architecture 3 niveaux implémentée et testée (137 tests unitaires + 21 visuels) :

MÉSO (acteur intelligent) :
- P0 Critic : vérification sémantique post-action via gemma4 (replay_verifier.py)
- P1 Observer : pré-analyse écran avant chaque action (api_stream.py /pre_analyze)
- P2 Grounding/Policy : séparation localisation (grounding.py) et décision (policy.py)
- P3 Recovery : rollback automatique Ctrl+Z/Escape/Alt+F4 (recovery.py)
- P4 Learning : apprentissage runtime avec boucle de consolidation (replay_learner.py)

MACRO (planificateur) :
- TaskPlanner : comprend les ordres en langage naturel via gemma4 (task_planner.py)
- Contexte métier TIM/CIM-10 pour les hôpitaux (domain_context.py)
- Endpoint POST /api/v1/task pour l'exécution par instruction

Traçabilité :
- Audit trail complet avec 18 champs par action (audit_trail.py)
- Endpoints GET /audit/history, /audit/summary, /audit/export (CSV)

Grounding :
- Fix parsing bbox_2d qwen2.5vl (pixels relatifs, pas grille 1000x1000)
- Benchmarks visuels sur captures réelles (3 approches : baseline, zoom, Citrix)
- Reproductibilité validée : variance < 0.008 sur 10 itérations

Sécurité :
- Tokens de production retirés du code source → .env.local
- Secret key aléatoire si non configuré
- Suppression logs qui leakent les tokens

Résultats : 80% de replay (vs 12.5% avant), 100% détection visuelle Citrix JPEG Q20

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 21:03:25 +02:00

684 lines
24 KiB
Python

# tests/unit/test_audit_trail.py
"""
Tests unitaires du module Audit Trail.
Vérifie l'enregistrement, la recherche, l'export CSV et le résumé
journalier des entrées d'audit.
"""
import csv
import io
import json
import os
import tempfile
from datetime import date, datetime, timedelta
from pathlib import Path
import pytest
# Importer depuis le bon chemin (agent_v0/server_v1/)
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from agent_v0.server_v1.audit_trail import AuditEntry, AuditTrail
# =========================================================================
# Fixtures
# =========================================================================
@pytest.fixture
def audit_dir(tmp_path):
"""Répertoire temporaire pour les fichiers d'audit."""
d = tmp_path / "audit"
d.mkdir()
return str(d)
@pytest.fixture
def audit(audit_dir):
"""Instance AuditTrail avec répertoire temporaire."""
return AuditTrail(audit_dir=audit_dir)
def _make_entry(**kwargs) -> AuditEntry:
"""Créer une entrée d'audit avec des valeurs par défaut."""
defaults = {
"timestamp": datetime.now().isoformat(),
"session_id": "sess_test_001",
"action_id": "act_001",
"user_id": "tim_dupont",
"user_name": "Marie Dupont",
"machine_id": "PC-TIM-01",
"action_type": "click",
"action_detail": "Clic sur 'Enregistrer' dans DxCare",
"target_app": "DxCare",
"execution_mode": "assisted",
"result": "success",
"resolution_method": "som_text_match",
"critic_result": "semantic_ok",
"recovery_action": "",
"domain": "tim_codage",
"workflow_id": "wf_codage_cim10",
"workflow_name": "Codage CIM-10 séjour",
"duration_ms": 234.5,
}
defaults.update(kwargs)
return AuditEntry(**defaults)
# =========================================================================
# Tests AuditEntry
# =========================================================================
class TestAuditEntry:
"""Tests de la structure AuditEntry."""
def test_creation_basique(self):
"""Créer une entrée avec tous les champs."""
entry = _make_entry()
assert entry.user_id == "tim_dupont"
assert entry.action_type == "click"
assert entry.result == "success"
assert entry.duration_ms == 234.5
def test_to_dict(self):
"""Sérialiser en dictionnaire."""
entry = _make_entry()
d = entry.to_dict()
assert isinstance(d, dict)
assert d["user_id"] == "tim_dupont"
assert d["domain"] == "tim_codage"
assert d["duration_ms"] == 234.5
def test_from_dict(self):
"""Désérialiser depuis un dictionnaire."""
entry = _make_entry()
d = entry.to_dict()
restored = AuditEntry.from_dict(d)
assert restored.user_id == entry.user_id
assert restored.action_detail == entry.action_detail
assert restored.duration_ms == entry.duration_ms
def test_from_dict_ignore_unknown_keys(self):
"""Les clés inconnues sont ignorées (compatibilité future)."""
d = {"user_id": "test", "unknown_field": "valeur", "future_key": 42}
entry = AuditEntry.from_dict(d)
assert entry.user_id == "test"
# Les champs inconnus ne lèvent pas d'erreur
def test_to_dict_json_serializable(self):
"""Le dictionnaire est sérialisable en JSON."""
entry = _make_entry(action_detail="Clic sur 'Validé' — accent français")
d = entry.to_dict()
json_str = json.dumps(d, ensure_ascii=False)
assert "accent français" in json_str
def test_default_values(self):
"""Une entrée vide a des valeurs par défaut cohérentes."""
entry = AuditEntry()
assert entry.timestamp == ""
assert entry.user_id == ""
assert entry.duration_ms == 0.0
assert entry.result == ""
# =========================================================================
# Tests AuditTrail — enregistrement et lecture
# =========================================================================
class TestAuditTrailRecord:
"""Tests d'enregistrement des entrées."""
def test_record_and_reload(self, audit, audit_dir):
"""Enregistrer une entrée puis la relire depuis le fichier."""
entry = _make_entry()
audit.record(entry)
# Vérifier que le fichier existe
today = date.today().isoformat()
filepath = Path(audit_dir) / f"audit_{today}.jsonl"
assert filepath.exists()
# Lire le fichier directement
with open(filepath, "r", encoding="utf-8") as f:
lines = f.readlines()
assert len(lines) == 1
data = json.loads(lines[0])
assert data["user_id"] == "tim_dupont"
assert data["action_detail"] == "Clic sur 'Enregistrer' dans DxCare"
def test_record_multiple_entries(self, audit, audit_dir):
"""Enregistrer plusieurs entrées dans le même fichier."""
for i in range(5):
entry = _make_entry(action_id=f"act_{i:03d}")
audit.record(entry)
today = date.today().isoformat()
filepath = Path(audit_dir) / f"audit_{today}.jsonl"
with open(filepath, "r", encoding="utf-8") as f:
lines = f.readlines()
assert len(lines) == 5
def test_record_auto_timestamp(self, audit):
"""Le timestamp est généré automatiquement si absent."""
entry = _make_entry(timestamp="")
audit.record(entry)
# Le timestamp doit avoir été rempli
entries = audit.query()
assert len(entries) == 1
assert entries[0]["timestamp"] != ""
# Vérifier le format ISO 8601
datetime.fromisoformat(entries[0]["timestamp"])
def test_record_utf8_french(self, audit):
"""Les caractères français sont correctement enregistrés."""
entry = _make_entry(
action_detail="Saisie du diagnostic 'Hépatite à cytomégalovirus' — CIM-10: B25.1",
user_name="François Müller",
workflow_name="Codage séjour réanimation néonatale",
)
audit.record(entry)
entries = audit.query()
assert len(entries) == 1
assert "Hépatite" in entries[0]["action_detail"]
assert "François Müller" in entries[0]["user_name"]
assert "néonatale" in entries[0]["workflow_name"]
def test_record_creates_directory(self, tmp_path):
"""Le répertoire est créé automatiquement s'il n'existe pas."""
new_dir = str(tmp_path / "sub" / "deep" / "audit")
audit = AuditTrail(audit_dir=new_dir)
entry = _make_entry()
audit.record(entry)
assert Path(new_dir).exists()
entries = audit.query()
assert len(entries) == 1
def test_record_different_dates(self, audit, audit_dir):
"""Les entrées de dates différentes vont dans des fichiers différents."""
today = date.today()
yesterday = today - timedelta(days=1)
entry_today = _make_entry(timestamp=datetime.now().isoformat())
entry_yesterday = _make_entry(
timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(),
action_id="act_yesterday",
)
audit.record(entry_today)
audit.record(entry_yesterday)
# Vérifier les fichiers
file_today = Path(audit_dir) / f"audit_{today.isoformat()}.jsonl"
file_yesterday = Path(audit_dir) / f"audit_{yesterday.isoformat()}.jsonl"
assert file_today.exists()
assert file_yesterday.exists()
def test_jsonl_format(self, audit, audit_dir):
"""Chaque ligne du fichier est un JSON valide (format JSONL)."""
for i in range(3):
audit.record(_make_entry(action_id=f"act_{i}"))
today = date.today().isoformat()
filepath = Path(audit_dir) / f"audit_{today}.jsonl"
with open(filepath, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
assert line, f"Ligne {line_num} vide"
data = json.loads(line) # Ne doit pas lever d'exception
assert "action_id" in data
assert "timestamp" in data
# =========================================================================
# Tests AuditTrail — requêtes avec filtres
# =========================================================================
class TestAuditTrailQuery:
"""Tests de recherche et filtrage."""
def _seed_entries(self, audit):
"""Insérer des entrées de test variées."""
entries = [
_make_entry(
action_id="act_001",
user_id="tim_dupont",
result="success",
action_type="click",
workflow_id="wf_01",
domain="tim_codage",
),
_make_entry(
action_id="act_002",
user_id="tim_dupont",
result="failed",
action_type="type",
workflow_id="wf_01",
domain="generic",
),
_make_entry(
action_id="act_003",
user_id="tim_martin",
user_name="Jean Martin",
result="success",
action_type="click",
workflow_id="wf_02",
domain="generic",
),
_make_entry(
action_id="act_004",
user_id="tim_martin",
user_name="Jean Martin",
result="recovered",
action_type="key_combo",
workflow_id="wf_02",
domain="generic",
),
_make_entry(
action_id="act_005",
user_id="tim_dupont",
result="success",
action_type="click",
workflow_id="wf_01",
domain="generic",
),
]
for e in entries:
audit.record(e)
def test_query_all(self, audit):
"""Requête sans filtre retourne tout."""
self._seed_entries(audit)
results = audit.query()
assert len(results) == 5
def test_query_by_user(self, audit):
"""Filtrer par identifiant utilisateur."""
self._seed_entries(audit)
results = audit.query(user_id="tim_dupont")
assert len(results) == 3
assert all(r["user_id"] == "tim_dupont" for r in results)
def test_query_by_result(self, audit):
"""Filtrer par résultat."""
self._seed_entries(audit)
results = audit.query(result="success")
assert len(results) == 3
assert all(r["result"] == "success" for r in results)
def test_query_by_action_type(self, audit):
"""Filtrer par type d'action."""
self._seed_entries(audit)
results = audit.query(action_type="click")
assert len(results) == 3
def test_query_by_workflow(self, audit):
"""Filtrer par workflow."""
self._seed_entries(audit)
results = audit.query(workflow_id="wf_02")
assert len(results) == 2
def test_query_by_domain(self, audit):
"""Filtrer par domaine métier."""
self._seed_entries(audit)
results = audit.query(domain="tim_codage")
assert len(results) == 1
assert results[0]["action_id"] == "act_001"
def test_query_by_session(self, audit):
"""Filtrer par session."""
self._seed_entries(audit)
results = audit.query(session_id="sess_test_001")
assert len(results) == 5 # Toutes les entrées ont la même session
def test_query_combined_filters(self, audit):
"""Combinaison de plusieurs filtres (AND)."""
self._seed_entries(audit)
results = audit.query(user_id="tim_dupont", result="success")
assert len(results) == 2
def test_query_no_match(self, audit):
"""Filtre sans correspondance retourne une liste vide."""
self._seed_entries(audit)
results = audit.query(user_id="tim_inexistant")
assert len(results) == 0
def test_query_pagination_limit(self, audit):
"""Limiter le nombre de résultats."""
self._seed_entries(audit)
results = audit.query(limit=2)
assert len(results) == 2
def test_query_pagination_offset(self, audit):
"""Décalage dans les résultats."""
self._seed_entries(audit)
all_results = audit.query()
offset_results = audit.query(offset=3)
assert len(offset_results) == 2
assert offset_results[0] == all_results[3]
def test_query_sorted_by_timestamp_desc(self, audit):
"""Les résultats sont triés par timestamp décroissant."""
now = datetime.now()
for i in range(5):
ts = (now - timedelta(minutes=i)).isoformat()
audit.record(_make_entry(
timestamp=ts,
action_id=f"act_{i}",
))
results = audit.query()
timestamps = [r["timestamp"] for r in results]
assert timestamps == sorted(timestamps, reverse=True)
def test_query_date_range(self, audit):
"""Filtrer par plage de dates."""
today = date.today()
yesterday = today - timedelta(days=1)
# Entrée d'hier
audit.record(_make_entry(
timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(),
action_id="act_yesterday",
))
# Entrée d'aujourd'hui
audit.record(_make_entry(
timestamp=datetime.now().isoformat(),
action_id="act_today",
))
# Filtrer uniquement hier
results = audit.query(
date_from=yesterday.isoformat(),
date_to=yesterday.isoformat(),
)
assert len(results) == 1
assert results[0]["action_id"] == "act_yesterday"
# Filtrer les deux jours
results = audit.query(
date_from=yesterday.isoformat(),
date_to=today.isoformat(),
)
assert len(results) == 2
# =========================================================================
# Tests AuditTrail — résumé journalier
# =========================================================================
class TestAuditTrailSummary:
"""Tests du résumé journalier."""
def test_summary_empty(self, audit):
"""Résumé d'un jour sans données."""
summary = audit.get_summary("2025-01-01")
assert summary["total_actions"] == 0
assert summary["success_rate"] == 0.0
assert summary["by_user"] == {}
def test_summary_basic(self, audit):
"""Résumé avec quelques entrées."""
audit.record(_make_entry(user_id="tim_dupont", result="success"))
audit.record(_make_entry(user_id="tim_dupont", result="failed"))
audit.record(_make_entry(user_id="tim_martin", user_name="Jean Martin", result="success"))
summary = audit.get_summary()
assert summary["total_actions"] == 3
assert summary["success_rate"] == round(2 / 3, 3)
def test_summary_by_user(self, audit):
"""Répartition par utilisateur."""
audit.record(_make_entry(user_id="tim_dupont", result="success"))
audit.record(_make_entry(user_id="tim_dupont", result="success"))
audit.record(_make_entry(user_id="tim_dupont", result="failed"))
audit.record(_make_entry(user_id="tim_martin", user_name="Jean Martin", result="success"))
summary = audit.get_summary()
assert "tim_dupont" in summary["by_user"]
assert summary["by_user"]["tim_dupont"]["total"] == 3
assert summary["by_user"]["tim_dupont"]["success"] == 2
assert summary["by_user"]["tim_dupont"]["success_rate"] == round(2 / 3, 3)
assert summary["by_user"]["tim_martin"]["total"] == 1
assert summary["by_user"]["tim_martin"]["success_rate"] == 1.0
def test_summary_by_result(self, audit):
"""Répartition par résultat."""
audit.record(_make_entry(result="success"))
audit.record(_make_entry(result="success"))
audit.record(_make_entry(result="failed"))
audit.record(_make_entry(result="recovered"))
summary = audit.get_summary()
assert summary["by_result"]["success"] == 2
assert summary["by_result"]["failed"] == 1
assert summary["by_result"]["recovered"] == 1
def test_summary_by_action_type(self, audit):
"""Répartition par type d'action."""
audit.record(_make_entry(action_type="click"))
audit.record(_make_entry(action_type="click"))
audit.record(_make_entry(action_type="type"))
summary = audit.get_summary()
assert summary["by_action_type"]["click"] == 2
assert summary["by_action_type"]["type"] == 1
def test_summary_by_workflow(self, audit):
"""Répartition par workflow."""
audit.record(_make_entry(workflow_id="wf_01"))
audit.record(_make_entry(workflow_id="wf_01"))
audit.record(_make_entry(workflow_id="wf_02"))
summary = audit.get_summary()
assert summary["by_workflow"]["wf_01"] == 2
assert summary["by_workflow"]["wf_02"] == 1
def test_summary_by_execution_mode(self, audit):
"""Répartition par mode d'exécution."""
audit.record(_make_entry(execution_mode="autonomous"))
audit.record(_make_entry(execution_mode="assisted"))
audit.record(_make_entry(execution_mode="assisted"))
summary = audit.get_summary()
assert summary["by_execution_mode"]["autonomous"] == 1
assert summary["by_execution_mode"]["assisted"] == 2
def test_summary_date_field(self, audit):
"""Le résumé contient la date demandée."""
today = date.today().isoformat()
summary = audit.get_summary(today)
assert summary["date"] == today
# =========================================================================
# Tests AuditTrail — export CSV
# =========================================================================
class TestAuditTrailExportCSV:
"""Tests de l'export CSV."""
def test_export_csv_empty(self, audit):
"""Export sans données retourne une chaîne vide."""
csv_data = audit.export_csv(date_from="2025-01-01")
assert csv_data == ""
def test_export_csv_basic(self, audit):
"""Export CSV avec quelques entrées."""
audit.record(_make_entry(action_id="act_001"))
audit.record(_make_entry(action_id="act_002"))
csv_data = audit.export_csv()
assert csv_data
assert "act_001" in csv_data
assert "act_002" in csv_data
def test_export_csv_header(self, audit):
"""L'en-tête CSV contient tous les champs du dataclass."""
audit.record(_make_entry())
csv_data = audit.export_csv()
reader = csv.DictReader(io.StringIO(csv_data))
fieldnames = reader.fieldnames
assert "timestamp" in fieldnames
assert "user_id" in fieldnames
assert "action_detail" in fieldnames
assert "domain" in fieldnames
assert "duration_ms" in fieldnames
def test_export_csv_parseable(self, audit):
"""Le CSV produit est parseable par le module csv."""
for i in range(5):
audit.record(_make_entry(
action_id=f"act_{i}",
action_detail=f"Action {i} — avec des 'guillemets' et des, virgules",
))
csv_data = audit.export_csv()
reader = csv.DictReader(io.StringIO(csv_data))
rows = list(reader)
assert len(rows) == 5
# Vérifier que les valeurs sont correctes malgré les caractères spéciaux
for row in rows:
assert "virgules" in row["action_detail"]
def test_export_csv_filter_by_user(self, audit):
"""Export filtré par utilisateur."""
audit.record(_make_entry(user_id="tim_dupont", action_id="act_001"))
audit.record(_make_entry(user_id="tim_martin", action_id="act_002"))
csv_data = audit.export_csv(user_id="tim_dupont")
reader = csv.DictReader(io.StringIO(csv_data))
rows = list(reader)
assert len(rows) == 1
assert rows[0]["user_id"] == "tim_dupont"
def test_export_csv_utf8(self, audit):
"""L'export CSV gère correctement l'UTF-8 français."""
audit.record(_make_entry(
action_detail="Saisie 'Hépatite à cytomégalovirus' — réanimation néonatale",
user_name="François Müller",
))
csv_data = audit.export_csv()
assert "Hépatite" in csv_data
assert "François Müller" in csv_data
# =========================================================================
# Tests de robustesse
# =========================================================================
class TestAuditTrailRobustness:
"""Tests de robustesse et cas limites."""
def test_directory_auto_creation(self, tmp_path):
"""Le répertoire est créé automatiquement s'il n'existe pas."""
audit_dir = str(tmp_path / "nonexistent" / "deep" / "audit")
assert not Path(audit_dir).exists()
audit = AuditTrail(audit_dir=audit_dir)
assert Path(audit_dir).exists()
def test_corrupted_jsonl_line(self, audit, audit_dir):
"""Une ligne corrompue dans le fichier JSONL ne fait pas crasher la lecture."""
# Écrire des entrées normales
audit.record(_make_entry(action_id="act_001"))
audit.record(_make_entry(action_id="act_002"))
# Injecter une ligne corrompue
today = date.today().isoformat()
filepath = Path(audit_dir) / f"audit_{today}.jsonl"
with open(filepath, "a", encoding="utf-8") as f:
f.write("{invalid json line\n")
# Ajouter encore une entrée valide
audit.record(_make_entry(action_id="act_003"))
# La lecture doit fonctionner et ignorer la ligne corrompue
entries = audit.query()
assert len(entries) == 3 # 2 valides avant + 1 valide après
def test_empty_file(self, audit, audit_dir):
"""Un fichier vide ne fait pas crasher."""
today = date.today().isoformat()
filepath = Path(audit_dir) / f"audit_{today}.jsonl"
filepath.touch() # Fichier vide
entries = audit.query()
assert len(entries) == 0
def test_concurrent_writes(self, audit):
"""Écritures concurrentes grâce au verrou threading."""
import threading
errors = []
def write_entries(start):
try:
for i in range(20):
audit.record(_make_entry(action_id=f"act_{start}_{i}"))
except Exception as e:
errors.append(str(e))
threads = [
threading.Thread(target=write_entries, args=(t,))
for t in range(5)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"Erreurs concurrentes: {errors}"
entries = audit.query(limit=200)
assert len(entries) == 100 # 5 threads x 20 entrées
def test_query_invalid_date(self, audit):
"""Dates invalides ne font pas crasher."""
# Ne doit pas lever d'exception
results = audit.query(date_from="not-a-date")
assert isinstance(results, list)
def test_summary_invalid_date(self, audit):
"""Date invalide dans get_summary ne fait pas crasher."""
summary = audit.get_summary("not-a-date")
assert summary["total_actions"] == 0
def test_entry_all_fields_present_in_export(self, audit):
"""Tous les champs du dataclass sont présents dans l'export CSV."""
from dataclasses import fields as dc_fields
entry = _make_entry()
audit.record(entry)
csv_data = audit.export_csv()
reader = csv.DictReader(io.StringIO(csv_data))
row = next(reader)
expected_fields = {f.name for f in dc_fields(AuditEntry)}
actual_fields = set(row.keys())
assert expected_fields == actual_fields
def test_date_range_reversed(self, audit):
"""Plage de dates inversée (date_to < date_from) fonctionne quand même."""
today = date.today()
yesterday = today - timedelta(days=1)
audit.record(_make_entry(
timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(),
))
# date_from > date_to → doit quand même fonctionner
results = audit.query(
date_from=today.isoformat(),
date_to=yesterday.isoformat(),
)
# L'implémentation inverse automatiquement les dates
assert isinstance(results, list)