Suite des étapes Q-1 (F = rescan résiduel) + apport sécurité par Qwen
review Codex gpt-5.5 5 rounds (verdict READY FOR MERGE).
## anonymizer_core_refactored_onnx.py
- M5 Rescan résiduel inconditionnel : NIR/EMAIL/IBAN/TEL recherchés après
TOUT nettoyage. Fail-closed — aucun output livré si > seuil
(SEUIL_RESCAN_RESIDUEL = 0)
- M3 Return structuré : process_pdf retourne maintenant
{"status": "quarantined", "reason": ..., "text": "", "audit": ""} au lieu
de {} sur quarantaine — callers compatibles avec outputs["text"]/"audit"
- C3+M2 fallback préflight : si quarantine_mgr absent ET préflight rate,
copie du PDF source dans out_dir/_preflight_failed/ avec chmod 0o700
(le document n'est jamais perdu silencieusement)
- S5 guard double raster : "pdf_raster" not in outputs avant fallback
- Retrait import DocLogger (mort, jamais branché)
## quarantine.py
- _sanitize_doc_name() — anti path-traversal sur le nom de doc
- _escape_markdown_table_cell() — anti injection markdown dans INDEX.md
- _secure_quarantine_dir() — mkdir + chmod(0o700) systématique
- _append_errors_log() durci :
os.open(O_CREAT|O_APPEND|O_WRONLY|O_NOFOLLOW, 0o600)
+ fcntl.flock(LOCK_EX) + os.fchmod
- Retrait DocLogger (code mort identifié en review)
- Retrait REASON_CODES (jamais utilisé)
## Limites connues
- QuarantineManager pas encore wired dans GUI/server.py — les callers
actuels marchent en fallback (quarantine_mgr=None)
- finalize() + ProcessPoolExecutor : entries worker-local ne mergent pas
automatiquement (à documenter)
## Validation
- 73 tests unit existants : OK (non-régression)
- 1 test Q-1 happy path : passe (dégelé dans commit suivant)
- Codex gpt-5.5 5 rounds review : READY FOR MERGE
Co-Authored-By: Qwen Code <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
296 lines
13 KiB
Python
296 lines
13 KiB
Python
"""Quarantaine différentielle pour Pseudonymisation v11.0.
|
|
|
|
Un document n'est livré "anonymisé" que si toutes les étapes critiques ont
|
|
réussi. Sinon, quarantaine différentielle :
|
|
- partial : texte OK sort, PDF en quarantaine si rédaction rate
|
|
- full : document entier en quarantaine si pré-flight ou rescan critique
|
|
|
|
Ce module est totalement standalone : il n'importe rien du core ni d'autre
|
|
module local. Il n'écrit que dans les fichiers qu'on lui demande explicitement
|
|
(pas de logging global, pas de print).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import fcntl
|
|
import json
|
|
import os
|
|
import re
|
|
import traceback
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Literal, Optional
|
|
|
|
# === Constantes ===
|
|
|
|
SEUIL_TEXTE_MINI = 100 # B-3 pré-flight : sous ce seuil = OCR raté/doc vide
|
|
SEUIL_RESCAN_RESIDUEL = 0 # Tolérance zéro pour PII résiduelles après rescan
|
|
QUARANTINE_DIR_NAME = "quarantaine"
|
|
|
|
|
|
@dataclass
|
|
class QuarantineEntry:
|
|
"""Une entrée dans le registre de quarantaine d'un batch."""
|
|
doc_name: str # nom de base sans extension
|
|
reason: str # code court normalisé
|
|
detail: str # message libre
|
|
timestamp: str # ISO 8601 avec timezone
|
|
severity: Literal["partial", "full"] # partial = PDF seul, full = doc entier
|
|
flags: list[str] = field(default_factory=list) # peut contenir plusieurs raisons cumulées
|
|
stacktrace: Optional[str] = None # stacktrace formatée de l'exception
|
|
extracted_chars: int = 0 # nb caractères extraits (utile pour preflight)
|
|
|
|
|
|
# Actions recommandées par code raison (utilisées dans INDEX.md)
|
|
RECOMMENDED_ACTIONS: dict[str, str] = {
|
|
"preflight_text_too_short": "Vérifier OCR, ré-essayer avec docTR forcé",
|
|
"extraction_total_failure": "Inspecter le PDF source (corrompu? chiffré?)",
|
|
"rescan_residual_pii": "Inspection manuelle, fix regex ou whitelist",
|
|
"pdf_redaction_failed": "Voir le .pseudonymise.txt, ré-essayer manuellement",
|
|
"pdf_vector_fallback_to_raster": "PDF en qualité raster (lisible mais moins précis)",
|
|
}
|
|
_DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt"
|
|
|
|
|
|
def _sanitize_doc_name(doc_name: str) -> str:
|
|
"""Nettoie le nom de document pour éviter path traversal et injection markdown."""
|
|
# Ne garder que le nom de base (sans composantes de chemin)
|
|
safe = Path(doc_name).name
|
|
# Remplacer les caractères dangereux par des underscores
|
|
safe = re.sub(r'[<>:"|?*\x00-\x1f]', '_', safe)
|
|
return safe
|
|
|
|
|
|
def _escape_markdown_table_cell(text: str) -> str:
|
|
"""Échappe les caractères spéciaux pour les cellules de tableau markdown."""
|
|
return text.replace("|", "\\|").replace("<", "<").replace(">", ">")
|
|
|
|
|
|
class QuarantineManager:
|
|
"""Gestion centralisée de la quarantaine pour un batch entier.
|
|
|
|
Une instance par batch. Maintient le registre des entrées, écrit les
|
|
fichiers .reason.txt par doc, append errors.log, et génère INDEX.md
|
|
à la fin du batch.
|
|
"""
|
|
|
|
def __init__(self, output_dir: Path, app_version: str = "0.11.0",
|
|
commit_sha: str = "", profile_name: str = "standard_local") -> None:
|
|
self.output_dir: Path = Path(output_dir)
|
|
self.quarantine_dir: Path = self.output_dir / QUARANTINE_DIR_NAME
|
|
self.app_version: str = app_version
|
|
self.commit_sha: str = commit_sha
|
|
self.profile_name: str = profile_name
|
|
self.entries: list[QuarantineEntry] = []
|
|
self._errors_log_path: Path = self.output_dir / "errors.log"
|
|
self._quarantine_dir_secured: bool = False
|
|
|
|
def _secure_quarantine_dir(self) -> None:
|
|
"""Crée le répertoire de quarantaine avec permissions restrictives (0o700)."""
|
|
if self._quarantine_dir_secured:
|
|
return
|
|
self.quarantine_dir.mkdir(parents=True, exist_ok=True)
|
|
# Restreindre à l'utilisateur seul (RGPD/HDS — PII médicales)
|
|
try:
|
|
os.chmod(str(self.quarantine_dir), 0o700)
|
|
except OSError:
|
|
pass # certains FS ne supportent pas chmod
|
|
self._quarantine_dir_secured = True
|
|
|
|
def flag(self, doc_name: str, reason: str, detail: str,
|
|
severity: Literal["partial", "full"], *,
|
|
exc: Optional[BaseException] = None,
|
|
extracted_chars: int = 0,
|
|
flags: Optional[list[str]] = None) -> QuarantineEntry:
|
|
"""Crée une entrée, écrit le .reason.txt, append errors.log.
|
|
|
|
Si `exc` est fourni, on capture sa stacktrace via
|
|
``traceback.format_exception`` — fonctionne même hors d'un bloc ``except``.
|
|
Sinon la stacktrace reste None.
|
|
"""
|
|
self._secure_quarantine_dir()
|
|
|
|
# Sanitiser le nom de document (anti path-traversal)
|
|
safe_doc_name = _sanitize_doc_name(doc_name)
|
|
|
|
# Capturer la stacktrace de l'exception passée (pas l'exception courante)
|
|
stacktrace: Optional[str] = None
|
|
if exc is not None:
|
|
stacktrace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
|
|
|
|
entry = QuarantineEntry(
|
|
doc_name=safe_doc_name,
|
|
reason=reason,
|
|
detail=detail,
|
|
timestamp=datetime.now().astimezone().isoformat(),
|
|
severity=severity,
|
|
flags=flags if flags is not None else [reason],
|
|
stacktrace=stacktrace,
|
|
extracted_chars=extracted_chars,
|
|
)
|
|
self.entries.append(entry)
|
|
self._write_reason_txt(entry)
|
|
self._append_errors_log(entry)
|
|
return entry
|
|
|
|
def has_full_quarantine(self, doc_name: str) -> bool:
|
|
"""True si un (ou plusieurs) flag `full` existe pour ce doc."""
|
|
safe = _sanitize_doc_name(doc_name)
|
|
return any(e.doc_name == safe and e.severity == "full" for e in self.entries)
|
|
|
|
def has_any_flag(self, doc_name: str) -> bool:
|
|
"""True si au moins un flag (partial ou full) existe pour ce doc."""
|
|
safe = _sanitize_doc_name(doc_name)
|
|
return any(e.doc_name == safe for e in self.entries)
|
|
|
|
def finalize(self, total_docs_processed: Optional[int] = None) -> None:
|
|
"""Écrit quarantaine/INDEX.md à la fin du batch.
|
|
|
|
`total_docs_processed` est optionnel : si fourni, le taux de mise en
|
|
quarantaine est calculé et affiché ; sinon la ligne `Taux` est omise.
|
|
"""
|
|
if not self.entries:
|
|
return
|
|
self._secure_quarantine_dir()
|
|
index_path = self.quarantine_dir / "INDEX.md"
|
|
content = self._build_index_md(total_docs_processed=total_docs_processed)
|
|
index_path.write_text(content, encoding="utf-8")
|
|
|
|
def _write_reason_txt(self, entry: QuarantineEntry) -> None:
|
|
"""Écrit quarantaine/<docname>.reason.txt selon format §6.1 du consolidé."""
|
|
self._secure_quarantine_dir()
|
|
path = self.quarantine_dir / f"{entry.doc_name}.reason.txt"
|
|
severity_label = (
|
|
"le document entier a été placé en quarantaine"
|
|
if entry.severity == "full"
|
|
else "le PDF de sortie n a pas pu être généré, le texte anonymisé est disponible"
|
|
)
|
|
commit_short = self.commit_sha[:7] if self.commit_sha else "unknown"
|
|
lines: list[str] = [
|
|
f"Document : {entry.doc_name}",
|
|
f"Sévérité : {entry.severity} ({severity_label})",
|
|
f"Raison : {entry.reason}",
|
|
f"Détail : {entry.detail}",
|
|
f"Horodatage : {entry.timestamp}",
|
|
f"Version code : {self.app_version} (commit {commit_short})",
|
|
f"Profil appliqué: {self.profile_name}",
|
|
f"Caractères extraits : {entry.extracted_chars}",
|
|
f"Flags : {', '.join(entry.flags)}",
|
|
"",
|
|
]
|
|
if entry.stacktrace:
|
|
lines.append("--- stack trace ---")
|
|
lines.append(entry.stacktrace)
|
|
path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
def _append_errors_log(self, entry: QuarantineEntry) -> None:
|
|
"""Append une ligne JSON dans errors.log (format JSON-lines).
|
|
|
|
M1 : utilise os.open(O_NOFOLLOW) pour éviter TOCTOU symlink.
|
|
M4 : utilise fcntl.flock(LOCK_EX) pour serialization entre workers
|
|
ProcessPoolExecutor.
|
|
"""
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
category = entry.reason.split("_")[0] if "_" in entry.reason else entry.reason
|
|
record: dict[str, object] = {
|
|
"ts": entry.timestamp,
|
|
"doc": entry.doc_name,
|
|
"level": "ERROR" if entry.severity == "full" else "WARNING",
|
|
"category": category,
|
|
"msg": entry.detail,
|
|
"severity": entry.severity,
|
|
}
|
|
line = json.dumps(record, ensure_ascii=False) + "\n"
|
|
|
|
# M1: O_NOFOLLOW — refuse les symlinks atomiquement (pas de TOCTOU)
|
|
# M4: O_CREAT|O_APPEND|O_WRONLY + permissions 0o600 dès la création
|
|
fd = os.open(
|
|
str(self._errors_log_path),
|
|
os.O_CREAT | os.O_APPEND | os.O_WRONLY | os.O_NOFOLLOW,
|
|
0o600,
|
|
)
|
|
try:
|
|
# Major2: réparer permissions si fichier existait avec mode trop permissif
|
|
try:
|
|
os.fchmod(fd, 0o600)
|
|
except OSError:
|
|
pass
|
|
# M4: lock exclusif pour serialiser les workers concurrents
|
|
fcntl.flock(fd, fcntl.LOCK_EX)
|
|
try:
|
|
os.write(fd, line.encode("utf-8"))
|
|
finally:
|
|
fcntl.flock(fd, fcntl.LOCK_UN)
|
|
finally:
|
|
os.close(fd)
|
|
|
|
def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str:
|
|
"""Construit le contenu du fichier INDEX.md selon §6.2 du consolidé."""
|
|
full_entries = [e for e in self.entries if e.severity == "full"]
|
|
partial_entries = [e for e in self.entries if e.severity == "partial"]
|
|
total_flagged = len(self.entries)
|
|
batch_ts = datetime.now().astimezone().isoformat()
|
|
commit_short = self.commit_sha[:7] if self.commit_sha else "unknown"
|
|
|
|
lines: list[str] = []
|
|
lines.append(f"# Quarantaine — batch {batch_ts}")
|
|
lines.append("")
|
|
if total_docs_processed is not None:
|
|
lines.append(f"**Documents traités** : {total_docs_processed}")
|
|
else:
|
|
lines.append(f"**Documents flaggés** : {total_flagged}")
|
|
lines.append(f"**Quarantaine totale** : {len(full_entries)} (texte non livré)")
|
|
lines.append(f"**Quarantaine partielle** : {len(partial_entries)} (texte OK, PDF en erreur)")
|
|
if total_docs_processed is not None and total_docs_processed > 0:
|
|
taux = (total_flagged / total_docs_processed) * 100.0
|
|
lines.append(f"**Taux** : {taux:.1f}%")
|
|
lines.append("")
|
|
|
|
# === Quarantaine totale ===
|
|
lines.append("## Quarantaine totale (full)")
|
|
lines.append("")
|
|
if full_entries:
|
|
lines.append("| Document | Raison | Caractères extraits | Action recommandée |")
|
|
lines.append("|---|---|---|---|")
|
|
for e in full_entries:
|
|
action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION)
|
|
lines.append(
|
|
f"| {_escape_markdown_table_cell(e.doc_name)} "
|
|
f"| {_escape_markdown_table_cell(e.reason)} "
|
|
f"| {e.extracted_chars} "
|
|
f"| {_escape_markdown_table_cell(action)} |"
|
|
)
|
|
else:
|
|
lines.append("_Aucun document en quarantaine totale._")
|
|
lines.append("")
|
|
|
|
# === Quarantaine partielle ===
|
|
lines.append("## Quarantaine partielle (partial)")
|
|
lines.append("")
|
|
if partial_entries:
|
|
lines.append("| Document | Raison | Texte livré dans | Flags |")
|
|
lines.append("|---|---|---|---|")
|
|
for e in partial_entries:
|
|
txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt"
|
|
flags_str = _escape_markdown_table_cell(", ".join(e.flags))
|
|
lines.append(
|
|
f"| {_escape_markdown_table_cell(e.doc_name)} "
|
|
f"| {_escape_markdown_table_cell(e.reason)} "
|
|
f"| {txt_path} "
|
|
f"| {flags_str} |"
|
|
)
|
|
else:
|
|
lines.append("_Aucun document en quarantaine partielle._")
|
|
lines.append("")
|
|
|
|
# === Contexte batch ===
|
|
lines.append("## Contexte batch")
|
|
lines.append("")
|
|
lines.append(f"- Version : {self.app_version} (commit {commit_short})")
|
|
lines.append(f"- Profil appliqué : {self.profile_name}")
|
|
lines.append(f"- Horodatage : {batch_ts}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|