# Provenance: this module is the payload of a git patch (format-patch mail)
# whose line structure had been flattened; it is restored here as plain Python.
#
#   Commit : 454891713076c1d0e8759fb61ba8bb05c9985e69
#   Author : Domi31tls
#   Date   : Fri, 29 May 2026 17:58:46 +0200
#   Subject: feat(q1): add quarantine.py module — entries, manager, logger
#
#   Standalone module for the Q-1 differential quarantine:
#   - QuarantineEntry dataclass (doc_name, reason, detail, severity, flags...)
#   - QuarantineManager (flag, has_full_quarantine, finalize, INDEX.md generation)
#   - DocLogger (B-2 per-document logs, append-only)
#   - Constants SEUIL_TEXTE_MINI=100, SEUIL_RESCAN_RESIDUEL=0
#
#   Smoke test OK: 2 entries (full + partial); INDEX.md, errors.log and
#   reason.txt generated in line with spec §6 of the consolidated v2 document.
#
#   Ref: docs/coordination/inbox/for-dom/2026-05-29_consolide_pseudocode-Q1-v2.md
#   Co-Authored-By: Claude Opus 4.7 (1M context)
"""Differential quarantine for Pseudonymisation v11.0.

A document is only delivered as "anonymised" when every critical step
succeeded. Otherwise it goes to differential quarantine:
- partial: the text output is delivered, the PDF is quarantined when the
  redaction fails
- full: the whole document is quarantined when the pre-flight check or the
  critical rescan fails

This module is fully standalone: it imports nothing from the core nor from
any other local module. It only writes to the files it is explicitly asked to
write (no global logging, no print).
"""
from __future__ import annotations

import json
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Literal, Optional

# === Constants ===

SEUIL_TEXTE_MINI = 100  # B-3 pre-flight: below this threshold = failed OCR / empty doc
SEUIL_RESCAN_RESIDUEL = 0  # zero tolerance for residual PII after the rescan
QUARANTINE_DIR_NAME = "quarantaine"


@dataclass
class QuarantineEntry:
    """One entry in the quarantine register of a batch."""
    doc_name: str  # base name, without extension
    reason: str  # short normalised code (see REASON_CODES)
    detail: str  # free-form message
    timestamp: str  # ISO 8601 with timezone
    severity: Literal["partial", "full"]  # partial = PDF only, full = whole document
    flags: list[str] = field(default_factory=list)  # may hold several cumulated reasons
    stacktrace: Optional[str] = None  # formatted traceback when an exception was given
    extracted_chars: int = 0  # number of extracted characters (useful for pre-flight)


# Normalised reason codes (see §5 of the consolidated v2 document)
REASON_CODES: dict[str, Literal["partial", "full"]] = {
    "preflight_text_too_short": "full",
    "extraction_total_failure": "full",
    "rescan_residual_pii": "full",
    "pdf_redaction_failed": "partial",
    "pdf_vector_fallback_to_raster": "partial",
    "regex_user_invalid": "partial",
}


# Recommended action per reason code (used in INDEX.md)
RECOMMENDED_ACTIONS: dict[str, str] = {
    "preflight_text_too_short": "Vérifier OCR, ré-essayer avec docTR forcé",
    "extraction_total_failure": "Inspecter le PDF source (corrompu? chiffré?)",
    "rescan_residual_pii": "Inspection manuelle, fix regex ou whitelist",
    "pdf_redaction_failed": "Voir le .pseudonymise.txt, ré-essayer manuellement",
    "pdf_vector_fallback_to_raster": "PDF en qualité raster (lisible mais moins précis)",
}
_DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt"


def _md_cell(value: object) -> str:
    """Return ``str(value)`` made safe for use inside a Markdown table cell.

    Pipes are backslash-escaped and line breaks become spaces, so a document
    name or path containing them cannot break the table layout. Any other
    text is returned unchanged.
    """
    text = str(value)
    return text.replace("|", "\\|").replace("\r", " ").replace("\n", " ")


class QuarantineManager:
    """Centralised quarantine handling for one whole batch.

    One instance per batch. Keeps the register of entries, writes one
    ``.reason.txt`` file per document, appends to ``errors.log`` and
    generates ``INDEX.md`` at the end of the batch.
    """

    def __init__(self, output_dir: Path, app_version: str = "0.11.0",
                 commit_sha: str = "", profile_name: str = "standard_local") -> None:
        """Store the batch context; nothing is written to disk here.

        Args:
            output_dir: Batch output directory; ``errors.log`` lives directly
                in it and the quarantine directory is created beneath it.
            app_version: Version string reported in the generated files.
            commit_sha: Commit hash; only its first 7 characters are reported
                ("unknown" when empty).
            profile_name: Name of the applied profile, reported as-is.
        """
        self.output_dir: Path = Path(output_dir)
        self.quarantine_dir: Path = self.output_dir / QUARANTINE_DIR_NAME
        self.app_version: str = app_version
        self.commit_sha: str = commit_sha
        self.profile_name: str = profile_name
        self.entries: list[QuarantineEntry] = []
        self._errors_log_path: Path = self.output_dir / "errors.log"

    @staticmethod
    def _format_stacktrace(exc: Optional[BaseException]) -> Optional[str]:
        """Format the traceback carried by *exc*, or return None without one.

        The text is built from the exception object itself rather than from
        the interpreter's "exception currently being handled", so it is
        correct whether or not the caller is still inside the ``except``
        block, and it can never pick up an unrelated exception.
        """
        if exc is None:
            return None
        return "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )

    def _commit_label(self) -> str:
        """Return the 7-character short commit hash, or "unknown" if unset."""
        return self.commit_sha[:7] if self.commit_sha else "unknown"

    def flag(self, doc_name: str, reason: str, detail: str,
             severity: Literal["partial", "full"], *,
             exc: Optional[BaseException] = None,
             extracted_chars: int = 0,
             flags: Optional[list[str]] = None) -> QuarantineEntry:
        """Create an entry, write its ``.reason.txt`` and append to errors.log.

        Args:
            doc_name: Base name of the document; used to build the
                ``.reason.txt`` file name.
            reason: Short reason code. It is stored as given; it is not
                checked against ``REASON_CODES``.
            detail: Free-form message.
            severity: "partial" or "full". Stored as given.
            exc: Optional exception whose traceback is recorded in the entry.
                Without it the stack trace stays None.
            extracted_chars: Number of characters extracted from the document.
            flags: Optional list of reason codes; defaults to ``[reason]``.
                The list is copied, so later changes made by the caller do
                not alter the recorded entry.

        Returns:
            The newly registered entry.

        Note:
            Flagging the same document again rewrites its ``.reason.txt``
            with the latest entry; every entry stays in ``self.entries`` and
            in errors.log.
        """
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)
        entry = QuarantineEntry(
            doc_name=doc_name,
            reason=reason,
            detail=detail,
            timestamp=datetime.now().astimezone().isoformat(),
            severity=severity,
            flags=list(flags) if flags is not None else [reason],
            stacktrace=self._format_stacktrace(exc),
            extracted_chars=extracted_chars,
        )
        self.entries.append(entry)
        self._write_reason_txt(entry)
        self._append_errors_log(entry)
        return entry

    def has_full_quarantine(self, doc_name: str) -> bool:
        """Return True if at least one `full` flag exists for this document."""
        return any(e.doc_name == doc_name and e.severity == "full" for e in self.entries)

    def has_any_flag(self, doc_name: str) -> bool:
        """Return True if at least one flag (partial or full) exists for this document."""
        return any(e.doc_name == doc_name for e in self.entries)

    def finalize(self, total_docs_processed: Optional[int] = None) -> None:
        """Write ``INDEX.md`` in the quarantine directory at the end of the batch.

        Nothing is written when no entry was registered.

        Args:
            total_docs_processed: Optional number of documents in the batch.
                When given (and positive) the quarantine rate is computed and
                shown; otherwise the `Taux` line is omitted.
        """
        if not self.entries:
            return
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)
        index_path = self.quarantine_dir / "INDEX.md"
        content = self._build_index_md(total_docs_processed=total_docs_processed)
        index_path.write_text(content, encoding="utf-8")

    def _write_reason_txt(self, entry: QuarantineEntry) -> None:
        """Write ``<doc_name>.reason.txt`` in the quarantine directory (format §6.1).

        The file is overwritten if it already exists.
        """
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)
        path = self.quarantine_dir / f"{entry.doc_name}.reason.txt"
        severity_label = (
            "le document entier a été placé en quarantaine"
            if entry.severity == "full"
            else "le PDF de sortie n a pas pu être généré, le texte anonymisé est disponible"
        )
        lines: list[str] = [
            f"Document : {entry.doc_name}",
            f"Sévérité : {entry.severity} ({severity_label})",
            f"Raison : {entry.reason}",
            f"Détail : {entry.detail}",
            f"Horodatage : {entry.timestamp}",
            f"Version code : {self.app_version} (commit {self._commit_label()})",
            f"Profil appliqué: {self.profile_name}",
            f"Caractères extraits : {entry.extracted_chars}",
            f"Flags : {', '.join(entry.flags)}",
            "",
        ]
        if entry.stacktrace:
            lines.append("--- stack trace ---")
            lines.append(entry.stacktrace)
        path.write_text("\n".join(lines), encoding="utf-8")

    def _append_errors_log(self, entry: QuarantineEntry) -> None:
        """Append one JSON line describing *entry* to errors.log (JSON-lines)."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # The category is the part of the reason code before its first
        # underscore; a code without underscore is its own category.
        category = entry.reason.partition("_")[0]
        record: dict[str, object] = {
            "ts": entry.timestamp,
            "doc": entry.doc_name,
            "level": "ERROR" if entry.severity == "full" else "WARNING",
            "category": category,
            "msg": entry.detail,
            "severity": entry.severity,
        }
        with open(self._errors_log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str:
        """Build the content of INDEX.md (format §6.2).

        The summary figures count *documents*, not entries: a document
        flagged several times is counted once, and a document holding both a
        `full` and a `partial` flag is counted as `full` only. The two tables
        still list one row per entry.

        Args:
            total_docs_processed: Optional batch size. When given, the header
                shows it instead of the number of flagged documents, and a
                rate line is added if it is positive.

        Returns:
            The Markdown text, lines joined with ``\\n``.
        """
        full_entries = [e for e in self.entries if e.severity == "full"]
        partial_entries = [e for e in self.entries if e.severity == "partial"]

        flagged_docs = {e.doc_name for e in self.entries}
        full_docs = {e.doc_name for e in full_entries}
        # A document that also has a `full` flag is not "text OK".
        partial_docs = {e.doc_name for e in partial_entries} - full_docs

        batch_ts = datetime.now().astimezone().isoformat()

        lines: list[str] = []
        lines.append(f"# Quarantaine — batch {batch_ts}")
        lines.append("")
        if total_docs_processed is not None:
            lines.append(f"**Documents traités** : {total_docs_processed}")
        else:
            lines.append(f"**Documents flaggés** : {len(flagged_docs)}")
        lines.append(f"**Quarantaine totale** : {len(full_docs)} (texte non livré)")
        lines.append(f"**Quarantaine partielle** : {len(partial_docs)} (texte OK, PDF en erreur)")
        if total_docs_processed is not None and total_docs_processed > 0:
            taux = (len(flagged_docs) / total_docs_processed) * 100.0
            lines.append(f"**Taux** : {taux:.1f}%")
        lines.append("")

        # === Full quarantine ===
        lines.append("## Quarantaine totale (full)")
        lines.append("")
        if full_entries:
            lines.append("| Document | Raison | Caractères extraits | Action recommandée |")
            lines.append("|---|---|---|---|")
            for e in full_entries:
                action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION)
                lines.append(
                    f"| {_md_cell(e.doc_name)} | {_md_cell(e.reason)} "
                    f"| {e.extracted_chars} | {_md_cell(action)} |"
                )
        else:
            lines.append("_Aucun document en quarantaine totale._")
        lines.append("")

        # === Partial quarantine ===
        lines.append("## Quarantaine partielle (partial)")
        lines.append("")
        if partial_entries:
            lines.append("| Document | Raison | Texte livré dans | Flags |")
            lines.append("|---|---|---|---|")
            for e in partial_entries:
                txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt"
                flags_str = ", ".join(e.flags)
                lines.append(
                    f"| {_md_cell(e.doc_name)} | {_md_cell(e.reason)} "
                    f"| {_md_cell(txt_path)} | {_md_cell(flags_str)} |"
                )
        else:
            lines.append("_Aucun document en quarantaine partielle._")
        lines.append("")

        # === Batch context ===
        lines.append("## Contexte batch")
        lines.append("")
        lines.append(f"- Version : {self.app_version} (commit {self._commit_label()})")
        lines.append(f"- Profil appliqué : {self.profile_name}")
        lines.append(f"- Horodatage : {batch_ts}")
        lines.append("")

        return "\n".join(lines)


class DocLogger:
    """Per-document file logger. Append-only, no buffering.

    The log file is opened in append mode and closed again on every message.
    """

    def __init__(self, log_path: Path) -> None:
        """Remember *log_path* and create its parent directory if needed."""
        self.log_path: Path = Path(log_path)
        self.log_path.parent.mkdir(parents=True, exist_ok=True)

    def _write(self, level: str, msg: str) -> None:
        """Append one ``<timestamp> [<level>] <msg>`` line to the log file."""
        ts = datetime.now().astimezone().isoformat()
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(f"{ts} [{level}] {msg}\n")

    def info(self, msg: str) -> None:
        """Log *msg* at INFO level."""
        self._write("INFO", msg)

    def warning(self, msg: str) -> None:
        """Log *msg* at WARNING level."""
        self._write("WARNING", msg)

    def error(self, msg: str) -> None:
        """Log *msg* at ERROR level."""
        self._write("ERROR", msg)