# agent_v0/server_v1/audit_trail.py """ Module Audit Trail — traçabilité complète des actions RPA. Responsabilité : "Chaque action exécutée par Léa est tracée, datée, attribuée." En milieu hospitalier (codage CIM-10 via DPI), la traçabilité est une obligation légale. Ce module enregistre chaque action avec : - L'identité du TIM (Technicien d'Information Médicale) superviseur - Le mode d'exécution (autonome, assisté, shadow) - Le résultat détaillé (succès, échec, correction) - L'horodatage ISO 8601 Format de stockage : fichiers JSONL datés dans data/audit/ (un par jour). Aucune dépendance externe (stdlib + dataclasses uniquement). Usage : audit = AuditTrail() audit.record(AuditEntry( session_id="sess_abc", action_id="act_001", user_id="tim_dupont", user_name="Marie Dupont", ... )) entries = audit.query(user_id="tim_dupont", date_from="2026-04-01") csv_data = audit.export_csv(date_from="2026-04-01", date_to="2026-04-06") summary = audit.get_summary("2026-04-05") """ import csv import io import json import logging import os import threading from dataclasses import dataclass, asdict, fields from datetime import datetime, date, timedelta from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) # Répertoire par défaut pour le stockage des fichiers d'audit _DEFAULT_AUDIT_DIR = os.environ.get("RPA_AUDIT_DIR", "data/audit") @dataclass class AuditEntry: """Entrée d'audit — un événement tracé dans le système.""" # Horodatage ISO 8601 (ex: 2026-04-05T14:23:01.456789) timestamp: str = "" # Identifiants de session et d'action session_id: str = "" action_id: str = "" # Identité de l'utilisateur superviseur user_id: str = "" # Identifiant du TIM (login Windows ou configuré) user_name: str = "" # Nom affiché (ex: "Marie Dupont") machine_id: str = "" # ID du poste client (hostname ou configuré) # Description de l'action action_type: str = "" # click, type, key_combo, wait, etc. action_detail: str = "" # Description humaine ("Clic sur 'Enregistrer' dans DxCare") target_app: str = "" # Application cible (DxCare, Orbis, etc.) # Mode d'exécution execution_mode: str = "" # "autonomous", "assisted", "shadow" # Résultat result: str = "" # "success", "failed", "skipped", "recovered" resolution_method: str = "" # Comment la cible a été trouvée (som_text_match, vlm_direct, etc.) critic_result: str = "" # Résultat de la vérification sémantique recovery_action: str = "" # Action corrective si échec (undo, escape, retry, none) # Contexte métier domain: str = "" # Domaine métier (tim_codage, generic, etc.) workflow_id: str = "" # ID du workflow exécuté workflow_name: str = "" # Nom lisible du workflow # Performance duration_ms: float = 0.0 # Durée de l'action en millisecondes def to_dict(self) -> Dict[str, Any]: """Convertir en dictionnaire sérialisable JSON.""" return asdict(self) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "AuditEntry": """Créer une entrée depuis un dictionnaire. Ignore les clés inconnues pour la compatibilité future. """ known_fields = {f.name for f in fields(cls)} filtered = {k: v for k, v in data.items() if k in known_fields} return cls(**filtered) class AuditTrail: """Gestionnaire de traçabilité — enregistrement et consultation des actions. Stocke chaque événement dans un fichier JSONL daté (un fichier par jour). Thread-safe grâce à un verrou d'écriture. Fichiers produits : data/audit/audit_2026-04-05.jsonl data/audit/audit_2026-04-06.jsonl ... """ def __init__(self, audit_dir: str = ""): self.audit_dir = Path(audit_dir or _DEFAULT_AUDIT_DIR) self.audit_dir.mkdir(parents=True, exist_ok=True) self._lock = threading.Lock() logger.info(f"Audit Trail initialisé : {self.audit_dir}") def _file_for_date(self, d: date) -> Path: """Chemin du fichier JSONL pour une date donnée.""" return self.audit_dir / f"audit_{d.isoformat()}.jsonl" def record(self, entry: AuditEntry) -> None: """Enregistrer une entrée d'audit. Ajoute un horodatage ISO 8601 si absent, puis écrit en append dans le fichier JSONL du jour. """ # Horodatage automatique si absent if not entry.timestamp: entry.timestamp = datetime.now().isoformat() # Déterminer le fichier du jour à partir du timestamp try: entry_date = datetime.fromisoformat(entry.timestamp).date() except (ValueError, TypeError): entry_date = date.today() audit_file = self._file_for_date(entry_date) with self._lock: try: with open(audit_file, "a", encoding="utf-8") as f: f.write(json.dumps(entry.to_dict(), ensure_ascii=False) + "\n") except Exception as e: logger.error(f"Audit Trail: échec écriture {audit_file}: {e}") return logger.debug( f"Audit: {entry.result} {entry.action_type} " f"'{entry.action_detail[:50]}' " f"[user={entry.user_id}] [session={entry.session_id}]" ) def _load_file(self, filepath: Path) -> List[AuditEntry]: """Charger toutes les entrées d'un fichier JSONL.""" if not filepath.is_file(): return [] entries = [] try: with open(filepath, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: data = json.loads(line) entries.append(AuditEntry.from_dict(data)) except json.JSONDecodeError as e: logger.warning( f"Audit Trail: ligne {line_num} invalide dans " f"{filepath.name}: {e}" ) except Exception as e: logger.error(f"Audit Trail: échec lecture {filepath}: {e}") return entries def _date_range(self, date_from: str = "", date_to: str = "") -> List[date]: """Calculer la liste de dates entre date_from et date_to (inclus). Si date_from est vide, utilise aujourd'hui. Si date_to est vide, utilise date_from. Format attendu : YYYY-MM-DD. """ if date_from: try: d_from = date.fromisoformat(date_from) except ValueError: d_from = date.today() else: d_from = date.today() if date_to: try: d_to = date.fromisoformat(date_to) except ValueError: d_to = d_from else: d_to = d_from # Assurer l'ordre chronologique if d_to < d_from: d_from, d_to = d_to, d_from dates = [] current = d_from while current <= d_to: dates.append(current) current += timedelta(days=1) return dates def query( self, date_from: str = "", date_to: str = "", user_id: str = "", session_id: str = "", result: str = "", action_type: str = "", workflow_id: str = "", domain: str = "", limit: int = 500, offset: int = 0, ) -> List[Dict[str, Any]]: """Rechercher des entrées d'audit avec filtres. Tous les filtres sont optionnels et combinés en AND. Retourne les entrées triées par timestamp décroissant (plus récentes d'abord). """ dates = self._date_range(date_from, date_to) all_entries: List[AuditEntry] = [] for d in dates: filepath = self._file_for_date(d) all_entries.extend(self._load_file(filepath)) # Appliquer les filtres filtered = [] for entry in all_entries: if user_id and entry.user_id != user_id: continue if session_id and entry.session_id != session_id: continue if result and entry.result != result: continue if action_type and entry.action_type != action_type: continue if workflow_id and entry.workflow_id != workflow_id: continue if domain and entry.domain != domain: continue filtered.append(entry) # Tri par timestamp décroissant (plus récent en premier) filtered.sort(key=lambda e: e.timestamp, reverse=True) # Pagination paginated = filtered[offset:offset + limit] return [e.to_dict() for e in paginated] def get_summary(self, target_date: str = "") -> Dict[str, Any]: """Résumé journalier d'une date donnée. Retourne les statistiques agrégées : - Nombre total d'actions - Taux de succès - Répartition par utilisateur - Répartition par résultat - Répartition par type d'action - Répartition par workflow - Répartition par mode d'exécution """ if not target_date: target_date = date.today().isoformat() try: d = date.fromisoformat(target_date) except ValueError: d = date.today() entries = self._load_file(self._file_for_date(d)) if not entries: return { "date": d.isoformat(), "total_actions": 0, "success_rate": 0.0, "by_user": {}, "by_result": {}, "by_action_type": {}, "by_workflow": {}, "by_execution_mode": {}, } total = len(entries) successes = sum(1 for e in entries if e.result == "success") # Agrégations by_user: Dict[str, Dict[str, Any]] = {} by_result: Dict[str, int] = {} by_action_type: Dict[str, int] = {} by_workflow: Dict[str, int] = {} by_execution_mode: Dict[str, int] = {} for entry in entries: # Par utilisateur uid = entry.user_id or "inconnu" if uid not in by_user: by_user[uid] = { "user_name": entry.user_name, "total": 0, "success": 0, } by_user[uid]["total"] += 1 if entry.result == "success": by_user[uid]["success"] += 1 # Par résultat r = entry.result or "inconnu" by_result[r] = by_result.get(r, 0) + 1 # Par type d'action at = entry.action_type or "inconnu" by_action_type[at] = by_action_type.get(at, 0) + 1 # Par workflow wf = entry.workflow_id or "inconnu" by_workflow[wf] = by_workflow.get(wf, 0) + 1 # Par mode d'exécution em = entry.execution_mode or "inconnu" by_execution_mode[em] = by_execution_mode.get(em, 0) + 1 # Calculer le taux de succès par utilisateur for uid, stats in by_user.items(): stats["success_rate"] = round( stats["success"] / stats["total"], 3 ) if stats["total"] > 0 else 0.0 return { "date": d.isoformat(), "total_actions": total, "success_rate": round(successes / total, 3) if total > 0 else 0.0, "by_user": by_user, "by_result": by_result, "by_action_type": by_action_type, "by_workflow": by_workflow, "by_execution_mode": by_execution_mode, } def export_csv( self, date_from: str = "", date_to: str = "", user_id: str = "", session_id: str = "", ) -> str: """Exporter les entrées d'audit en CSV. Retourne une chaîne CSV complète (avec en-tête). Filtres optionnels par date, utilisateur, session. """ # Récupérer les entrées avec les mêmes filtres que query() entries = self.query( date_from=date_from, date_to=date_to, user_id=user_id, session_id=session_id, limit=100000, # Pas de pagination pour l'export ) if not entries: return "" # En-têtes CSV — même ordre que le dataclass fieldnames = [f.name for f in fields(AuditEntry)] output = io.StringIO() writer = csv.DictWriter( output, fieldnames=fieldnames, extrasaction="ignore", quoting=csv.QUOTE_MINIMAL, ) writer.writeheader() for entry_dict in entries: writer.writerow(entry_dict) return output.getvalue()