"""Quarantaine différentielle pour Pseudonymisation v11.0. Un document n'est livré "anonymisé" que si toutes les étapes critiques ont réussi. Sinon, quarantaine différentielle : - partial : texte OK sort, PDF en quarantaine si rédaction rate - full : document entier en quarantaine si pré-flight ou rescan critique Ce module est totalement standalone : il n'importe rien du core ni d'autre module local. Il n'écrit que dans les fichiers qu'on lui demande explicitement (pas de logging global, pas de print). """ from __future__ import annotations import fcntl import json import os import re import traceback from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Literal, Optional # === Constantes === SEUIL_TEXTE_MINI = 100 # B-3 pré-flight : sous ce seuil = OCR raté/doc vide SEUIL_RESCAN_RESIDUEL = 0 # Tolérance zéro pour PII résiduelles après rescan QUARANTINE_DIR_NAME = "quarantaine" @dataclass class QuarantineEntry: """Une entrée dans le registre de quarantaine d'un batch.""" doc_name: str # nom de base sans extension reason: str # code court normalisé detail: str # message libre timestamp: str # ISO 8601 avec timezone severity: Literal["partial", "full"] # partial = PDF seul, full = doc entier flags: list[str] = field(default_factory=list) # peut contenir plusieurs raisons cumulées stacktrace: Optional[str] = None # stacktrace formatée de l'exception extracted_chars: int = 0 # nb caractères extraits (utile pour preflight) # Actions recommandées par code raison (utilisées dans INDEX.md) RECOMMENDED_ACTIONS: dict[str, str] = { "preflight_text_too_short": "Vérifier OCR, ré-essayer avec docTR forcé", "extraction_total_failure": "Inspecter le PDF source (corrompu? chiffré?)", "rescan_residual_pii": "Inspection manuelle, fix regex ou whitelist", "pdf_redaction_failed": "Voir le .pseudonymise.txt, ré-essayer manuellement", "pdf_vector_fallback_to_raster": "PDF en qualité raster (lisible mais moins précis)", } _DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt" def _sanitize_doc_name(doc_name: str) -> str: """Nettoie le nom de document pour éviter path traversal et injection markdown.""" # Ne garder que le nom de base (sans composantes de chemin) safe = Path(doc_name).name # Remplacer les caractères dangereux par des underscores safe = re.sub(r'[<>:"|?*\x00-\x1f]', '_', safe) return safe def _escape_markdown_table_cell(text: str) -> str: """Échappe les caractères spéciaux pour les cellules de tableau markdown.""" return text.replace("|", "\\|").replace("<", "<").replace(">", ">") class QuarantineManager: """Gestion centralisée de la quarantaine pour un batch entier. Une instance par batch. Maintient le registre des entrées, écrit les fichiers .reason.txt par doc, append errors.log, et génère INDEX.md à la fin du batch. """ def __init__(self, output_dir: Path, app_version: str = "0.11.0", commit_sha: str = "", profile_name: str = "standard_local") -> None: self.output_dir: Path = Path(output_dir) self.quarantine_dir: Path = self.output_dir / QUARANTINE_DIR_NAME self.app_version: str = app_version self.commit_sha: str = commit_sha self.profile_name: str = profile_name self.entries: list[QuarantineEntry] = [] self._errors_log_path: Path = self.output_dir / "errors.log" self._quarantine_dir_secured: bool = False def _secure_quarantine_dir(self) -> None: """Crée le répertoire de quarantaine avec permissions restrictives (0o700).""" if self._quarantine_dir_secured: return self.quarantine_dir.mkdir(parents=True, exist_ok=True) # Restreindre à l'utilisateur seul (RGPD/HDS — PII médicales) try: os.chmod(str(self.quarantine_dir), 0o700) except OSError: pass # certains FS ne supportent pas chmod self._quarantine_dir_secured = True def flag(self, doc_name: str, reason: str, detail: str, severity: Literal["partial", "full"], *, exc: Optional[BaseException] = None, extracted_chars: int = 0, flags: Optional[list[str]] = None) -> QuarantineEntry: """Crée une entrée, écrit le .reason.txt, append errors.log. Si `exc` est fourni, on capture sa stacktrace via ``traceback.format_exception`` — fonctionne même hors d'un bloc ``except``. Sinon la stacktrace reste None. """ self._secure_quarantine_dir() # Sanitiser le nom de document (anti path-traversal) safe_doc_name = _sanitize_doc_name(doc_name) # Capturer la stacktrace de l'exception passée (pas l'exception courante) stacktrace: Optional[str] = None if exc is not None: stacktrace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) entry = QuarantineEntry( doc_name=safe_doc_name, reason=reason, detail=detail, timestamp=datetime.now().astimezone().isoformat(), severity=severity, flags=flags if flags is not None else [reason], stacktrace=stacktrace, extracted_chars=extracted_chars, ) self.entries.append(entry) self._write_reason_txt(entry) self._append_errors_log(entry) return entry def has_full_quarantine(self, doc_name: str) -> bool: """True si un (ou plusieurs) flag `full` existe pour ce doc.""" safe = _sanitize_doc_name(doc_name) return any(e.doc_name == safe and e.severity == "full" for e in self.entries) def has_any_flag(self, doc_name: str) -> bool: """True si au moins un flag (partial ou full) existe pour ce doc.""" safe = _sanitize_doc_name(doc_name) return any(e.doc_name == safe for e in self.entries) def finalize(self, total_docs_processed: Optional[int] = None) -> None: """Écrit quarantaine/INDEX.md à la fin du batch. `total_docs_processed` est optionnel : si fourni, le taux de mise en quarantaine est calculé et affiché ; sinon la ligne `Taux` est omise. """ if not self.entries: return self._secure_quarantine_dir() index_path = self.quarantine_dir / "INDEX.md" content = self._build_index_md(total_docs_processed=total_docs_processed) index_path.write_text(content, encoding="utf-8") def _write_reason_txt(self, entry: QuarantineEntry) -> None: """Écrit quarantaine/.reason.txt selon format §6.1 du consolidé.""" self._secure_quarantine_dir() path = self.quarantine_dir / f"{entry.doc_name}.reason.txt" severity_label = ( "le document entier a été placé en quarantaine" if entry.severity == "full" else "le PDF de sortie n a pas pu être généré, le texte anonymisé est disponible" ) commit_short = self.commit_sha[:7] if self.commit_sha else "unknown" lines: list[str] = [ f"Document : {entry.doc_name}", f"Sévérité : {entry.severity} ({severity_label})", f"Raison : {entry.reason}", f"Détail : {entry.detail}", f"Horodatage : {entry.timestamp}", f"Version code : {self.app_version} (commit {commit_short})", f"Profil appliqué: {self.profile_name}", f"Caractères extraits : {entry.extracted_chars}", f"Flags : {', '.join(entry.flags)}", "", ] if entry.stacktrace: lines.append("--- stack trace ---") lines.append(entry.stacktrace) path.write_text("\n".join(lines), encoding="utf-8") def _append_errors_log(self, entry: QuarantineEntry) -> None: """Append une ligne JSON dans errors.log (format JSON-lines). M1 : utilise os.open(O_NOFOLLOW) pour éviter TOCTOU symlink. M4 : utilise fcntl.flock(LOCK_EX) pour serialization entre workers ProcessPoolExecutor. """ self.output_dir.mkdir(parents=True, exist_ok=True) category = entry.reason.split("_")[0] if "_" in entry.reason else entry.reason record: dict[str, object] = { "ts": entry.timestamp, "doc": entry.doc_name, "level": "ERROR" if entry.severity == "full" else "WARNING", "category": category, "msg": entry.detail, "severity": entry.severity, } line = json.dumps(record, ensure_ascii=False) + "\n" # M1: O_NOFOLLOW — refuse les symlinks atomiquement (pas de TOCTOU) # M4: O_CREAT|O_APPEND|O_WRONLY + permissions 0o600 dès la création fd = os.open( str(self._errors_log_path), os.O_CREAT | os.O_APPEND | os.O_WRONLY | os.O_NOFOLLOW, 0o600, ) try: # Major2: réparer permissions si fichier existait avec mode trop permissif try: os.fchmod(fd, 0o600) except OSError: pass # M4: lock exclusif pour serialiser les workers concurrents fcntl.flock(fd, fcntl.LOCK_EX) try: os.write(fd, line.encode("utf-8")) finally: fcntl.flock(fd, fcntl.LOCK_UN) finally: os.close(fd) def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str: """Construit le contenu du fichier INDEX.md selon §6.2 du consolidé.""" full_entries = [e for e in self.entries if e.severity == "full"] partial_entries = [e for e in self.entries if e.severity == "partial"] total_flagged = len(self.entries) batch_ts = datetime.now().astimezone().isoformat() commit_short = self.commit_sha[:7] if self.commit_sha else "unknown" lines: list[str] = [] lines.append(f"# Quarantaine — batch {batch_ts}") lines.append("") if total_docs_processed is not None: lines.append(f"**Documents traités** : {total_docs_processed}") else: lines.append(f"**Documents flaggés** : {total_flagged}") lines.append(f"**Quarantaine totale** : {len(full_entries)} (texte non livré)") lines.append(f"**Quarantaine partielle** : {len(partial_entries)} (texte OK, PDF en erreur)") if total_docs_processed is not None and total_docs_processed > 0: taux = (total_flagged / total_docs_processed) * 100.0 lines.append(f"**Taux** : {taux:.1f}%") lines.append("") # === Quarantaine totale === lines.append("## Quarantaine totale (full)") lines.append("") if full_entries: lines.append("| Document | Raison | Caractères extraits | Action recommandée |") lines.append("|---|---|---|---|") for e in full_entries: action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION) lines.append( f"| {_escape_markdown_table_cell(e.doc_name)} " f"| {_escape_markdown_table_cell(e.reason)} " f"| {e.extracted_chars} " f"| {_escape_markdown_table_cell(action)} |" ) else: lines.append("_Aucun document en quarantaine totale._") lines.append("") # === Quarantaine partielle === lines.append("## Quarantaine partielle (partial)") lines.append("") if partial_entries: lines.append("| Document | Raison | Texte livré dans | Flags |") lines.append("|---|---|---|---|") for e in partial_entries: txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt" flags_str = _escape_markdown_table_cell(", ".join(e.flags)) lines.append( f"| {_escape_markdown_table_cell(e.doc_name)} " f"| {_escape_markdown_table_cell(e.reason)} " f"| {txt_path} " f"| {flags_str} |" ) else: lines.append("_Aucun document en quarantaine partielle._") lines.append("") # === Contexte batch === lines.append("## Contexte batch") lines.append("") lines.append(f"- Version : {self.app_version} (commit {commit_short})") lines.append(f"- Profil appliqué : {self.profile_name}") lines.append(f"- Horodatage : {batch_ts}") lines.append("") return "\n".join(lines)