feat(q1): F+sécurité — rescan inconditionnel + hardening quarantine

Suite des étapes Q-1 (F = rescan résiduel) + apport sécurité par Qwen,
revue Codex gpt-5.5 en 5 rounds (verdict : READY FOR MERGE).

## anonymizer_core_refactored_onnx.py

- M5 Rescan résiduel inconditionnel : NIR/EMAIL/IBAN/TEL recherchés après
  TOUT nettoyage. Fail-closed — aucun output livré si le nombre de PII
  résiduelles dépasse le seuil (SEUIL_RESCAN_RESIDUEL = 0)
- M3 Return structuré : process_pdf retourne maintenant
  {"status": "quarantined", "reason": ..., "text": "", "audit": ""} au lieu
  de {} sur quarantaine — callers compatibles avec outputs["text"]/"audit"
- C3+M2 fallback préflight : si quarantine_mgr est absent ET que le préflight
  échoue, copie du PDF source dans out_dir/_preflight_failed/ avec chmod 0o700
  (le document n'est jamais perdu silencieusement)
- S5 guard double raster : test "pdf_raster" not in outputs avant le raster
  burn explicite, pour ne pas refaire le raster déjà produit par le fallback
  vector→raster
- Retrait import DocLogger (mort, jamais branché)

## quarantine.py

- _sanitize_doc_name() — anti path-traversal sur le nom de doc
- _escape_markdown_table_cell() — anti injection markdown dans INDEX.md
- _secure_quarantine_dir() — mkdir + chmod(0o700) systématique
- _append_errors_log() durci :
  os.open(O_CREAT|O_APPEND|O_WRONLY|O_NOFOLLOW, 0o600)
  + fcntl.flock(LOCK_EX) + os.fchmod
- Retrait DocLogger (code mort identifié en review)
- Retrait REASON_CODES (jamais utilisé)

## Limites connues

- QuarantineManager n'est pas encore branché dans GUI/server.py — les callers
  actuels fonctionnent en mode fallback (quarantine_mgr=None)
- finalize() + ProcessPoolExecutor : les entrées locales à chaque worker ne
  sont pas fusionnées automatiquement (à documenter)

## Validation

- 73 tests unit existants : OK (non-régression)
- 1 test Q-1 happy path : passe (dégelé dans commit suivant)
- Codex gpt-5.5 5 rounds review : READY FOR MERGE

Co-Authored-By: Qwen Code <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-02 10:44:52 +02:00
parent 217fc75983
commit 6df87defd1
2 changed files with 166 additions and 61 deletions

View File

@@ -95,13 +95,11 @@ except Exception:
try:
from quarantine import (
QuarantineManager,
DocLogger,
SEUIL_TEXTE_MINI,
SEUIL_RESCAN_RESIDUEL,
)
except ImportError:
QuarantineManager = None # type: ignore
DocLogger = None # type: ignore
SEUIL_TEXTE_MINI = 100
SEUIL_RESCAN_RESIDUEL = 0
@@ -4296,12 +4294,29 @@ def process_pdf(
extracted_chars=extracted_chars,
)
try:
quarantine_mgr.quarantine_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(pdf_path, quarantine_mgr.quarantine_dir / pdf_path.name)
except Exception as copy_err:
log.warning("Could not copy original PDF to quarantine for %s: %s",
pdf_path.name, copy_err)
return {}
else:
# Critical 3 + M2 : quarantine_mgr absent — fallback avec permissions
fallback_dir = out_dir / "_preflight_failed"
fallback_dir.mkdir(parents=True, exist_ok=True)
try:
os.chmod(str(fallback_dir), 0o700)
except OSError:
pass
try:
shutil.copy(pdf_path, fallback_dir / pdf_path.name)
log.error("Preflight failed AND quarantine_mgr unavailable. "
"Original PDF copied to %s for manual review (mode 0700).", fallback_dir)
except Exception as fb_err:
log.critical("Preflight failed, quarantine unavailable, AND fallback copy failed "
"for %s: %s — DOCUMENT LOST", pdf_path.name, fb_err)
# M3 : retourner un dict compatible — callers accèdent à outputs["text"] / ["audit"]
return {"status": "quarantined", "reason": "preflight_text_too_short",
"extracted_chars": extracted_chars, "seuil": SEUIL_TEXTE_MINI,
"text": "", "audit": ""}
# 1) Regex rules + NER-first cross-validation
# Passer les NER managers pour que anonymise_document_regex exécute le NER
@@ -4677,6 +4692,48 @@ def process_pdf(
before - len(anon.audit),
", ".join(sorted(removed_tokens)[:10]))
# M5 : Check résiduel post-TOUT-nettoyage (après rescan, CP orphan, tel fragmentés,
# initiales, whitelist). Si PII résiduelles > seuil, on NE LIVRE PAS — quarantaine full.
# Inconditionnel : toujours exécuté même si quarantine_mgr absent (Codex review).
if SEUIL_RESCAN_RESIDUEL is not None:
_residual_pii_patterns = [
(re.compile(RE_NIR.pattern if hasattr(RE_NIR, 'pattern') else r"\b\d{15}\b"), "NIR"),
(re.compile(r"\b[\w.%+-]+@[\w.-]+\.\w{2,}\b"), "EMAIL"),
(re.compile(r"\b(?:FR\d{2})?\s?\d{4}\s?\d{4}\s?\d{4}\s?\d{4}\s?\d{2,3}\b"), "IBAN"),
(re.compile(r"\b(?:\+33|0)[\s.\-]?\d[\s.\-]?(?:\d[\s.\-]?){8}\b"), "TEL"),
]
residual_count = 0
for pat, _label in _residual_pii_patterns:
residual_count += len(pat.findall(final_text))
if residual_count > SEUIL_RESCAN_RESIDUEL:
if quarantine_mgr is not None:
quarantine_mgr.flag(
doc_name=pdf_path.stem,
reason="rescan_residual_pii",
detail=f"{residual_count} residual PII after all cleaning passes (seuil={SEUIL_RESCAN_RESIDUEL})",
severity="full",
)
try:
shutil.copy(pdf_path, quarantine_mgr.quarantine_dir / pdf_path.name)
except Exception as copy_err:
log.warning("Could not copy PDF to quarantine for %s: %s", pdf_path.name, copy_err)
else:
# Sans quarantine_mgr : fallback sécurisé
fallback_dir = out_dir / "_rescan_failed"
fallback_dir.mkdir(parents=True, exist_ok=True)
try:
os.chmod(str(fallback_dir), 0o700)
except OSError:
pass
try:
shutil.copy(pdf_path, fallback_dir / pdf_path.name)
except Exception as copy_err:
log.warning("Could not copy PDF for rescan failure %s: %s", pdf_path.name, copy_err)
log.critical("Rescan found %d residual PII for %s — NO OUTPUT delivered",
residual_count, pdf_path.name)
return {"status": "quarantined", "reason": "rescan_residual_pii",
"residual_count": residual_count, "text": "", "audit": ""}
# Sauvegardes
base = pdf_path.stem
txt_path = out_dir / f"{base}.pseudonymise.txt"
@@ -4730,27 +4787,30 @@ def process_pdf(
exc=e,
)
else:
# S1 : passer raster_err (pas e) pour que la stacktrace corresponde
quarantine_mgr.flag(
doc_name=pdf_path.stem,
reason="pdf_redaction_failed",
detail=f"vector failed ({e}); raster also failed ({raster_err})",
severity="partial",
exc=e,
exc=raster_err,
)
# Décision A finalisée : copier le texte en quarantaine pour autoportance
# (l'opérateur peut tout consulter depuis un seul dossier)
# S8 : pas de mkdir() ici — flag() le fait déjà via _secure_quarantine_dir()
try:
quarantine_mgr.quarantine_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(txt_path, quarantine_mgr.quarantine_dir / txt_path.name)
except Exception as copy_err:
log.warning("Could not copy text to quarantine for %s: %s",
pdf_path.name, copy_err)
# Note : pas de raise — texte anonymisé disponible (et copié si quarantine_mgr)
if also_make_raster_burn and fitz is not None:
ras_path = out_dir / f"{base}.redacted_raster.pdf"
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map)
outputs["pdf_raster"] = str(ras_path)
# S5 : ne pas refaire le raster si le fallback vector→raster l'a déjà produit
if "pdf_raster" not in outputs:
ras_path = out_dir / f"{base}.redacted_raster.pdf"
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map)
outputs["pdf_raster"] = str(ras_path)
return outputs
@@ -4842,11 +4902,15 @@ def process_document(
try:
outputs = process_pdf(pdf_path=pdf_path, out_dir=out_dir, **kwargs)
# Renommer les sorties pour refléter le nom original (pas le .tmp_convert.pdf)
# Minor2 : ne traiter que les valeurs qui sont des chemins (str),
# pas les métadonnées scalaires (status, reason, extracted_chars, etc.)
if is_temp:
original_stem = doc_path.stem
renamed = {}
for key, path_str in outputs.items():
if not isinstance(path_str, str):
renamed[key] = path_str
continue
p = Path(path_str)
if p.exists() and ".tmp_convert" in p.name:
new_name = p.name.replace(doc_path.stem + ".tmp_convert", original_stem)