"""Quarantaine différentielle pour Pseudonymisation v11.0. Un document n'est livré "anonymisé" que si toutes les étapes critiques ont réussi. Sinon, quarantaine différentielle : - partial : texte OK sort, PDF en quarantaine si rédaction rate - full : document entier en quarantaine si pré-flight ou rescan critique Ce module est totalement standalone : il n'importe rien du core ni d'autre module local. Il n'écrit que dans les fichiers qu'on lui demande explicitement (pas de logging global, pas de print). """ from __future__ import annotations import json import traceback from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Literal, Optional # === Constantes === SEUIL_TEXTE_MINI = 100 # B-3 pré-flight : sous ce seuil = OCR raté/doc vide SEUIL_RESCAN_RESIDUEL = 0 # Tolérance zéro pour PII résiduelles après rescan QUARANTINE_DIR_NAME = "quarantaine" @dataclass class QuarantineEntry: """Une entrée dans le registre de quarantaine d'un batch.""" doc_name: str # nom de base sans extension reason: str # code court normalisé (cf REASON_CODES) detail: str # message libre timestamp: str # ISO 8601 avec timezone severity: Literal["partial", "full"] # partial = PDF seul, full = doc entier flags: list[str] = field(default_factory=list) # peut contenir plusieurs raisons cumulées stacktrace: Optional[str] = None # tb.format_exc() si exception extracted_chars: int = 0 # nb caractères extraits (utile pour preflight) # Codes raison normalisés (voir §5 du consolidé v2) REASON_CODES: dict[str, Literal["partial", "full"]] = { "preflight_text_too_short": "full", "extraction_total_failure": "full", "rescan_residual_pii": "full", "pdf_redaction_failed": "partial", "pdf_vector_fallback_to_raster": "partial", "regex_user_invalid": "partial", } # Actions recommandées par code raison (utilisées dans INDEX.md) RECOMMENDED_ACTIONS: dict[str, str] = { "preflight_text_too_short": "Vérifier OCR, ré-essayer avec docTR forcé", "extraction_total_failure": "Inspecter le PDF source (corrompu? chiffré?)", "rescan_residual_pii": "Inspection manuelle, fix regex ou whitelist", "pdf_redaction_failed": "Voir le .pseudonymise.txt, ré-essayer manuellement", "pdf_vector_fallback_to_raster": "PDF en qualité raster (lisible mais moins précis)", } _DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt" class QuarantineManager: """Gestion centralisée de la quarantaine pour un batch entier. Une instance par batch. Maintient le registre des entrées, écrit les fichiers .reason.txt par doc, append errors.log, et génère INDEX.md à la fin du batch. """ def __init__(self, output_dir: Path, app_version: str = "0.11.0", commit_sha: str = "", profile_name: str = "standard_local") -> None: self.output_dir: Path = Path(output_dir) self.quarantine_dir: Path = self.output_dir / QUARANTINE_DIR_NAME self.app_version: str = app_version self.commit_sha: str = commit_sha self.profile_name: str = profile_name self.entries: list[QuarantineEntry] = [] self._errors_log_path: Path = self.output_dir / "errors.log" def flag(self, doc_name: str, reason: str, detail: str, severity: Literal["partial", "full"], *, exc: Optional[BaseException] = None, extracted_chars: int = 0, flags: Optional[list[str]] = None) -> QuarantineEntry: """Crée une entrée, écrit le .reason.txt, append errors.log. Si `exc` est fourni, on capture la stacktrace courante via ``traceback.format_exc()`` — ce qui suppose qu'on est appelé depuis un bloc ``except``. Sinon la stacktrace reste None. """ self.quarantine_dir.mkdir(parents=True, exist_ok=True) entry = QuarantineEntry( doc_name=doc_name, reason=reason, detail=detail, timestamp=datetime.now().astimezone().isoformat(), severity=severity, flags=flags if flags is not None else [reason], stacktrace=traceback.format_exc() if exc is not None else None, extracted_chars=extracted_chars, ) self.entries.append(entry) self._write_reason_txt(entry) self._append_errors_log(entry) return entry def has_full_quarantine(self, doc_name: str) -> bool: """True si un (ou plusieurs) flag `full` existe pour ce doc.""" return any(e.doc_name == doc_name and e.severity == "full" for e in self.entries) def has_any_flag(self, doc_name: str) -> bool: """True si au moins un flag (partial ou full) existe pour ce doc.""" return any(e.doc_name == doc_name for e in self.entries) def finalize(self, total_docs_processed: Optional[int] = None) -> None: """Écrit quarantaine/INDEX.md à la fin du batch. `total_docs_processed` est optionnel : si fourni, le taux de mise en quarantaine est calculé et affiché ; sinon la ligne `Taux` est omise. """ if not self.entries: return self.quarantine_dir.mkdir(parents=True, exist_ok=True) index_path = self.quarantine_dir / "INDEX.md" content = self._build_index_md(total_docs_processed=total_docs_processed) index_path.write_text(content, encoding="utf-8") def _write_reason_txt(self, entry: QuarantineEntry) -> None: """Écrit quarantaine/.reason.txt selon format §6.1 du consolidé.""" self.quarantine_dir.mkdir(parents=True, exist_ok=True) path = self.quarantine_dir / f"{entry.doc_name}.reason.txt" severity_label = ( "le document entier a été placé en quarantaine" if entry.severity == "full" else "le PDF de sortie n a pas pu être généré, le texte anonymisé est disponible" ) commit_short = self.commit_sha[:7] if self.commit_sha else "unknown" lines: list[str] = [ f"Document : {entry.doc_name}", f"Sévérité : {entry.severity} ({severity_label})", f"Raison : {entry.reason}", f"Détail : {entry.detail}", f"Horodatage : {entry.timestamp}", f"Version code : {self.app_version} (commit {commit_short})", f"Profil appliqué: {self.profile_name}", f"Caractères extraits : {entry.extracted_chars}", f"Flags : {', '.join(entry.flags)}", "", ] if entry.stacktrace: lines.append("--- stack trace ---") lines.append(entry.stacktrace) path.write_text("\n".join(lines), encoding="utf-8") def _append_errors_log(self, entry: QuarantineEntry) -> None: """Append une ligne JSON dans errors.log (format JSON-lines).""" self.output_dir.mkdir(parents=True, exist_ok=True) category = entry.reason.split("_")[0] if "_" in entry.reason else entry.reason record: dict[str, object] = { "ts": entry.timestamp, "doc": entry.doc_name, "level": "ERROR" if entry.severity == "full" else "WARNING", "category": category, "msg": entry.detail, "severity": entry.severity, } with open(self._errors_log_path, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str: """Construit le contenu du fichier INDEX.md selon §6.2 du consolidé.""" full_entries = [e for e in self.entries if e.severity == "full"] partial_entries = [e for e in self.entries if e.severity == "partial"] total_flagged = len(self.entries) batch_ts = datetime.now().astimezone().isoformat() commit_short = self.commit_sha[:7] if self.commit_sha else "unknown" lines: list[str] = [] lines.append(f"# Quarantaine — batch {batch_ts}") lines.append("") if total_docs_processed is not None: lines.append(f"**Documents traités** : {total_docs_processed}") else: lines.append(f"**Documents flaggés** : {total_flagged}") lines.append(f"**Quarantaine totale** : {len(full_entries)} (texte non livré)") lines.append(f"**Quarantaine partielle** : {len(partial_entries)} (texte OK, PDF en erreur)") if total_docs_processed is not None and total_docs_processed > 0: taux = (total_flagged / total_docs_processed) * 100.0 lines.append(f"**Taux** : {taux:.1f}%") lines.append("") # === Quarantaine totale === lines.append("## Quarantaine totale (full)") lines.append("") if full_entries: lines.append("| Document | Raison | Caractères extraits | Action recommandée |") lines.append("|---|---|---|---|") for e in full_entries: action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION) lines.append( f"| {e.doc_name} | {e.reason} | {e.extracted_chars} | {action} |" ) else: lines.append("_Aucun document en quarantaine totale._") lines.append("") # === Quarantaine partielle === lines.append("## Quarantaine partielle (partial)") lines.append("") if partial_entries: lines.append("| Document | Raison | Texte livré dans | Flags |") lines.append("|---|---|---|---|") for e in partial_entries: txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt" flags_str = ", ".join(e.flags) lines.append( f"| {e.doc_name} | {e.reason} | {txt_path} | {flags_str} |" ) else: lines.append("_Aucun document en quarantaine partielle._") lines.append("") # === Contexte batch === lines.append("## Contexte batch") lines.append("") lines.append(f"- Version : {self.app_version} (commit {commit_short})") lines.append(f"- Profil appliqué : {self.profile_name}") lines.append(f"- Horodatage : {batch_ts}") lines.append("") return "\n".join(lines) class DocLogger: """Logger fichier par document. Append-only, pas de buffer.""" def __init__(self, log_path: Path) -> None: self.log_path: Path = Path(log_path) self.log_path.parent.mkdir(parents=True, exist_ok=True) def _write(self, level: str, msg: str) -> None: ts = datetime.now().astimezone().isoformat() with open(self.log_path, "a", encoding="utf-8") as f: f.write(f"{ts} [{level}] {msg}\n") def info(self, msg: str) -> None: self._write("INFO", msg) def warning(self, msg: str) -> None: self._write("WARNING", msg) def error(self, msg: str) -> None: self._write("ERROR", msg)