# tests/unit/test_audit_trail.py """ Tests unitaires du module Audit Trail. Vérifie l'enregistrement, la recherche, l'export CSV et le résumé journalier des entrées d'audit. """ import csv import io import json import os import tempfile from datetime import date, datetime, timedelta from pathlib import Path import pytest # Importer depuis le bon chemin (agent_v0/server_v1/) import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) from agent_v0.server_v1.audit_trail import AuditEntry, AuditTrail # ========================================================================= # Fixtures # ========================================================================= @pytest.fixture def audit_dir(tmp_path): """Répertoire temporaire pour les fichiers d'audit.""" d = tmp_path / "audit" d.mkdir() return str(d) @pytest.fixture def audit(audit_dir): """Instance AuditTrail avec répertoire temporaire.""" return AuditTrail(audit_dir=audit_dir) def _make_entry(**kwargs) -> AuditEntry: """Créer une entrée d'audit avec des valeurs par défaut.""" defaults = { "timestamp": datetime.now().isoformat(), "session_id": "sess_test_001", "action_id": "act_001", "user_id": "tim_dupont", "user_name": "Marie Dupont", "machine_id": "PC-TIM-01", "action_type": "click", "action_detail": "Clic sur 'Enregistrer' dans DxCare", "target_app": "DxCare", "execution_mode": "assisted", "result": "success", "resolution_method": "som_text_match", "critic_result": "semantic_ok", "recovery_action": "", "domain": "tim_codage", "workflow_id": "wf_codage_cim10", "workflow_name": "Codage CIM-10 séjour", "duration_ms": 234.5, } defaults.update(kwargs) return AuditEntry(**defaults) # ========================================================================= # Tests AuditEntry # ========================================================================= class TestAuditEntry: """Tests de la structure AuditEntry.""" def test_creation_basique(self): """Créer une entrée avec tous les champs.""" entry = _make_entry() assert entry.user_id == "tim_dupont" assert entry.action_type == "click" assert entry.result == "success" assert entry.duration_ms == 234.5 def test_to_dict(self): """Sérialiser en dictionnaire.""" entry = _make_entry() d = entry.to_dict() assert isinstance(d, dict) assert d["user_id"] == "tim_dupont" assert d["domain"] == "tim_codage" assert d["duration_ms"] == 234.5 def test_from_dict(self): """Désérialiser depuis un dictionnaire.""" entry = _make_entry() d = entry.to_dict() restored = AuditEntry.from_dict(d) assert restored.user_id == entry.user_id assert restored.action_detail == entry.action_detail assert restored.duration_ms == entry.duration_ms def test_from_dict_ignore_unknown_keys(self): """Les clés inconnues sont ignorées (compatibilité future).""" d = {"user_id": "test", "unknown_field": "valeur", "future_key": 42} entry = AuditEntry.from_dict(d) assert entry.user_id == "test" # Les champs inconnus ne lèvent pas d'erreur def test_to_dict_json_serializable(self): """Le dictionnaire est sérialisable en JSON.""" entry = _make_entry(action_detail="Clic sur 'Validé' — accent français") d = entry.to_dict() json_str = json.dumps(d, ensure_ascii=False) assert "accent français" in json_str def test_default_values(self): """Une entrée vide a des valeurs par défaut cohérentes.""" entry = AuditEntry() assert entry.timestamp == "" assert entry.user_id == "" assert entry.duration_ms == 0.0 assert entry.result == "" # ========================================================================= # Tests AuditTrail — enregistrement et lecture # ========================================================================= class TestAuditTrailRecord: """Tests d'enregistrement des entrées.""" def test_record_and_reload(self, audit, audit_dir): """Enregistrer une entrée puis la relire depuis le fichier.""" entry = _make_entry() audit.record(entry) # Vérifier que le fichier existe today = date.today().isoformat() filepath = Path(audit_dir) / f"audit_{today}.jsonl" assert filepath.exists() # Lire le fichier directement with open(filepath, "r", encoding="utf-8") as f: lines = f.readlines() assert len(lines) == 1 data = json.loads(lines[0]) assert data["user_id"] == "tim_dupont" assert data["action_detail"] == "Clic sur 'Enregistrer' dans DxCare" def test_record_multiple_entries(self, audit, audit_dir): """Enregistrer plusieurs entrées dans le même fichier.""" for i in range(5): entry = _make_entry(action_id=f"act_{i:03d}") audit.record(entry) today = date.today().isoformat() filepath = Path(audit_dir) / f"audit_{today}.jsonl" with open(filepath, "r", encoding="utf-8") as f: lines = f.readlines() assert len(lines) == 5 def test_record_auto_timestamp(self, audit): """Le timestamp est généré automatiquement si absent.""" entry = _make_entry(timestamp="") audit.record(entry) # Le timestamp doit avoir été rempli entries = audit.query() assert len(entries) == 1 assert entries[0]["timestamp"] != "" # Vérifier le format ISO 8601 datetime.fromisoformat(entries[0]["timestamp"]) def test_record_utf8_french(self, audit): """Les caractères français sont correctement enregistrés.""" entry = _make_entry( action_detail="Saisie du diagnostic 'Hépatite à cytomégalovirus' — CIM-10: B25.1", user_name="François Müller", workflow_name="Codage séjour réanimation néonatale", ) audit.record(entry) entries = audit.query() assert len(entries) == 1 assert "Hépatite" in entries[0]["action_detail"] assert "François Müller" in entries[0]["user_name"] assert "néonatale" in entries[0]["workflow_name"] def test_record_creates_directory(self, tmp_path): """Le répertoire est créé automatiquement s'il n'existe pas.""" new_dir = str(tmp_path / "sub" / "deep" / "audit") audit = AuditTrail(audit_dir=new_dir) entry = _make_entry() audit.record(entry) assert Path(new_dir).exists() entries = audit.query() assert len(entries) == 1 def test_record_different_dates(self, audit, audit_dir): """Les entrées de dates différentes vont dans des fichiers différents.""" today = date.today() yesterday = today - timedelta(days=1) entry_today = _make_entry(timestamp=datetime.now().isoformat()) entry_yesterday = _make_entry( timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(), action_id="act_yesterday", ) audit.record(entry_today) audit.record(entry_yesterday) # Vérifier les fichiers file_today = Path(audit_dir) / f"audit_{today.isoformat()}.jsonl" file_yesterday = Path(audit_dir) / f"audit_{yesterday.isoformat()}.jsonl" assert file_today.exists() assert file_yesterday.exists() def test_jsonl_format(self, audit, audit_dir): """Chaque ligne du fichier est un JSON valide (format JSONL).""" for i in range(3): audit.record(_make_entry(action_id=f"act_{i}")) today = date.today().isoformat() filepath = Path(audit_dir) / f"audit_{today}.jsonl" with open(filepath, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() assert line, f"Ligne {line_num} vide" data = json.loads(line) # Ne doit pas lever d'exception assert "action_id" in data assert "timestamp" in data # ========================================================================= # Tests AuditTrail — requêtes avec filtres # ========================================================================= class TestAuditTrailQuery: """Tests de recherche et filtrage.""" def _seed_entries(self, audit): """Insérer des entrées de test variées.""" entries = [ _make_entry( action_id="act_001", user_id="tim_dupont", result="success", action_type="click", workflow_id="wf_01", domain="tim_codage", ), _make_entry( action_id="act_002", user_id="tim_dupont", result="failed", action_type="type", workflow_id="wf_01", domain="generic", ), _make_entry( action_id="act_003", user_id="tim_martin", user_name="Jean Martin", result="success", action_type="click", workflow_id="wf_02", domain="generic", ), _make_entry( action_id="act_004", user_id="tim_martin", user_name="Jean Martin", result="recovered", action_type="key_combo", workflow_id="wf_02", domain="generic", ), _make_entry( action_id="act_005", user_id="tim_dupont", result="success", action_type="click", workflow_id="wf_01", domain="generic", ), ] for e in entries: audit.record(e) def test_query_all(self, audit): """Requête sans filtre retourne tout.""" self._seed_entries(audit) results = audit.query() assert len(results) == 5 def test_query_by_user(self, audit): """Filtrer par identifiant utilisateur.""" self._seed_entries(audit) results = audit.query(user_id="tim_dupont") assert len(results) == 3 assert all(r["user_id"] == "tim_dupont" for r in results) def test_query_by_result(self, audit): """Filtrer par résultat.""" self._seed_entries(audit) results = audit.query(result="success") assert len(results) == 3 assert all(r["result"] == "success" for r in results) def test_query_by_action_type(self, audit): """Filtrer par type d'action.""" self._seed_entries(audit) results = audit.query(action_type="click") assert len(results) == 3 def test_query_by_workflow(self, audit): """Filtrer par workflow.""" self._seed_entries(audit) results = audit.query(workflow_id="wf_02") assert len(results) == 2 def test_query_by_domain(self, audit): """Filtrer par domaine métier.""" self._seed_entries(audit) results = audit.query(domain="tim_codage") assert len(results) == 1 assert results[0]["action_id"] == "act_001" def test_query_by_session(self, audit): """Filtrer par session.""" self._seed_entries(audit) results = audit.query(session_id="sess_test_001") assert len(results) == 5 # Toutes les entrées ont la même session def test_query_combined_filters(self, audit): """Combinaison de plusieurs filtres (AND).""" self._seed_entries(audit) results = audit.query(user_id="tim_dupont", result="success") assert len(results) == 2 def test_query_no_match(self, audit): """Filtre sans correspondance retourne une liste vide.""" self._seed_entries(audit) results = audit.query(user_id="tim_inexistant") assert len(results) == 0 def test_query_pagination_limit(self, audit): """Limiter le nombre de résultats.""" self._seed_entries(audit) results = audit.query(limit=2) assert len(results) == 2 def test_query_pagination_offset(self, audit): """Décalage dans les résultats.""" self._seed_entries(audit) all_results = audit.query() offset_results = audit.query(offset=3) assert len(offset_results) == 2 assert offset_results[0] == all_results[3] def test_query_sorted_by_timestamp_desc(self, audit): """Les résultats sont triés par timestamp décroissant.""" now = datetime.now() for i in range(5): ts = (now - timedelta(minutes=i)).isoformat() audit.record(_make_entry( timestamp=ts, action_id=f"act_{i}", )) results = audit.query() timestamps = [r["timestamp"] for r in results] assert timestamps == sorted(timestamps, reverse=True) def test_query_date_range(self, audit): """Filtrer par plage de dates.""" today = date.today() yesterday = today - timedelta(days=1) # Entrée d'hier audit.record(_make_entry( timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(), action_id="act_yesterday", )) # Entrée d'aujourd'hui audit.record(_make_entry( timestamp=datetime.now().isoformat(), action_id="act_today", )) # Filtrer uniquement hier results = audit.query( date_from=yesterday.isoformat(), date_to=yesterday.isoformat(), ) assert len(results) == 1 assert results[0]["action_id"] == "act_yesterday" # Filtrer les deux jours results = audit.query( date_from=yesterday.isoformat(), date_to=today.isoformat(), ) assert len(results) == 2 # ========================================================================= # Tests AuditTrail — résumé journalier # ========================================================================= class TestAuditTrailSummary: """Tests du résumé journalier.""" def test_summary_empty(self, audit): """Résumé d'un jour sans données.""" summary = audit.get_summary("2025-01-01") assert summary["total_actions"] == 0 assert summary["success_rate"] == 0.0 assert summary["by_user"] == {} def test_summary_basic(self, audit): """Résumé avec quelques entrées.""" audit.record(_make_entry(user_id="tim_dupont", result="success")) audit.record(_make_entry(user_id="tim_dupont", result="failed")) audit.record(_make_entry(user_id="tim_martin", user_name="Jean Martin", result="success")) summary = audit.get_summary() assert summary["total_actions"] == 3 assert summary["success_rate"] == round(2 / 3, 3) def test_summary_by_user(self, audit): """Répartition par utilisateur.""" audit.record(_make_entry(user_id="tim_dupont", result="success")) audit.record(_make_entry(user_id="tim_dupont", result="success")) audit.record(_make_entry(user_id="tim_dupont", result="failed")) audit.record(_make_entry(user_id="tim_martin", user_name="Jean Martin", result="success")) summary = audit.get_summary() assert "tim_dupont" in summary["by_user"] assert summary["by_user"]["tim_dupont"]["total"] == 3 assert summary["by_user"]["tim_dupont"]["success"] == 2 assert summary["by_user"]["tim_dupont"]["success_rate"] == round(2 / 3, 3) assert summary["by_user"]["tim_martin"]["total"] == 1 assert summary["by_user"]["tim_martin"]["success_rate"] == 1.0 def test_summary_by_result(self, audit): """Répartition par résultat.""" audit.record(_make_entry(result="success")) audit.record(_make_entry(result="success")) audit.record(_make_entry(result="failed")) audit.record(_make_entry(result="recovered")) summary = audit.get_summary() assert summary["by_result"]["success"] == 2 assert summary["by_result"]["failed"] == 1 assert summary["by_result"]["recovered"] == 1 def test_summary_by_action_type(self, audit): """Répartition par type d'action.""" audit.record(_make_entry(action_type="click")) audit.record(_make_entry(action_type="click")) audit.record(_make_entry(action_type="type")) summary = audit.get_summary() assert summary["by_action_type"]["click"] == 2 assert summary["by_action_type"]["type"] == 1 def test_summary_by_workflow(self, audit): """Répartition par workflow.""" audit.record(_make_entry(workflow_id="wf_01")) audit.record(_make_entry(workflow_id="wf_01")) audit.record(_make_entry(workflow_id="wf_02")) summary = audit.get_summary() assert summary["by_workflow"]["wf_01"] == 2 assert summary["by_workflow"]["wf_02"] == 1 def test_summary_by_execution_mode(self, audit): """Répartition par mode d'exécution.""" audit.record(_make_entry(execution_mode="autonomous")) audit.record(_make_entry(execution_mode="assisted")) audit.record(_make_entry(execution_mode="assisted")) summary = audit.get_summary() assert summary["by_execution_mode"]["autonomous"] == 1 assert summary["by_execution_mode"]["assisted"] == 2 def test_summary_date_field(self, audit): """Le résumé contient la date demandée.""" today = date.today().isoformat() summary = audit.get_summary(today) assert summary["date"] == today # ========================================================================= # Tests AuditTrail — export CSV # ========================================================================= class TestAuditTrailExportCSV: """Tests de l'export CSV.""" def test_export_csv_empty(self, audit): """Export sans données retourne une chaîne vide.""" csv_data = audit.export_csv(date_from="2025-01-01") assert csv_data == "" def test_export_csv_basic(self, audit): """Export CSV avec quelques entrées.""" audit.record(_make_entry(action_id="act_001")) audit.record(_make_entry(action_id="act_002")) csv_data = audit.export_csv() assert csv_data assert "act_001" in csv_data assert "act_002" in csv_data def test_export_csv_header(self, audit): """L'en-tête CSV contient tous les champs du dataclass.""" audit.record(_make_entry()) csv_data = audit.export_csv() reader = csv.DictReader(io.StringIO(csv_data)) fieldnames = reader.fieldnames assert "timestamp" in fieldnames assert "user_id" in fieldnames assert "action_detail" in fieldnames assert "domain" in fieldnames assert "duration_ms" in fieldnames def test_export_csv_parseable(self, audit): """Le CSV produit est parseable par le module csv.""" for i in range(5): audit.record(_make_entry( action_id=f"act_{i}", action_detail=f"Action {i} — avec des 'guillemets' et des, virgules", )) csv_data = audit.export_csv() reader = csv.DictReader(io.StringIO(csv_data)) rows = list(reader) assert len(rows) == 5 # Vérifier que les valeurs sont correctes malgré les caractères spéciaux for row in rows: assert "virgules" in row["action_detail"] def test_export_csv_filter_by_user(self, audit): """Export filtré par utilisateur.""" audit.record(_make_entry(user_id="tim_dupont", action_id="act_001")) audit.record(_make_entry(user_id="tim_martin", action_id="act_002")) csv_data = audit.export_csv(user_id="tim_dupont") reader = csv.DictReader(io.StringIO(csv_data)) rows = list(reader) assert len(rows) == 1 assert rows[0]["user_id"] == "tim_dupont" def test_export_csv_utf8(self, audit): """L'export CSV gère correctement l'UTF-8 français.""" audit.record(_make_entry( action_detail="Saisie 'Hépatite à cytomégalovirus' — réanimation néonatale", user_name="François Müller", )) csv_data = audit.export_csv() assert "Hépatite" in csv_data assert "François Müller" in csv_data # ========================================================================= # Tests de robustesse # ========================================================================= class TestAuditTrailRobustness: """Tests de robustesse et cas limites.""" def test_directory_auto_creation(self, tmp_path): """Le répertoire est créé automatiquement s'il n'existe pas.""" audit_dir = str(tmp_path / "nonexistent" / "deep" / "audit") assert not Path(audit_dir).exists() audit = AuditTrail(audit_dir=audit_dir) assert Path(audit_dir).exists() def test_corrupted_jsonl_line(self, audit, audit_dir): """Une ligne corrompue dans le fichier JSONL ne fait pas crasher la lecture.""" # Écrire des entrées normales audit.record(_make_entry(action_id="act_001")) audit.record(_make_entry(action_id="act_002")) # Injecter une ligne corrompue today = date.today().isoformat() filepath = Path(audit_dir) / f"audit_{today}.jsonl" with open(filepath, "a", encoding="utf-8") as f: f.write("{invalid json line\n") # Ajouter encore une entrée valide audit.record(_make_entry(action_id="act_003")) # La lecture doit fonctionner et ignorer la ligne corrompue entries = audit.query() assert len(entries) == 3 # 2 valides avant + 1 valide après def test_empty_file(self, audit, audit_dir): """Un fichier vide ne fait pas crasher.""" today = date.today().isoformat() filepath = Path(audit_dir) / f"audit_{today}.jsonl" filepath.touch() # Fichier vide entries = audit.query() assert len(entries) == 0 def test_concurrent_writes(self, audit): """Écritures concurrentes grâce au verrou threading.""" import threading errors = [] def write_entries(start): try: for i in range(20): audit.record(_make_entry(action_id=f"act_{start}_{i}")) except Exception as e: errors.append(str(e)) threads = [ threading.Thread(target=write_entries, args=(t,)) for t in range(5) ] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Erreurs concurrentes: {errors}" entries = audit.query(limit=200) assert len(entries) == 100 # 5 threads x 20 entrées def test_query_invalid_date(self, audit): """Dates invalides ne font pas crasher.""" # Ne doit pas lever d'exception results = audit.query(date_from="not-a-date") assert isinstance(results, list) def test_summary_invalid_date(self, audit): """Date invalide dans get_summary ne fait pas crasher.""" summary = audit.get_summary("not-a-date") assert summary["total_actions"] == 0 def test_entry_all_fields_present_in_export(self, audit): """Tous les champs du dataclass sont présents dans l'export CSV.""" from dataclasses import fields as dc_fields entry = _make_entry() audit.record(entry) csv_data = audit.export_csv() reader = csv.DictReader(io.StringIO(csv_data)) row = next(reader) expected_fields = {f.name for f in dc_fields(AuditEntry)} actual_fields = set(row.keys()) assert expected_fields == actual_fields def test_date_range_reversed(self, audit): """Plage de dates inversée (date_to < date_from) fonctionne quand même.""" today = date.today() yesterday = today - timedelta(days=1) audit.record(_make_entry( timestamp=datetime.combine(yesterday, datetime.min.time()).isoformat(), )) # date_from > date_to → doit quand même fonctionner results = audit.query( date_from=today.isoformat(), date_to=yesterday.isoformat(), ) # L'implémentation inverse automatiquement les dates assert isinstance(results, list)