feat(q1): F+sécurité — rescan inconditionnel + hardening quarantine

Suite des étapes Q-1 (F = rescan résiduel) + apport sécurité par Qwen
review Codex gpt-5.5 5 rounds (verdict READY FOR MERGE).

## anonymizer_core_refactored_onnx.py

- M5 Rescan résiduel inconditionnel : NIR/EMAIL/IBAN/TEL recherchés après
  TOUT nettoyage. Fail-closed — aucun output livré si > seuil
  (SEUIL_RESCAN_RESIDUEL = 0)
- M3 Return structuré : process_pdf retourne maintenant
  {"status": "quarantined", "reason": ..., "text": "", "audit": ""} au lieu
  de {} sur quarantaine — callers compatibles avec outputs["text"]/"audit"
- C3+M2 fallback préflight : si quarantine_mgr absent ET préflight rate,
  copie du PDF source dans out_dir/_preflight_failed/ avec chmod 0o700
  (le document n'est jamais perdu silencieusement)
- S5 guard double raster : "pdf_raster" not in outputs avant fallback
- Retrait import DocLogger (mort, jamais branché)

## quarantine.py

- _sanitize_doc_name() — anti path-traversal sur le nom de doc
- _escape_markdown_table_cell() — anti injection markdown dans INDEX.md
- _secure_quarantine_dir() — mkdir + chmod(0o700) systématique
- _append_errors_log() durci :
  os.open(O_CREAT|O_APPEND|O_WRONLY|O_NOFOLLOW, 0o600)
  + fcntl.flock(LOCK_EX) + os.fchmod
- Retrait DocLogger (code mort identifié en review)
- Retrait REASON_CODES (jamais utilisé)

## Limites connues

- QuarantineManager pas encore wired dans GUI/server.py — les callers
  actuels marchent en fallback (quarantine_mgr=None)
- finalize() + ProcessPoolExecutor : entries worker-local ne mergent pas
  automatiquement (à documenter)

## Validation

- 73 tests unit existants : OK (non-régression)
- 1 test Q-1 happy path : passe (dégelé dans commit suivant)
- Codex gpt-5.5 5 rounds review : READY FOR MERGE

Co-Authored-By: Qwen Code <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-02 10:44:52 +02:00
parent 5216a1518e
commit cf78bea910
2 changed files with 166 additions and 61 deletions

View File

@@ -11,7 +11,10 @@ module local. Il n'écrit que dans les fichiers qu'on lui demande explicitement
"""
from __future__ import annotations
import fcntl
import json
import os
import re
import traceback
from dataclasses import dataclass, field
from datetime import datetime
@@ -29,26 +32,15 @@ QUARANTINE_DIR_NAME = "quarantaine"
class QuarantineEntry:
"""Une entrée dans le registre de quarantaine d'un batch."""
doc_name: str # nom de base sans extension
reason: str # code court normalisé (cf REASON_CODES)
reason: str # code court normalisé
detail: str # message libre
timestamp: str # ISO 8601 avec timezone
severity: Literal["partial", "full"] # partial = PDF seul, full = doc entier
flags: list[str] = field(default_factory=list) # peut contenir plusieurs raisons cumulées
stacktrace: Optional[str] = None # tb.format_exc() si exception
stacktrace: Optional[str] = None # stacktrace formatée de l'exception
extracted_chars: int = 0 # nb caractères extraits (utile pour preflight)
# Codes raison normalisés (voir §5 du consolidé v2)
REASON_CODES: dict[str, Literal["partial", "full"]] = {
"preflight_text_too_short": "full",
"extraction_total_failure": "full",
"rescan_residual_pii": "full",
"pdf_redaction_failed": "partial",
"pdf_vector_fallback_to_raster": "partial",
"regex_user_invalid": "partial",
}
# Actions recommandées par code raison (utilisées dans INDEX.md)
RECOMMENDED_ACTIONS: dict[str, str] = {
"preflight_text_too_short": "Vérifier OCR, ré-essayer avec docTR forcé",
@@ -60,6 +52,20 @@ RECOMMENDED_ACTIONS: dict[str, str] = {
_DEFAULT_RECOMMENDED_ACTION = "Voir le .reason.txt"
def _sanitize_doc_name(doc_name: str) -> str:
"""Nettoie le nom de document pour éviter path traversal et injection markdown."""
# Ne garder que le nom de base (sans composantes de chemin)
safe = Path(doc_name).name
# Remplacer les caractères dangereux par des underscores
safe = re.sub(r'[<>:"|?*\x00-\x1f]', '_', safe)
return safe
def _escape_markdown_table_cell(text: str) -> str:
"""Échappe les caractères spéciaux pour les cellules de tableau markdown."""
return text.replace("|", "\\|").replace("<", "&lt;").replace(">", "&gt;")
class QuarantineManager:
"""Gestion centralisée de la quarantaine pour un batch entier.
@@ -77,6 +83,19 @@ class QuarantineManager:
self.profile_name: str = profile_name
self.entries: list[QuarantineEntry] = []
self._errors_log_path: Path = self.output_dir / "errors.log"
self._quarantine_dir_secured: bool = False
def _secure_quarantine_dir(self) -> None:
"""Crée le répertoire de quarantaine avec permissions restrictives (0o700)."""
if self._quarantine_dir_secured:
return
self.quarantine_dir.mkdir(parents=True, exist_ok=True)
# Restreindre à l'utilisateur seul (RGPD/HDS — PII médicales)
try:
os.chmod(str(self.quarantine_dir), 0o700)
except OSError:
pass # certains FS ne supportent pas chmod
self._quarantine_dir_secured = True
def flag(self, doc_name: str, reason: str, detail: str,
severity: Literal["partial", "full"], *,
@@ -85,19 +104,28 @@ class QuarantineManager:
flags: Optional[list[str]] = None) -> QuarantineEntry:
"""Crée une entrée, écrit le .reason.txt, append errors.log.
Si `exc` est fourni, on capture la stacktrace courante via
``traceback.format_exc()`` — ce qui suppose qu'on est appelé depuis
un bloc ``except``. Sinon la stacktrace reste None.
Si `exc` est fourni, on capture sa stacktrace via
``traceback.format_exception`` — fonctionne même hors d'un bloc ``except``.
Sinon la stacktrace reste None.
"""
self.quarantine_dir.mkdir(parents=True, exist_ok=True)
self._secure_quarantine_dir()
# Sanitiser le nom de document (anti path-traversal)
safe_doc_name = _sanitize_doc_name(doc_name)
# Capturer la stacktrace de l'exception passée (pas l'exception courante)
stacktrace: Optional[str] = None
if exc is not None:
stacktrace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
entry = QuarantineEntry(
doc_name=doc_name,
doc_name=safe_doc_name,
reason=reason,
detail=detail,
timestamp=datetime.now().astimezone().isoformat(),
severity=severity,
flags=flags if flags is not None else [reason],
stacktrace=traceback.format_exc() if exc is not None else None,
stacktrace=stacktrace,
extracted_chars=extracted_chars,
)
self.entries.append(entry)
@@ -107,11 +135,13 @@ class QuarantineManager:
def has_full_quarantine(self, doc_name: str) -> bool:
"""True si un (ou plusieurs) flag `full` existe pour ce doc."""
return any(e.doc_name == doc_name and e.severity == "full" for e in self.entries)
safe = _sanitize_doc_name(doc_name)
return any(e.doc_name == safe and e.severity == "full" for e in self.entries)
def has_any_flag(self, doc_name: str) -> bool:
"""True si au moins un flag (partial ou full) existe pour ce doc."""
return any(e.doc_name == doc_name for e in self.entries)
safe = _sanitize_doc_name(doc_name)
return any(e.doc_name == safe for e in self.entries)
def finalize(self, total_docs_processed: Optional[int] = None) -> None:
"""Écrit quarantaine/INDEX.md à la fin du batch.
@@ -121,14 +151,14 @@ class QuarantineManager:
"""
if not self.entries:
return
self.quarantine_dir.mkdir(parents=True, exist_ok=True)
self._secure_quarantine_dir()
index_path = self.quarantine_dir / "INDEX.md"
content = self._build_index_md(total_docs_processed=total_docs_processed)
index_path.write_text(content, encoding="utf-8")
def _write_reason_txt(self, entry: QuarantineEntry) -> None:
"""Écrit quarantaine/<docname>.reason.txt selon format §6.1 du consolidé."""
self.quarantine_dir.mkdir(parents=True, exist_ok=True)
self._secure_quarantine_dir()
path = self.quarantine_dir / f"{entry.doc_name}.reason.txt"
severity_label = (
"le document entier a été placé en quarantaine"
@@ -154,8 +184,14 @@ class QuarantineManager:
path.write_text("\n".join(lines), encoding="utf-8")
def _append_errors_log(self, entry: QuarantineEntry) -> None:
"""Append une ligne JSON dans errors.log (format JSON-lines)."""
"""Append une ligne JSON dans errors.log (format JSON-lines).
M1 : utilise os.open(O_NOFOLLOW) pour éviter TOCTOU symlink.
M4 : utilise fcntl.flock(LOCK_EX) pour serialization entre workers
ProcessPoolExecutor.
"""
self.output_dir.mkdir(parents=True, exist_ok=True)
category = entry.reason.split("_")[0] if "_" in entry.reason else entry.reason
record: dict[str, object] = {
"ts": entry.timestamp,
@@ -165,8 +201,29 @@ class QuarantineManager:
"msg": entry.detail,
"severity": entry.severity,
}
with open(self._errors_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
line = json.dumps(record, ensure_ascii=False) + "\n"
# M1: O_NOFOLLOW — refuse les symlinks atomiquement (pas de TOCTOU)
# M4: O_CREAT|O_APPEND|O_WRONLY + permissions 0o600 dès la création
fd = os.open(
str(self._errors_log_path),
os.O_CREAT | os.O_APPEND | os.O_WRONLY | os.O_NOFOLLOW,
0o600,
)
try:
# Major2: réparer permissions si fichier existait avec mode trop permissif
try:
os.fchmod(fd, 0o600)
except OSError:
pass
# M4: lock exclusif pour serialiser les workers concurrents
fcntl.flock(fd, fcntl.LOCK_EX)
try:
os.write(fd, line.encode("utf-8"))
finally:
fcntl.flock(fd, fcntl.LOCK_UN)
finally:
os.close(fd)
def _build_index_md(self, total_docs_processed: Optional[int] = None) -> str:
"""Construit le contenu du fichier INDEX.md selon §6.2 du consolidé."""
@@ -199,7 +256,10 @@ class QuarantineManager:
for e in full_entries:
action = RECOMMENDED_ACTIONS.get(e.reason, _DEFAULT_RECOMMENDED_ACTION)
lines.append(
f"| {e.doc_name} | {e.reason} | {e.extracted_chars} | {action} |"
f"| {_escape_markdown_table_cell(e.doc_name)} "
f"| {_escape_markdown_table_cell(e.reason)} "
f"| {e.extracted_chars} "
f"| {_escape_markdown_table_cell(action)} |"
)
else:
lines.append("_Aucun document en quarantaine totale._")
@@ -213,9 +273,12 @@ class QuarantineManager:
lines.append("|---|---|---|---|")
for e in partial_entries:
txt_path = self.output_dir / f"{e.doc_name}.pseudonymise.txt"
flags_str = ", ".join(e.flags)
flags_str = _escape_markdown_table_cell(", ".join(e.flags))
lines.append(
f"| {e.doc_name} | {e.reason} | {txt_path} | {flags_str} |"
f"| {_escape_markdown_table_cell(e.doc_name)} "
f"| {_escape_markdown_table_cell(e.reason)} "
f"| {txt_path} "
f"| {flags_str} |"
)
else:
lines.append("_Aucun document en quarantaine partielle._")
@@ -230,25 +293,3 @@ class QuarantineManager:
lines.append("")
return "\n".join(lines)
class DocLogger:
"""Logger fichier par document. Append-only, pas de buffer."""
def __init__(self, log_path: Path) -> None:
self.log_path: Path = Path(log_path)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
def _write(self, level: str, msg: str) -> None:
ts = datetime.now().astimezone().isoformat()
with open(self.log_path, "a", encoding="utf-8") as f:
f.write(f"{ts} [{level}] {msg}\n")
def info(self, msg: str) -> None:
self._write("INFO", msg)
def warning(self, msg: str) -> None:
self._write("WARNING", msg)
def error(self, msg: str) -> None:
self._write("ERROR", msg)