Files
anonymisation/quarantine.py
Domi31tls 6df87defd1 feat(q1): F+sécurité — rescan inconditionnel + hardening quarantine
Suite des étapes Q-1 (F = rescan résiduel) + apport sécurité par Qwen
review Codex gpt-5.5 5 rounds (verdict READY FOR MERGE).

## anonymizer_core_refactored_onnx.py

- M5 Rescan résiduel inconditionnel : NIR/EMAIL/IBAN/TEL recherchés après
  TOUT nettoyage. Fail-closed — aucun output livré si > seuil
  (SEUIL_RESCAN_RESIDUEL = 0)
- M3 Return structuré : process_pdf retourne maintenant
  {"status": "quarantined", "reason": ..., "text": "", "audit": ""} au lieu
  de {} sur quarantaine — callers compatibles avec outputs["text"]/"audit"
- C3+M2 fallback préflight : si quarantine_mgr absent ET préflight rate,
  copie du PDF source dans out_dir/_preflight_failed/ avec chmod 0o700
  (le document n'est jamais perdu silencieusement)
- S5 guard double raster : "pdf_raster" not in outputs avant fallback
- Retrait import DocLogger (mort, jamais branché)

## quarantine.py

- _sanitize_doc_name() — anti path-traversal sur le nom de doc
- _escape_markdown_table_cell() — anti injection markdown dans INDEX.md
- _secure_quarantine_dir() — mkdir + chmod(0o700) systématique
- _append_errors_log() durci :
  os.open(O_CREAT|O_APPEND|O_WRONLY|O_NOFOLLOW, 0o600)
  + fcntl.flock(LOCK_EX) + os.fchmod
- Retrait DocLogger (code mort identifié en review)
- Retrait REASON_CODES (jamais utilisé)

## Limites connues

- QuarantineManager pas encore wired dans GUI/server.py — les callers
  actuels marchent en fallback (quarantine_mgr=None)
- finalize() + ProcessPoolExecutor : entries worker-local ne mergent pas
  automatiquement (à documenter)

## Validation

- 73 tests unit existants : OK (non-régression)
- 1 test Q-1 happy path : passe (dégelé dans commit suivant)
- Codex gpt-5.5 5 rounds review : READY FOR MERGE

Co-Authored-By: Qwen Code <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 10:44:52 +02:00

296 lines
13 KiB
Python

"""Quarantaine différentielle pour Pseudonymisation v11.0.
Un document n'est livré "anonymisé" que si toutes les étapes critiques ont
réussi. Sinon, quarantaine différentielle :
- partial : texte OK sort, PDF en quarantaine si rédaction rate
- full : document entier en quarantaine si pré-flight ou rescan critique
Ce module est totalement standalone : il n'importe rien du core ni d'autre
module local. Il n'écrit que dans les fichiers qu'on lui demande explicitement
(pas de logging global, pas de print).
"""
from __future__ import annotations
import fcntl
import json
import os
import re
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Literal, Optional
# === Constantes ===
SEUIL_TEXTE_MINI = 100 # B-3 pré-flight : sous ce seuil = OCR raté/doc vide
SEUIL_RESCAN_RESIDUEL = 0 # Tolérance zéro pour PII résiduelles après rescan
QUARANTINE_DIR_NAME = "quarantaine"
@dataclass
class QuarantineEntry:
"""Une entrée dans le registre de quarantaine d'un batch."""
doc_name: str # nom de base sans extension
reason: str # code court normalisé
detail: str # message libre
timestamp: str # ISO 8601 avec timezone
severity: Literal["partial", "full"] # partial = PDF seul, full = doc entier
flags: list[str] = field(default_factory=list) # peut contenir plusieurs raisons cumulées
stacktrace: Optional[str] = None # stacktrace formatée de l'exception
extracted_chars: int = 0 # nb caractères extraits (utile pour preflight)
# Actions recommandées par code raison (utilisées dans INDEX.md)
RECOMMENDED_ACTIONS: dict[str, str] = {
"preflight_text_too_short": "Vérifier OCR, ré-essayer avec docTR forcé",
"extraction_total_failure": "Inspecter le PDF source (corrompu? chiffré?)",
"rescan_residual_pii": "Inspection manuelle, fix regex ou whitelist",
"pdf_redaction_failed": "Voir le .pseudonymise.txt, ré-essayer manuellement",
"pdf_vector_fallback_to_raster": "PDF en qualité raster (lisible mais moins précis)",
}
_DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt"
def _sanitize_doc_name(doc_name: str) -> str:
"""Nettoie le nom de document pour éviter path traversal et injection markdown."""
# Ne garder que le nom de base (sans composantes de chemin)
safe = Path(doc_name).name
# Remplacer les caractères dangereux par des underscores
safe = re.sub(r'[<>:"|?*\x00-\x1f]', '_', safe)
return safe
def _escape_markdown_table_cell(text: str) -> str:
"""Échappe les caractères spéciaux pour les cellules de tableau markdown."""
return text.replace("|", "\\|").replace("<", "&lt;").replace(">", "&gt;")
class QuarantineManager:
"""Gestion centralisée de la quarantaine pour un batch entier.
Une instance par batch. Maintient le registre des entrées, écrit les
fichiers .reason.txt par doc, append errors.log, et génère INDEX.md
à la fin du batch.
"""
def __init__(self, output_dir: Path, app_version: str = "0.11.0",
commit_sha: str = "", profile_name: str = "standard_local") -> None:
self.output_dir: Path = Path(output_dir)
self.quarantine_dir: Path = self.output_dir / QUARANTINE_DIR_NAME
self.app_version: str = app_version
self.commit_sha: str = commit_sha
self.profile_name: str = profile_name
self.entries: list[QuarantineEntry] = []
self._errors_log_path: Path = self.output_dir / "errors.log"
self._quarantine_dir_secured: bool = False
def _secure_quarantine_dir(self) -> None:
"""Crée le répertoire de quarantaine avec permissions restrictives (0o700)."""
if self._quarantine_dir_secured:
return
self.quarantine_dir.mkdir(parents=True, exist_ok=True)
# Restreindre à l'utilisateur seul (RGPD/HDS — PII médicales)
try:
os.chmod(str(self.quarantine_dir), 0o700)
except OSError:
pass # certains FS ne supportent pas chmod
self._quarantine_dir_secured = True
def flag(self, doc_name: str, reason: str, detail: str,
severity: Literal["partial", "full"], *,
exc: Optional[BaseException] = None,
extracted_chars: int = 0,
flags: Optional[list[str]] = None) -> QuarantineEntry:
"""Crée une entrée, écrit le .reason.txt, append errors.log.
Si `exc` est fourni, on capture sa stacktrace via
``traceback.format_exception`` — fonctionne même hors d'un bloc ``except``.
Sinon la stacktrace reste None.
"""
self._secure_quarantine_dir()
# Sanitiser le nom de document (anti path-traversal)
safe_doc_name = _sanitize_doc_name(doc_name)
# Capturer la stacktrace de l'exception passée (pas l'exception courante)
stacktrace: Optional[str] = None
if exc is not None:
stacktrace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
entry = QuarantineEntry(
doc_name=safe_doc_name,
reason=reason,
detail=detail,
timestamp=datetime.now().astimezone().isoformat(),
severity=severity,
flags=flags if flags is not None else [reason],
stacktrace=stacktrace,
extracted_chars=extracted_chars,
)
self.entries.append(entry)
self._write_reason_txt(entry)
self._append_errors_log(entry)
return entry
def has_full_quarantine(self, doc_name: str) -> bool:
"""True si un (ou plusieurs) flag `full` existe pour ce doc."""
safe = _sanitize_doc_name(doc_name)
return any(e.doc_name == safe and e.severity == "full" for e in self.entries)
def has_any_flag(self, doc_name: str) -> bool:
"""True si au moins un flag (partial ou full) existe pour ce doc."""
safe = _sanitize_doc_name(doc_name)
return any(e.doc_name == safe for e in self.entries)
def finalize(self, total_docs_processed: Optional[int] = None) -> None:
"""Écrit quarantaine/INDEX.md à la fin du batch.
`total_docs_processed` est optionnel : si fourni, le taux de mise en
quarantaine est calculé et affiché ; sinon la ligne `Taux` est omise.
"""
if not self.entries:
return
self._secure_quarantine_dir()
index_path = self.quarantine_dir / "INDEX.md"
content = self._build_index_md(total_docs_processed=total_docs_processed)
index_path.write_text(content, encoding="utf-8")
def _write_reason_txt(self, entry: QuarantineEntry) -> None:
"""Écrit quarantaine/<docname>.reason.txt selon format §6.1 du consolidé."""
self._secure_quarantine_dir()
path = self.quarantine_dir / f"{entry.doc_name}.reason.txt"
severity_label = (
"le document entier a été placé en quarantaine"
if entry.severity == "full"
else "le PDF de sortie n a pas pu être généré, le texte anonymisé est disponible"
)
commit_short = self.commit_sha[:7] if self.commit_sha else "unknown"
lines: list[str] = [
f"Document : {entry.doc_name}",
f"Sévérité : {entry.severity} ({severity_label})",
f"Raison : {entry.reason}",
f"Détail : {entry.detail}",
f"Horodatage : {entry.timestamp}",
f"Version code : {self.app_version} (commit {commit_short})",
f"Profil appliqué: {self.profile_name}",
f"Caractères extraits : {entry.extracted_chars}",
f"Flags : {', '.join(entry.flags)}",
"",
]
if entry.stacktrace:
lines.append("--- stack trace ---")
lines.append(entry.stacktrace)
path.write_text("\n".join(lines), encoding="utf-8")
def _append_errors_log(self, entry: QuarantineEntry) -> None:
"""Append une ligne JSON dans errors.log (format JSON-lines).
M1 : utilise os.open(O_NOFOLLOW) pour éviter TOCTOU symlink.
M4 : utilise fcntl.flock(LOCK_EX) pour serialization entre workers
ProcessPoolExecutor.
"""
self.output_dir.mkdir(parents=True, exist_ok=True)
category = entry.reason.split("_")[0] if "_" in entry.reason else entry.reason
record: dict[str, object] = {
"ts": entry.timestamp,
"doc": entry.doc_name,
"level": "ERROR" if entry.severity == "full" else "WARNING",
"category": category,
"msg": entry.detail,
"severity": entry.severity,
}
line = json.dumps(record, ensure_ascii=False) + "\n"
# M1: O_NOFOLLOW — refuse les symlinks atomiquement (pas de TOCTOU)
# M4: O_CREAT|O_APPEND|O_WRONLY + permissions 0o600 dès la création
fd = os.open(
str(self._errors_log_path),
os.O_CREAT | os.O_APPEND | os.O_WRONLY | os.O_NOFOLLOW,
0o600,
)
try:
# Major2: réparer permissions si fichier existait avec mode trop permissif
try:
os.fchmod(fd, 0o600)
except OSError:
pass
# M4: lock exclusif pour serialiser les workers concurrents
fcntl.flock(fd, fcntl.LOCK_EX)
try:
os.write(fd, line.encode("utf-8"))
finally:
fcntl.flock(fd, fcntl.LOCK_UN)
finally:
os.close(fd)
def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str:
"""Construit le contenu du fichier INDEX.md selon §6.2 du consolidé."""
full_entries = [e for e in self.entries if e.severity == "full"]
partial_entries = [e for e in self.entries if e.severity == "partial"]
total_flagged = len(self.entries)
batch_ts = datetime.now().astimezone().isoformat()
commit_short = self.commit_sha[:7] if self.commit_sha else "unknown"
lines: list[str] = []
lines.append(f"# Quarantaine — batch {batch_ts}")
lines.append("")
if total_docs_processed is not None:
lines.append(f"**Documents traités** : {total_docs_processed}")
else:
lines.append(f"**Documents flaggés** : {total_flagged}")
lines.append(f"**Quarantaine totale** : {len(full_entries)} (texte non livré)")
lines.append(f"**Quarantaine partielle** : {len(partial_entries)} (texte OK, PDF en erreur)")
if total_docs_processed is not None and total_docs_processed > 0:
taux = (total_flagged / total_docs_processed) * 100.0
lines.append(f"**Taux** : {taux:.1f}%")
lines.append("")
# === Quarantaine totale ===
lines.append("## Quarantaine totale (full)")
lines.append("")
if full_entries:
lines.append("| Document | Raison | Caractères extraits | Action recommandée |")
lines.append("|---|---|---|---|")
for e in full_entries:
action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION)
lines.append(
f"| {_escape_markdown_table_cell(e.doc_name)} "
f"| {_escape_markdown_table_cell(e.reason)} "
f"| {e.extracted_chars} "
f"| {_escape_markdown_table_cell(action)} |"
)
else:
lines.append("_Aucun document en quarantaine totale._")
lines.append("")
# === Quarantaine partielle ===
lines.append("## Quarantaine partielle (partial)")
lines.append("")
if partial_entries:
lines.append("| Document | Raison | Texte livré dans | Flags |")
lines.append("|---|---|---|---|")
for e in partial_entries:
txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt"
flags_str = _escape_markdown_table_cell(", ".join(e.flags))
lines.append(
f"| {_escape_markdown_table_cell(e.doc_name)} "
f"| {_escape_markdown_table_cell(e.reason)} "
f"| {txt_path} "
f"| {flags_str} |"
)
else:
lines.append("_Aucun document en quarantaine partielle._")
lines.append("")
# === Contexte batch ===
lines.append("## Contexte batch")
lines.append("")
lines.append(f"- Version : {self.app_version} (commit {commit_short})")
lines.append(f"- Profil appliqué : {self.profile_name}")
lines.append(f"- Horodatage : {batch_ts}")
lines.append("")
return "\n".join(lines)