Files
anonymisation/tests/unit/test_q1_quarantine.py
2026-06-26 10:21:27 +02:00

465 lines
18 KiB
Python

"""
Tests pour Q-1 — Quarantaine différentielle.
Couvre : pré-flight B-3, quarantaine D2/D3, rescan résiduel M5,
INDEX.md, errors.log.
Les tests B-1 (metadata XMP) et B-2 (per-doc log) restent xfail car non implémentés.
"""
from __future__ import annotations
import json
import os
import textwrap
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
# === Fixtures ====================================================
@pytest.fixture
def tmp_output_dir(tmp_path: Path) -> Path:
out = tmp_path / "output"
out.mkdir()
return out
@pytest.fixture
def fake_pdf_path(tmp_path: Path) -> Path:
p = tmp_path / "doc_ok.pdf"
p.write_bytes(b"%PDF-1.4\n%fake\n")
return p
# === Tests B-3 : pré-flight texte vide ===========================
class TestPreflight:
"""B-3 — Pré-flight : texte < SEUIL_TEXTE_MINI → quarantaine full."""
def test_preflight_empty_text_goes_to_quarantine(self, tmp_path: Path) -> None:
"""Un document dont l'extraction retourne < 100 chars va en quarantaine
sans produire de texte/PDF de sortie."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
pdf = out / "doc_empty.pdf"
pdf.write_bytes(b"%PDF-1.4\n%empty\n")
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
mgr.flag(
doc_name="doc_empty",
reason="preflight_text_too_short",
detail="Only 10 chars extracted (seuil=100)",
severity="full",
extracted_chars=10,
)
quarantine_dir = out / "quarantaine"
assert quarantine_dir.exists(), "Le dossier quarantaine doit être créé"
assert (quarantine_dir / "doc_empty.reason.txt").exists()
assert quarantine_dir.stat().st_mode & 0o777 == 0o700, "quarantine_dir doit être 0700"
def test_preflight_reason_format(self, tmp_path: Path) -> None:
"""Le fichier .reason.txt doit contenir : raison, horodatage,
caractères extraits, version, profil."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234",
profile_name="standard_local")
mgr.flag(
doc_name="doc_empty",
reason="preflight_text_too_short",
detail="Only 10 chars extracted (seuil=100)",
severity="full",
extracted_chars=10,
)
reason = (out / "quarantaine" / "doc_empty.reason.txt").read_text()
assert "preflight_text_too_short" in reason
assert "Caractères extraits" in reason
assert "10" in reason
assert "Horodatage" in reason
assert "0.11.0" in reason
assert "abc1234" in reason
assert "standard_local" in reason
# === Tests Q-1 : quarantaine différentielle =====================
class TestRedactionFailure:
"""Q-1 — Rédaction PDF échoue → texte livré, PDF en quarantaine."""
def test_redaction_failure_text_still_outputs(self, tmp_path: Path) -> None:
"""Si la rédaction PDF échoue mais que l'anonymisation texte réussit :
- le .pseudonymise.txt sort normalement
- le PDF va en quarantaine avec flag pdf_redaction_failed
"""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
pdf = out / "doc_redact_fail.pdf"
pdf.write_bytes(b"%PDF-1.4\n%redact_fails\n")
# Simule le comportement de process_pdf quand vector échoue
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
# Texte anonymisé produit
txt = out / "doc_redact_fail.pseudonymise.txt"
txt.write_text("Patient [NOM] présenté le [DATE].\n")
audit = out / "doc_redact_fail.audit.jsonl"
audit.write_text('{"type": "mask", "label": "NOM"}\n')
# Vector échoue → flag partial
mgr.flag(
doc_name="doc_redact_fail",
reason="pdf_redaction_failed",
detail="vector failed (fitz.ApplyRedactionException); raster also failed (OOM)",
severity="partial",
)
assert txt.exists()
assert audit.exists()
reason = (out / "quarantaine" / "doc_redact_fail.reason.txt").read_text()
assert "pdf_redaction_failed" in reason
assert "partial" in reason
def test_no_silent_failure_on_redaction(self, tmp_path: Path) -> None:
"""Toute exception sur la rédaction DOIT être logguée (warning minimum).
Pas de `except Exception: pass` silencieux."""
import logging
# On teste que _append_errors_log ne mute pas les erreurs
# (le vrai comportement est testé par le test de symlink ci-dessous)
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out)
# Flag avec exception — vérifie que la stacktrace est capturée
try:
raise ValueError("ApplyRedactionException: invalid rect")
except ValueError as e:
mgr.flag(doc_name="doc1", reason="pdf_redaction_failed",
detail="vector failed", severity="partial", exc=e)
errors_log = out / "errors.log"
assert errors_log.exists()
lines = errors_log.read_text().splitlines()
assert len(lines) == 1
entry = json.loads(lines[0])
assert "pdf_redaction_failed" in entry["category"] or "pdf" in entry["category"]
# === Tests F : rescan résiduel (M5) =============================
class TestRescanQuarantine:
"""F / M5 — Rescan post-nettoyage détecte PII résiduelles → quarantaine full."""
def test_rescan_detects_residual_pii_triggers_quarantine(self, tmp_path: Path) -> None:
"""Si le rescan détecte des PII résiduelles > seuil (0 par défaut),
AUCUN fichier de sortie n'est livré — quarantaine full."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
mgr.flag(
doc_name="doc_leak",
reason="rescan_residual_pii",
detail="2 residual PII after all cleaning passes (seuil=0)",
severity="full",
)
# Le texte NE doit PAS être livré
assert not (out / "doc_leak.pseudonymise.txt").exists()
assert (out / "quarantaine" / "doc_leak.reason.txt").exists()
assert mgr.has_full_quarantine("doc_leak")
# === Tests F4 : patterns résiduels gated par catégorie désactivée ===
class TestResidualPatternsGating:
"""F-4 (P1-2) — `_build_residual_patterns(disabled)` : une catégorie
décochée ne doit pas déclencher la quarantaine résiduelle, ni directement,
ni via le pattern résiduel d'une autre catégorie (piège NIR ⇄ TEL)."""
def _labels(self, patterns):
return {label for _pat, label in patterns}
@staticmethod
def _residual_count(text, disabled):
"""Reproduit EXACTEMENT le calcul du call-site (process_pdf) :
seul le scan TEL voit le texte pré-masqué ; EMAIL/IBAN/NIR voient
le texte ORIGINAL."""
from anonymizer_core_refactored_onnx import (
_build_residual_patterns,
_residual_premask_text,
)
patterns = _build_residual_patterns(disabled)
tel_text = _residual_premask_text(text, disabled)
total = 0
for pat, label in patterns:
scan = tel_text if label == "TEL" else text
total += len(pat.findall(scan))
return total
def test_default_set_includes_all_labels(self) -> None:
"""Aucune catégorie désactivée → NIR, EMAIL, IBAN, TEL tous présents."""
from anonymizer_core_refactored_onnx import _build_residual_patterns
labels = self._labels(_build_residual_patterns(set()))
assert {"NIR", "EMAIL", "IBAN", "TEL"}.issubset(labels)
def test_nir_disabled_drops_nir_keeps_others(self) -> None:
"""NIR décoché → NIR absent, EMAIL/IBAN toujours présents."""
from anonymizer_core_refactored_onnx import _build_residual_patterns
labels = self._labels(_build_residual_patterns({"NIR"}))
assert "NIR" not in labels
assert "EMAIL" in labels
assert "IBAN" in labels
def test_tel_disabled_drops_tel(self) -> None:
"""TEL décoché → TEL absent."""
from anonymizer_core_refactored_onnx import _build_residual_patterns
labels = self._labels(_build_residual_patterns({"TEL"}))
assert "TEL" not in labels
def test_nir_disabled_tel_does_not_match_nir_in_clear(self) -> None:
"""Piège F-4 : NIR décoché laissé en clair → le pré-masquage SCOPÉ-TEL
empêche le pattern TEL de matcher le bloc central de chiffres du NIR.
Le NIR-pattern est retiré du set et EMAIL/IBAN ne matchent pas des
chiffres nus → décompte résiduel global == 0 pour ce NIR en clair."""
from anonymizer_core_refactored_onnx import (
_build_residual_patterns,
_residual_premask_text,
)
nir_en_clair = "1 85 05 74 123 456 78"
disabled = {"NIR"}
# Le pattern TEL appliqué au texte pré-masqué → 0 match.
patterns = _build_residual_patterns(disabled)
tel_pat = next(pat for pat, label in patterns if label == "TEL")
premasked = _residual_premask_text(nir_en_clair, disabled)
assert tel_pat.findall(premasked) == []
# Décompte résiduel global (logique call-site, TEL-scopé) == 0.
total = self._residual_count(nir_en_clair, disabled)
assert total == 0, (
f"NIR décoché ne doit pas déclencher la quarantaine, "
f"or {total} match(s) résiduel(s) sur {nir_en_clair!r}"
)
def test_nir_disabled_clear_iban_still_matches(self) -> None:
"""Fix 1 (régression) : le pré-masquage NIR est SCOPÉ au seul scan TEL.
Un IBAN en clair, avec NIR décoché, DOIT toujours déclencher le filet
IBAN résiduel — le pré-masquage ne doit PAS effacer ses groupes de
chiffres (sinon le backstop IBAN, toujours actif, serait affaibli)."""
from anonymizer_core_refactored_onnx import _build_residual_patterns
iban_clair = "FR76 3000 1007 9412 3456 7890 185"
disabled = {"NIR"}
# Le pattern IBAN (scanné sur le texte ORIGINAL) matche toujours.
patterns = _build_residual_patterns(disabled)
iban_pat = next(pat for pat, label in patterns if label == "IBAN")
assert iban_pat.findall(iban_clair), "le filet IBAN doit rester actif"
# Décompte résiduel global (logique call-site) ≥ 1.
total = self._residual_count(iban_clair, disabled)
assert total >= 1, (
f"IBAN en clair doit déclencher la quarantaine même NIR décoché, "
f"or {total} match(s)"
)
def test_residual_threshold_is_strict_zero_regardless_of_disabled(self) -> None:
"""Fix 2 (régression) : le seuil résiduel reste STRICT (0)
inconditionnellement. Un EMAIL en clair → 1 résidu, et 1 > 0 ⇒
quarantaine, même avec des catégories décochées (pas de relâchement
à 1 qui laisserait passer une fuite EMAIL/IBAN)."""
from anonymizer_core_refactored_onnx import SEUIL_RESCAN_RESIDUEL
assert SEUIL_RESCAN_RESIDUEL == 0
email_clair = "a@b.fr"
# Une catégorie est décochée mais le seuil effectif reste 0.
for disabled in (set(), {"NIR"}, {"NIR", "TEL"}):
total = self._residual_count(email_clair, disabled)
assert total == 1, (disabled, total)
# 1 résidu > seuil strict (0) ⇒ quarantaine déclenchée.
assert total > SEUIL_RESCAN_RESIDUEL
def test_nir_enabled_tel_behavior_unchanged(self) -> None:
"""Non-régression : NIR activé → le pré-masquage est l'identité et
un vrai téléphone est toujours détecté par le pattern TEL."""
from anonymizer_core_refactored_onnx import (
_build_residual_patterns,
_residual_premask_text,
)
tel = "06 12 34 56 78"
patterns = _build_residual_patterns(set())
text = _residual_premask_text(tel, set())
assert text == tel # identité quand rien n'est désactivé
tel_pat = next(pat for pat, label in patterns if label == "TEL")
assert tel_pat.findall(tel), "un vrai téléphone doit rester détecté"
# === Tests A : INDEX.md et errors.log ===========================
class TestQuarantineArtifacts:
"""A — Artifacts de quarantaine : INDEX.md, errors.log."""
def test_quarantine_index_md_format(self, tmp_path: Path) -> None:
"""INDEX.md doit lister tous les docs en quarantaine avec raison,
caractères extraits, action recommandée."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
mgr.flag(
doc_name="doc_empty",
reason="preflight_text_too_short",
detail="Only 10 chars",
severity="full",
extracted_chars=10,
)
mgr.flag(
doc_name="doc_fail",
reason="pdf_redaction_failed",
detail="vector failed",
severity="partial",
)
mgr.finalize(total_docs_processed=5)
index = out / "quarantaine" / "INDEX.md"
assert index.exists()
content = index.read_text()
assert "doc_empty" in content
assert "doc_fail" in content
assert "Quarantaine totale" in content
assert "Quarantaine partielle" in content
assert "Taux" in content
# 2 docs flaggés sur 5 traités = 40%
assert "40.0%" in content
def test_errors_log_json_lines(self, tmp_path: Path) -> None:
"""errors.log doit être un fichier JSON-lines valide,
avec ts, doc, level, category, msg, severity."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
mgr.flag(
doc_name="doc1",
reason="preflight_text_too_short",
detail="Only 10 chars",
severity="full",
)
mgr.flag(
doc_name="doc2",
reason="pdf_redaction_failed",
detail="vector failed",
severity="partial",
)
errors_log = out / "errors.log"
assert errors_log.exists()
# Vérifier permissions (0o600)
mode = errors_log.stat().st_mode & 0o777
assert mode == 0o600, f"errors.log permissions should be 0600, got {oct(mode)}"
lines = errors_log.read_text().splitlines()
assert len(lines) == 2
for line in lines:
entry = json.loads(line) # doit parser sans erreur
assert "ts" in entry
assert "doc" in entry
assert "level" in entry
assert "category" in entry
assert "msg" in entry
assert "severity" in entry
assert lines[0].startswith("{") # JSON-lines format
entry1 = json.loads(lines[0])
assert entry1["severity"] == "full"
assert entry1["category"] == "preflight"
entry2 = json.loads(lines[1])
assert entry2["severity"] == "partial"
assert entry2["category"] == "pdf"
# === Tests de non-régression ====================================
def test_happy_path_no_quarantine_created_if_no_failure(tmp_path: Path) -> None:
"""Non-régression — Sans flag, aucun dossier quarantaine/ créé."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out, app_version="0.11.0")
# Aucun flag → pas de quarantine_dir créé
assert not (out / "quarantaine").exists()
# === Tests security : permissions + symlink =====================
class TestSecurity:
"""Tests des fixes sécurité (Criticals 1-2, M1-M2)."""
def test_quarantine_dir_permissions(self, tmp_path: Path) -> None:
"""quarantine_dir doit avoir des permissions 0o700."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
mgr = QuarantineManager(out)
mgr.flag(doc_name="doc1", reason="test", detail="test", severity="full")
qdir = out / "quarantaine"
mode = qdir.stat().st_mode & 0o777
assert mode == 0o700, f"quarantine_dir should be 0700, got {oct(mode)}"
def test_symlink_errors_log_refused(self, tmp_path: Path) -> None:
"""Si errors.log est un symlink, _append_errors_log doit refuser d'écrire
(O_NOFOLLOW lève OSError)."""
from quarantine import QuarantineManager
out = tmp_path / "output"
out.mkdir()
target = tmp_path / "symlink_target.txt"
target.write_text("innocent")
(out / "errors.log").symlink_to(target)
mgr = QuarantineManager(out)
# O_NOFOLLOW lève OSError (ELOOP), pas RuntimeError
with pytest.raises(OSError):
mgr.flag(doc_name="doc1", reason="test", detail="test", severity="full")
def test_o_nofollow_refuses_symlink_at_creation(self, tmp_path: Path) -> None:
"""os.open(O_NOFOLLOW) doit refuser la création via symlink."""
import os as _os
target = tmp_path / "target.txt"
target.write_text("innocent")
link = tmp_path / "errors.log"
link.symlink_to(target)
with pytest.raises(OSError):
fd = _os.open(str(link), _os.O_CREAT | _os.O_APPEND | _os.O_WRONLY | _os.O_NOFOLLOW, 0o600)
_os.close(fd)