Files
anonymisation/quarantine.py
Domi31tls 7fc97aa11f feat(q1): add quarantine.py module — entries, manager, logger
Module standalone pour la quarantaine différentielle Q-1 :
- QuarantineEntry dataclass (doc_name, reason, detail, severity, flags...)
- QuarantineManager (flag, has_full_quarantine, finalize, INDEX.md gen)
- DocLogger (B-2 logs par doc, append-only)
- Constantes SEUIL_TEXTE_MINI=100, SEUIL_RESCAN_RESIDUEL=0

Smoke test OK : 2 entrées (full + partial), INDEX.md, errors.log,
reason.txt générés conformes spec §6 du consolidé v2.

Ref: docs/coordination/inbox/for-dom/2026-05-29_consolide_pseudocode-Q1-v2.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-29 17:58:46 +02:00

255 lines
11 KiB
Python

"""Quarantaine différentielle pour Pseudonymisation v11.0.
Un document n'est livré "anonymisé" que si toutes les étapes critiques ont
réussi. Sinon, quarantaine différentielle :
- partial : texte OK sort, PDF en quarantaine si rédaction rate
- full : document entier en quarantaine si pré-flight ou rescan critique
Ce module est totalement standalone : il n'importe rien du core ni d'autre
module local. Il n'écrit que dans les fichiers qu'on lui demande explicitement
(pas de logging global, pas de print).
"""
from __future__ import annotations
import json
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Literal, Optional
# === Constants ===
# Name of the sub-directory that holds quarantined material.
QUARANTINE_DIR_NAME: str = "quarantaine"
# Zero tolerance for residual PII found after the rescan.
SEUIL_RESCAN_RESIDUEL: int = 0
# B-3 pre-flight: below this threshold the OCR failed or the document is empty.
SEUIL_TEXTE_MINI: int = 100
@dataclass
class QuarantineEntry:
    """Single record in the quarantine registry of a batch.

    Attributes:
        doc_name: Base name of the document, without extension.
        reason: Short normalized reason code (see ``REASON_CODES``).
        detail: Free-form message.
        timestamp: ISO 8601 timestamp, timezone included.
        severity: ``"partial"`` quarantines the PDF only, ``"full"`` the
            whole document.
        flags: Reason codes attached to the entry; may hold several
            cumulated reasons.
        stacktrace: Formatted traceback when the flag came from an exception.
        extracted_chars: Number of extracted characters (useful for the
            pre-flight check).
    """

    doc_name: str
    reason: str
    detail: str
    timestamp: str
    severity: Literal["partial", "full"]
    flags: list[str] = field(default_factory=list)
    stacktrace: Optional[str] = None
    extracted_chars: int = 0
# Normalized reason codes, each mapped to a severity (see §5 of the
# consolidated spec v2).
REASON_CODES: dict[str, Literal["partial", "full"]] = {
    # severity "full"
    "preflight_text_too_short": "full",
    "extraction_total_failure": "full",
    "rescan_residual_pii": "full",
    # severity "partial"
    "pdf_redaction_failed": "partial",
    "pdf_vector_fallback_to_raster": "partial",
    "regex_user_invalid": "partial",
}

# Recommended action per reason code (used when rendering INDEX.md).
RECOMMENDED_ACTIONS: dict[str, str] = {
    "preflight_text_too_short":
        "Vérifier OCR, ré-essayer avec docTR forcé",
    "extraction_total_failure":
        "Inspecter le PDF source (corrompu? chiffré?)",
    "rescan_residual_pii":
        "Inspection manuelle, fix regex ou whitelist",
    "pdf_redaction_failed":
        "Voir le .pseudonymise.txt, ré-essayer manuellement",
    "pdf_vector_fallback_to_raster":
        "PDF en qualité raster (lisible mais moins précis)",
}

# Fallback text for reason codes that have no entry in RECOMMENDED_ACTIONS.
_DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt"
class QuarantineManager:
    """Central quarantine registry and report writer for one whole batch.

    One instance per batch. It keeps the in-memory list of entries, writes a
    ``<doc_name>.reason.txt`` file per flag under ``<output_dir>/quarantaine``,
    appends one JSON line per flag to ``<output_dir>/errors.log`` and renders
    ``quarantaine/INDEX.md`` when :meth:`finalize` is called.
    """

    def __init__(self, output_dir: Path, app_version: str = "0.11.0",
                 commit_sha: str = "", profile_name: str = "standard_local") -> None:
        """Store the batch context; no file or directory is created here.

        Args:
            output_dir: Batch output directory; ``errors.log`` and the
                quarantine sub-directory are located under it.
            app_version: Version string reported in the generated files.
            commit_sha: Commit identifier; only its first 7 characters are
                reported, ``"unknown"`` when empty.
            profile_name: Name of the applied profile, reported as-is.
        """
        self.output_dir: Path = Path(output_dir)
        self.quarantine_dir: Path = self.output_dir / QUARANTINE_DIR_NAME
        self.app_version: str = app_version
        self.commit_sha: str = commit_sha
        self.profile_name: str = profile_name
        self.entries: list[QuarantineEntry] = []
        self._errors_log_path: Path = self.output_dir / "errors.log"

    def flag(self, doc_name: str, reason: str, detail: str,
             severity: Literal["partial", "full"], *,
             exc: Optional[BaseException] = None,
             extracted_chars: int = 0,
             flags: Optional[list[str]] = None) -> QuarantineEntry:
        """Register a flag, write its ``.reason.txt`` and append ``errors.log``.

        Args:
            doc_name: Base name of the document.
            reason: Short reason code.
            detail: Free-form message.
            severity: ``"partial"`` or ``"full"``.
            exc: Exception that triggered the flag, if any. Its own traceback
                is recorded, so the call does not have to happen inside the
                ``except`` block that caught it.
            extracted_chars: Number of characters extracted from the document.
            flags: Reason codes to attach; defaults to ``[reason]``.

        Returns:
            The entry that was created and appended to ``self.entries``.
        """
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

        stacktrace: Optional[str] = None
        if exc is not None:
            # Format the exception that was actually passed in, not whatever
            # exception the interpreter is currently handling (which is none
            # at all when called outside an ``except`` block).
            stacktrace = "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )

        entry = QuarantineEntry(
            doc_name=doc_name,
            reason=reason,
            detail=detail,
            timestamp=datetime.now().astimezone().isoformat(),
            severity=severity,
            # Copy so that later mutation of the caller's list cannot alter
            # the registered entry.
            flags=list(flags) if flags is not None else [reason],
            stacktrace=stacktrace,
            extracted_chars=extracted_chars,
        )
        self.entries.append(entry)
        self._write_reason_txt(entry)
        self._append_errors_log(entry)
        return entry

    def has_full_quarantine(self, doc_name: str) -> bool:
        """Return True if at least one ``full`` flag exists for this document."""
        return any(e.doc_name == doc_name and e.severity == "full" for e in self.entries)

    def has_any_flag(self, doc_name: str) -> bool:
        """Return True if at least one flag (partial or full) exists for this document."""
        return any(e.doc_name == doc_name for e in self.entries)

    def finalize(self, total_docs_processed: Optional[int] = None) -> None:
        """Write ``quarantaine/INDEX.md`` at the end of the batch.

        Nothing is written when no entry was registered.

        Args:
            total_docs_processed: Optional number of documents in the batch.
                When given and positive, the quarantine rate line is included
                in the index; otherwise that line is omitted.
        """
        if not self.entries:
            return
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)
        index_path = self.quarantine_dir / "INDEX.md"
        content = self._build_index_md(total_docs_processed=total_docs_processed)
        index_path.write_text(content, encoding="utf-8")

    def _write_reason_txt(self, entry: QuarantineEntry) -> None:
        """Write ``quarantaine/<doc_name>.reason.txt`` (format §6.1 of the consolidated spec).

        NOTE(review): the file name depends on ``doc_name`` only and the file
        is rewritten each time, so a later flag on the same document replaces
        the earlier reason file — confirm this is the intended behaviour.
        """
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)
        path = self.quarantine_dir / f"{entry.doc_name}.reason.txt"
        severity_label = (
            "le document entier a été placé en quarantaine"
            if entry.severity == "full"
            else "le PDF de sortie n a pas pu être généré, le texte anonymisé est disponible"
        )
        commit_short = self.commit_sha[:7] if self.commit_sha else "unknown"
        lines: list[str] = [
            f"Document : {entry.doc_name}",
            f"Sévérité : {entry.severity} ({severity_label})",
            f"Raison : {entry.reason}",
            f"Détail : {entry.detail}",
            f"Horodatage : {entry.timestamp}",
            f"Version code : {self.app_version} (commit {commit_short})",
            f"Profil appliqué: {self.profile_name}",
            f"Caractères extraits : {entry.extracted_chars}",
            f"Flags : {', '.join(entry.flags)}",
            "",
        ]
        if entry.stacktrace:
            lines.append("--- stack trace ---")
            lines.append(entry.stacktrace)
        path.write_text("\n".join(lines), encoding="utf-8")

    def _append_errors_log(self, entry: QuarantineEntry) -> None:
        """Append one JSON object per line to ``errors.log`` (JSON-lines format)."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # The category is the part of the reason code before the first "_";
        # split() returns the whole code when there is no underscore.
        category = entry.reason.split("_", 1)[0]
        record: dict[str, object] = {
            "ts": entry.timestamp,
            "doc": entry.doc_name,
            "level": "ERROR" if entry.severity == "full" else "WARNING",
            "category": category,
            "msg": entry.detail,
            "severity": entry.severity,
        }
        with open(self._errors_log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str:
        """Build the text of INDEX.md (format §6.2 of the consolidated spec).

        The header figures count distinct documents, not flags: a document
        flagged several times is counted once, and a document carrying at
        least one ``full`` flag is counted as full only. The two tables still
        list one row per flag.

        Args:
            total_docs_processed: Optional batch size used for the header and
                for the rate line (emitted only when the value is positive).

        Returns:
            The Markdown content as a single string.
        """
        full_entries = [e for e in self.entries if e.severity == "full"]
        partial_entries = [e for e in self.entries if e.severity == "partial"]

        # Distinct documents per category; counting raw entries would
        # double-count multi-flagged documents and could push the rate
        # above 100%.
        flagged_docs = {e.doc_name for e in self.entries}
        full_docs = {e.doc_name for e in full_entries}
        partial_docs = {e.doc_name for e in partial_entries} - full_docs

        batch_ts = datetime.now().astimezone().isoformat()
        commit_short = self.commit_sha[:7] if self.commit_sha else "unknown"

        lines: list[str] = []
        lines.append(f"# Quarantaine — batch {batch_ts}")
        lines.append("")
        if total_docs_processed is not None:
            lines.append(f"**Documents traités** : {total_docs_processed}")
        else:
            lines.append(f"**Documents flaggés** : {len(flagged_docs)}")
        lines.append(f"**Quarantaine totale** : {len(full_docs)} (texte non livré)")
        lines.append(f"**Quarantaine partielle** : {len(partial_docs)} (texte OK, PDF en erreur)")
        if total_docs_processed is not None and total_docs_processed > 0:
            taux = (len(flagged_docs) / total_docs_processed) * 100.0
            lines.append(f"**Taux** : {taux:.1f}%")
        lines.append("")

        # === Full quarantine ===
        lines.append("## Quarantaine totale (full)")
        lines.append("")
        if full_entries:
            lines.append("| Document | Raison | Caractères extraits | Action recommandée |")
            lines.append("|---|---|---|---|")
            for e in full_entries:
                action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION)
                lines.append(
                    f"| {e.doc_name} | {e.reason} | {e.extracted_chars} | {action} |"
                )
        else:
            lines.append("_Aucun document en quarantaine totale._")
        lines.append("")

        # === Partial quarantine ===
        lines.append("## Quarantaine partielle (partial)")
        lines.append("")
        if partial_entries:
            lines.append("| Document | Raison | Texte livré dans | Flags |")
            lines.append("|---|---|---|---|")
            for e in partial_entries:
                txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt"
                flags_str = ", ".join(e.flags)
                lines.append(
                    f"| {e.doc_name} | {e.reason} | {txt_path} | {flags_str} |"
                )
        else:
            lines.append("_Aucun document en quarantaine partielle._")
        lines.append("")

        # === Batch context ===
        lines.append("## Contexte batch")
        lines.append("")
        lines.append(f"- Version : {self.app_version} (commit {commit_short})")
        lines.append(f"- Profil appliqué : {self.profile_name}")
        lines.append(f"- Horodatage : {batch_ts}")
        lines.append("")
        return "\n".join(lines)
class DocLogger:
    """Per-document file logger.

    Append-only: each call reopens the log file in append mode, writes one
    ``<timestamp> [<LEVEL>] <message>`` line and closes the file again.
    """

    def __init__(self, log_path: Path) -> None:
        """Remember the log location and create its parent directory if missing."""
        target = Path(log_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        self.log_path: Path = target

    def _write(self, level: str, msg: str) -> None:
        """Append one timestamped line tagged with ``level``."""
        stamp = datetime.now().astimezone().isoformat()
        record = f"{stamp} [{level}] {msg}\n"
        with self.log_path.open("a", encoding="utf-8") as handle:
            handle.write(record)

    def info(self, msg: str) -> None:
        """Log ``msg`` at INFO level."""
        self._write("INFO", msg)

    def warning(self, msg: str) -> None:
        """Log ``msg`` at WARNING level."""
        self._write("WARNING", msg)

    def error(self, msg: str) -> None:
        """Log ``msg`` at ERROR level."""
        self._write("ERROR", msg)