Architecture 3 niveaux implémentée et testée (137 tests unitaires + 21 visuels) : MÉSO (acteur intelligent) : - P0 Critic : vérification sémantique post-action via gemma4 (replay_verifier.py) - P1 Observer : pré-analyse écran avant chaque action (api_stream.py /pre_analyze) - P2 Grounding/Policy : séparation localisation (grounding.py) et décision (policy.py) - P3 Recovery : rollback automatique Ctrl+Z/Escape/Alt+F4 (recovery.py) - P4 Learning : apprentissage runtime avec boucle de consolidation (replay_learner.py) MACRO (planificateur) : - TaskPlanner : comprend les ordres en langage naturel via gemma4 (task_planner.py) - Contexte métier TIM/CIM-10 pour les hôpitaux (domain_context.py) - Endpoint POST /api/v1/task pour l'exécution par instruction Traçabilité : - Audit trail complet avec 18 champs par action (audit_trail.py) - Endpoints GET /audit/history, /audit/summary, /audit/export (CSV) Grounding : - Fix parsing bbox_2d qwen2.5vl (pixels relatifs, pas grille 1000x1000) - Benchmarks visuels sur captures réelles (3 approches : baseline, zoom, Citrix) - Reproductibilité validée : variance < 0.008 sur 10 itérations Sécurité : - Tokens de production retirés du code source → .env.local - Secret key aléatoire si non configuré - Suppression logs qui leakent les tokens Résultats : 80% de replay (vs 12.5% avant), 100% détection visuelle Citrix JPEG Q20 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
684 lines
24 KiB
Python
684 lines
24 KiB
Python
# tests/unit/test_audit_trail.py
|
|
"""
|
|
Tests unitaires du module Audit Trail.
|
|
|
|
Vérifie l'enregistrement, la recherche, l'export CSV et le résumé
|
|
journalier des entrées d'audit.
|
|
"""
|
|
|
|
import csv
|
|
import io
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from datetime import date, datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Importer depuis le bon chemin (agent_v0/server_v1/)
|
|
import sys
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
|
|
from agent_v0.server_v1.audit_trail import AuditEntry, AuditTrail
|
|
|
|
|
|
# =========================================================================
|
|
# Fixtures
|
|
# =========================================================================
|
|
|
|
@pytest.fixture
def audit_dir(tmp_path):
    """Provide a fresh temporary directory for audit files."""
    directory = tmp_path / "audit"
    directory.mkdir()
    return str(directory)
|
|
|
|
|
|
@pytest.fixture
def audit(audit_dir):
    """Build an AuditTrail instance backed by the temporary directory."""
    return AuditTrail(audit_dir=audit_dir)
|
|
|
|
|
|
def _make_entry(**overrides) -> AuditEntry:
    """Build an AuditEntry pre-filled with realistic default values.

    Any keyword argument overrides the corresponding default field.
    """
    values = {
        "timestamp": datetime.now().isoformat(),
        "session_id": "sess_test_001",
        "action_id": "act_001",
        "user_id": "tim_dupont",
        "user_name": "Marie Dupont",
        "machine_id": "PC-TIM-01",
        "action_type": "click",
        "action_detail": "Clic sur 'Enregistrer' dans DxCare",
        "target_app": "DxCare",
        "execution_mode": "assisted",
        "result": "success",
        "resolution_method": "som_text_match",
        "critic_result": "semantic_ok",
        "recovery_action": "",
        "domain": "tim_codage",
        "workflow_id": "wf_codage_cim10",
        "workflow_name": "Codage CIM-10 séjour",
        "duration_ms": 234.5,
    }
    return AuditEntry(**{**values, **overrides})
|
|
|
|
|
|
# =========================================================================
|
|
# Tests AuditEntry
|
|
# =========================================================================
|
|
|
|
class TestAuditEntry:
    """Unit tests for the AuditEntry data structure."""

    def test_creation_basique(self):
        """An entry built with every field keeps the provided values."""
        sample = _make_entry()
        assert sample.user_id == "tim_dupont"
        assert sample.action_type == "click"
        assert sample.result == "success"
        assert sample.duration_ms == 234.5

    def test_to_dict(self):
        """to_dict() produces a plain dictionary carrying the field values."""
        as_dict = _make_entry().to_dict()
        assert isinstance(as_dict, dict)
        assert as_dict["user_id"] == "tim_dupont"
        assert as_dict["domain"] == "tim_codage"
        assert as_dict["duration_ms"] == 234.5

    def test_from_dict(self):
        """A dict round-trip restores an equivalent entry."""
        original = _make_entry()
        rebuilt = AuditEntry.from_dict(original.to_dict())
        assert rebuilt.user_id == original.user_id
        assert rebuilt.action_detail == original.action_detail
        assert rebuilt.duration_ms == original.duration_ms

    def test_from_dict_ignore_unknown_keys(self):
        """Unknown keys are silently dropped (forward compatibility)."""
        payload = {"user_id": "test", "unknown_field": "valeur", "future_key": 42}
        entry = AuditEntry.from_dict(payload)
        assert entry.user_id == "test"
        # The extra keys must not raise any error.

    def test_to_dict_json_serializable(self):
        """The serialized dict can be dumped to JSON, accents included."""
        entry = _make_entry(action_detail="Clic sur 'Validé' — accent français")
        dumped = json.dumps(entry.to_dict(), ensure_ascii=False)
        assert "accent français" in dumped

    def test_default_values(self):
        """A bare entry exposes coherent default values."""
        blank = AuditEntry()
        assert blank.timestamp == ""
        assert blank.user_id == ""
        assert blank.duration_ms == 0.0
        assert blank.result == ""
|
|
|
|
|
|
# =========================================================================
|
|
# Tests AuditTrail — enregistrement et lecture
|
|
# =========================================================================
|
|
|
|
class TestAuditTrailRecord:
    """Tests covering how entries are persisted to disk."""

    def test_record_and_reload(self, audit, audit_dir):
        """A recorded entry lands in today's JSONL file and reads back."""
        audit.record(_make_entry())

        jsonl_path = Path(audit_dir) / f"audit_{date.today().isoformat()}.jsonl"
        assert jsonl_path.exists()

        raw_lines = jsonl_path.read_text(encoding="utf-8").splitlines()
        assert len(raw_lines) == 1

        payload = json.loads(raw_lines[0])
        assert payload["user_id"] == "tim_dupont"
        assert payload["action_detail"] == "Clic sur 'Enregistrer' dans DxCare"

    def test_record_multiple_entries(self, audit, audit_dir):
        """Several entries on the same day share one file, one per line."""
        for idx in range(5):
            audit.record(_make_entry(action_id=f"act_{idx:03d}"))

        jsonl_path = Path(audit_dir) / f"audit_{date.today().isoformat()}.jsonl"
        assert len(jsonl_path.read_text(encoding="utf-8").splitlines()) == 5

    def test_record_auto_timestamp(self, audit):
        """An empty timestamp is filled in automatically at record time."""
        audit.record(_make_entry(timestamp=""))

        stored = audit.query()
        assert len(stored) == 1
        assert stored[0]["timestamp"] != ""
        # Must parse as ISO 8601 (fromisoformat raises otherwise).
        datetime.fromisoformat(stored[0]["timestamp"])

    def test_record_utf8_french(self, audit):
        """French accented characters survive the write/read round trip."""
        audit.record(_make_entry(
            action_detail="Saisie du diagnostic 'Hépatite à cytomégalovirus' — CIM-10: B25.1",
            user_name="François Müller",
            workflow_name="Codage séjour réanimation néonatale",
        ))

        stored = audit.query()
        assert len(stored) == 1
        assert "Hépatite" in stored[0]["action_detail"]
        assert "François Müller" in stored[0]["user_name"]
        assert "néonatale" in stored[0]["workflow_name"]

    def test_record_creates_directory(self, tmp_path):
        """Missing parent directories are created on demand."""
        nested_dir = str(tmp_path / "sub" / "deep" / "audit")
        trail = AuditTrail(audit_dir=nested_dir)
        trail.record(_make_entry())

        assert Path(nested_dir).exists()
        assert len(trail.query()) == 1

    def test_record_different_dates(self, audit, audit_dir):
        """Entries dated on different days are routed to distinct files."""
        today = date.today()
        yesterday = today - timedelta(days=1)

        audit.record(_make_entry(timestamp=datetime.now().isoformat()))
        audit.record(_make_entry(
            timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(),
            action_id="act_yesterday",
        ))

        base = Path(audit_dir)
        assert (base / f"audit_{today.isoformat()}.jsonl").exists()
        assert (base / f"audit_{yesterday.isoformat()}.jsonl").exists()

    def test_jsonl_format(self, audit, audit_dir):
        """Every persisted line is standalone valid JSON (JSONL format)."""
        for idx in range(3):
            audit.record(_make_entry(action_id=f"act_{idx}"))

        jsonl_path = Path(audit_dir) / f"audit_{date.today().isoformat()}.jsonl"
        with open(jsonl_path, "r", encoding="utf-8") as handle:
            for line_num, raw in enumerate(handle, 1):
                raw = raw.strip()
                assert raw, f"Ligne {line_num} vide"
                record = json.loads(raw)  # must not raise
                assert "action_id" in record
                assert "timestamp" in record
|
|
|
|
|
|
# =========================================================================
|
|
# Tests AuditTrail — requêtes avec filtres
|
|
# =========================================================================
|
|
|
|
class TestAuditTrailQuery:
    """Tests for filtering and searching recorded entries."""

    def _seed_entries(self, audit):
        """Populate the trail with a small, varied data set."""
        # (action_id, user_id, user_name-or-None, result, action_type, workflow_id, domain)
        specs = [
            ("act_001", "tim_dupont", None, "success", "click", "wf_01", "tim_codage"),
            ("act_002", "tim_dupont", None, "failed", "type", "wf_01", "generic"),
            ("act_003", "tim_martin", "Jean Martin", "success", "click", "wf_02", "generic"),
            ("act_004", "tim_martin", "Jean Martin", "recovered", "key_combo", "wf_02", "generic"),
            ("act_005", "tim_dupont", None, "success", "click", "wf_01", "generic"),
        ]
        for action_id, user_id, user_name, result, action_type, workflow_id, domain in specs:
            fields = {
                "action_id": action_id,
                "user_id": user_id,
                "result": result,
                "action_type": action_type,
                "workflow_id": workflow_id,
                "domain": domain,
            }
            if user_name is not None:
                fields["user_name"] = user_name
            audit.record(_make_entry(**fields))

    def test_query_all(self, audit):
        """Without filters, every entry is returned."""
        self._seed_entries(audit)
        assert len(audit.query()) == 5

    def test_query_by_user(self, audit):
        """Filtering on user_id keeps only that user's actions."""
        self._seed_entries(audit)
        found = audit.query(user_id="tim_dupont")
        assert len(found) == 3
        assert all(row["user_id"] == "tim_dupont" for row in found)

    def test_query_by_result(self, audit):
        """Filtering on result keeps only matching outcomes."""
        self._seed_entries(audit)
        found = audit.query(result="success")
        assert len(found) == 3
        assert all(row["result"] == "success" for row in found)

    def test_query_by_action_type(self, audit):
        """Filtering on action type."""
        self._seed_entries(audit)
        assert len(audit.query(action_type="click")) == 3

    def test_query_by_workflow(self, audit):
        """Filtering on workflow identifier."""
        self._seed_entries(audit)
        assert len(audit.query(workflow_id="wf_02")) == 2

    def test_query_by_domain(self, audit):
        """Filtering on business domain."""
        self._seed_entries(audit)
        found = audit.query(domain="tim_codage")
        assert len(found) == 1
        assert found[0]["action_id"] == "act_001"

    def test_query_by_session(self, audit):
        """Filtering on session identifier."""
        self._seed_entries(audit)
        # Every seeded entry shares the same session.
        assert len(audit.query(session_id="sess_test_001")) == 5

    def test_query_combined_filters(self, audit):
        """Multiple filters combine with AND semantics."""
        self._seed_entries(audit)
        assert len(audit.query(user_id="tim_dupont", result="success")) == 2

    def test_query_no_match(self, audit):
        """A filter matching nothing yields an empty list."""
        self._seed_entries(audit)
        assert len(audit.query(user_id="tim_inexistant")) == 0

    def test_query_pagination_limit(self, audit):
        """The limit parameter caps the number of rows returned."""
        self._seed_entries(audit)
        assert len(audit.query(limit=2)) == 2

    def test_query_pagination_offset(self, audit):
        """The offset parameter skips leading rows."""
        self._seed_entries(audit)
        everything = audit.query()
        shifted = audit.query(offset=3)
        assert len(shifted) == 2
        assert shifted[0] == everything[3]

    def test_query_sorted_by_timestamp_desc(self, audit):
        """Rows come back sorted newest first."""
        reference = datetime.now()
        for step in range(5):
            audit.record(_make_entry(
                timestamp=(reference - timedelta(minutes=step)).isoformat(),
                action_id=f"act_{step}",
            ))

        stamps = [row["timestamp"] for row in audit.query()]
        assert stamps == sorted(stamps, reverse=True)

    def test_query_date_range(self, audit):
        """date_from/date_to bound the result set inclusively."""
        today = date.today()
        yesterday = today - timedelta(days=1)

        # One entry dated yesterday, one dated today.
        audit.record(_make_entry(
            timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(),
            action_id="act_yesterday",
        ))
        audit.record(_make_entry(
            timestamp=datetime.now().isoformat(),
            action_id="act_today",
        ))

        # Restrict to yesterday only.
        only_yesterday = audit.query(
            date_from=yesterday.isoformat(),
            date_to=yesterday.isoformat(),
        )
        assert len(only_yesterday) == 1
        assert only_yesterday[0]["action_id"] == "act_yesterday"

        # Cover both days.
        both_days = audit.query(
            date_from=yesterday.isoformat(),
            date_to=today.isoformat(),
        )
        assert len(both_days) == 2
|
|
|
|
|
|
# =========================================================================
|
|
# Tests AuditTrail — résumé journalier
|
|
# =========================================================================
|
|
|
|
class TestAuditTrailSummary:
    """Tests for the daily summary aggregation."""

    def test_summary_empty(self, audit):
        """A day with no data yields an all-zero summary."""
        summary = audit.get_summary("2025-01-01")
        assert summary["total_actions"] == 0
        assert summary["success_rate"] == 0.0
        assert summary["by_user"] == {}

    def test_summary_basic(self, audit):
        """Totals and global success rate are computed over all entries."""
        audit.record(_make_entry(user_id="tim_dupont", result="success"))
        audit.record(_make_entry(user_id="tim_dupont", result="failed"))
        audit.record(_make_entry(user_id="tim_martin", user_name="Jean Martin", result="success"))

        summary = audit.get_summary()
        assert summary["total_actions"] == 3
        assert summary["success_rate"] == round(2 / 3, 3)

    def test_summary_by_user(self, audit):
        """Per-user breakdown counts totals, successes and the rate."""
        for outcome in ("success", "success", "failed"):
            audit.record(_make_entry(user_id="tim_dupont", result=outcome))
        audit.record(_make_entry(user_id="tim_martin", user_name="Jean Martin", result="success"))

        per_user = audit.get_summary()["by_user"]
        assert "tim_dupont" in per_user
        assert per_user["tim_dupont"]["total"] == 3
        assert per_user["tim_dupont"]["success"] == 2
        assert per_user["tim_dupont"]["success_rate"] == round(2 / 3, 3)
        assert per_user["tim_martin"]["total"] == 1
        assert per_user["tim_martin"]["success_rate"] == 1.0

    def test_summary_by_result(self, audit):
        """Breakdown by outcome."""
        for outcome in ("success", "success", "failed", "recovered"):
            audit.record(_make_entry(result=outcome))

        by_result = audit.get_summary()["by_result"]
        assert by_result["success"] == 2
        assert by_result["failed"] == 1
        assert by_result["recovered"] == 1

    def test_summary_by_action_type(self, audit):
        """Breakdown by action type."""
        for kind in ("click", "click", "type"):
            audit.record(_make_entry(action_type=kind))

        by_type = audit.get_summary()["by_action_type"]
        assert by_type["click"] == 2
        assert by_type["type"] == 1

    def test_summary_by_workflow(self, audit):
        """Breakdown by workflow."""
        for workflow in ("wf_01", "wf_01", "wf_02"):
            audit.record(_make_entry(workflow_id=workflow))

        by_workflow = audit.get_summary()["by_workflow"]
        assert by_workflow["wf_01"] == 2
        assert by_workflow["wf_02"] == 1

    def test_summary_by_execution_mode(self, audit):
        """Breakdown by execution mode."""
        for mode in ("autonomous", "assisted", "assisted"):
            audit.record(_make_entry(execution_mode=mode))

        by_mode = audit.get_summary()["by_execution_mode"]
        assert by_mode["autonomous"] == 1
        assert by_mode["assisted"] == 2

    def test_summary_date_field(self, audit):
        """The summary echoes the requested date."""
        requested = date.today().isoformat()
        assert audit.get_summary(requested)["date"] == requested
|
|
|
|
|
|
# =========================================================================
|
|
# Tests AuditTrail — export CSV
|
|
# =========================================================================
|
|
|
|
class TestAuditTrailExportCSV:
    """Tests for the CSV export feature."""

    def test_export_csv_empty(self, audit):
        """Exporting with no matching data yields an empty string."""
        assert audit.export_csv(date_from="2025-01-01") == ""

    def test_export_csv_basic(self, audit):
        """Recorded entries show up in the CSV output."""
        for ident in ("act_001", "act_002"):
            audit.record(_make_entry(action_id=ident))

        exported = audit.export_csv()
        assert exported
        assert "act_001" in exported
        assert "act_002" in exported

    def test_export_csv_header(self, audit):
        """The header row exposes the dataclass field names."""
        audit.record(_make_entry())

        header = csv.DictReader(io.StringIO(audit.export_csv())).fieldnames
        for column in ("timestamp", "user_id", "action_detail", "domain", "duration_ms"):
            assert column in header

    def test_export_csv_parseable(self, audit):
        """Output parses cleanly even with quotes and commas in values."""
        for idx in range(5):
            audit.record(_make_entry(
                action_id=f"act_{idx}",
                action_detail=f"Action {idx} — avec des 'guillemets' et des, virgules",
            ))

        rows = list(csv.DictReader(io.StringIO(audit.export_csv())))
        assert len(rows) == 5

        # Special characters must survive the CSV quoting rules.
        for row in rows:
            assert "virgules" in row["action_detail"]

    def test_export_csv_filter_by_user(self, audit):
        """The export honours the user_id filter."""
        audit.record(_make_entry(user_id="tim_dupont", action_id="act_001"))
        audit.record(_make_entry(user_id="tim_martin", action_id="act_002"))

        rows = list(csv.DictReader(io.StringIO(audit.export_csv(user_id="tim_dupont"))))
        assert len(rows) == 1
        assert rows[0]["user_id"] == "tim_dupont"

    def test_export_csv_utf8(self, audit):
        """French UTF-8 text is preserved in the export."""
        audit.record(_make_entry(
            action_detail="Saisie 'Hépatite à cytomégalovirus' — réanimation néonatale",
            user_name="François Müller",
        ))

        exported = audit.export_csv()
        assert "Hépatite" in exported
        assert "François Müller" in exported
|
|
|
|
|
|
# =========================================================================
|
|
# Tests de robustesse
|
|
# =========================================================================
|
|
|
|
class TestAuditTrailRobustness:
    """Edge cases and failure-mode tests."""

    def test_directory_auto_creation(self, tmp_path):
        """Constructing the trail creates the directory tree if missing."""
        target = str(tmp_path / "nonexistent" / "deep" / "audit")
        assert not Path(target).exists()

        AuditTrail(audit_dir=target)
        assert Path(target).exists()

    def test_corrupted_jsonl_line(self, audit, audit_dir):
        """A malformed JSONL line is skipped instead of crashing reads."""
        audit.record(_make_entry(action_id="act_001"))
        audit.record(_make_entry(action_id="act_002"))

        # Append a broken line directly to today's file.
        jsonl_path = Path(audit_dir) / f"audit_{date.today().isoformat()}.jsonl"
        with open(jsonl_path, "a", encoding="utf-8") as handle:
            handle.write("{invalid json line\n")

        # A valid entry recorded afterwards must still be readable.
        audit.record(_make_entry(action_id="act_003"))

        # 2 valid before + 1 valid after; the corrupted line is ignored.
        assert len(audit.query()) == 3

    def test_empty_file(self, audit, audit_dir):
        """An empty audit file yields no entries and no error."""
        empty_path = Path(audit_dir) / f"audit_{date.today().isoformat()}.jsonl"
        empty_path.touch()

        assert len(audit.query()) == 0

    def test_concurrent_writes(self, audit):
        """Concurrent record() calls are serialized by the internal lock."""
        import threading

        errors = []

        def worker(prefix):
            try:
                for idx in range(20):
                    audit.record(_make_entry(action_id=f"act_{prefix}_{idx}"))
            except Exception as exc:
                errors.append(str(exc))

        workers = [threading.Thread(target=worker, args=(n,)) for n in range(5)]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()

        assert not errors, f"Erreurs concurrentes: {errors}"
        # 5 threads x 20 entries each.
        assert len(audit.query(limit=200)) == 100

    def test_query_invalid_date(self, audit):
        """An unparseable date filter does not raise."""
        outcome = audit.query(date_from="not-a-date")
        assert isinstance(outcome, list)

    def test_summary_invalid_date(self, audit):
        """An unparseable date in get_summary degrades to an empty summary."""
        assert audit.get_summary("not-a-date")["total_actions"] == 0

    def test_entry_all_fields_present_in_export(self, audit):
        """CSV columns match the AuditEntry dataclass fields exactly."""
        from dataclasses import fields as dc_fields

        audit.record(_make_entry())

        first_row = next(csv.DictReader(io.StringIO(audit.export_csv())))

        declared = {f.name for f in dc_fields(AuditEntry)}
        assert declared == set(first_row.keys())

    def test_date_range_reversed(self, audit):
        """A reversed range (date_to < date_from) still works."""
        today = date.today()
        yesterday = today - timedelta(days=1)

        audit.record(_make_entry(
            timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(),
        ))

        # The implementation swaps the bounds internally.
        outcome = audit.query(
            date_from=today.isoformat(),
            date_to=yesterday.isoformat(),
        )
        assert isinstance(outcome, list)
|