""" Tests pour Q-1 — Quarantaine différentielle. Couvre : pré-flight B-3, quarantaine D2/D3, rescan résiduel M5, INDEX.md, errors.log. Les tests B-1 (metadata XMP) et B-2 (per-doc log) restent xfail car non implémentés. """ from __future__ import annotations import json import os import textwrap from pathlib import Path from unittest.mock import patch, MagicMock import pytest # === Fixtures ==================================================== @pytest.fixture def tmp_output_dir(tmp_path: Path) -> Path: out = tmp_path / "output" out.mkdir() return out @pytest.fixture def fake_pdf_path(tmp_path: Path) -> Path: p = tmp_path / "doc_ok.pdf" p.write_bytes(b"%PDF-1.4\n%fake\n") return p # === Tests B-3 : pré-flight texte vide =========================== class TestPreflight: """B-3 — Pré-flight : texte < SEUIL_TEXTE_MINI → quarantaine full.""" def test_preflight_empty_text_goes_to_quarantine(self, tmp_path: Path) -> None: """Un document dont l'extraction retourne < 100 chars va en quarantaine sans produire de texte/PDF de sortie.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() pdf = out / "doc_empty.pdf" pdf.write_bytes(b"%PDF-1.4\n%empty\n") mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234") mgr.flag( doc_name="doc_empty", reason="preflight_text_too_short", detail="Only 10 chars extracted (seuil=100)", severity="full", extracted_chars=10, ) quarantine_dir = out / "quarantaine" assert quarantine_dir.exists(), "Le dossier quarantaine doit être créé" assert (quarantine_dir / "doc_empty.reason.txt").exists() assert quarantine_dir.stat().st_mode & 0o777 == 0o700, "quarantine_dir doit être 0700" def test_preflight_reason_format(self, tmp_path: Path) -> None: """Le fichier .reason.txt doit contenir : raison, horodatage, caractères extraits, version, profil.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234", profile_name="standard_local") mgr.flag( doc_name="doc_empty", reason="preflight_text_too_short", detail="Only 10 chars extracted (seuil=100)", severity="full", extracted_chars=10, ) reason = (out / "quarantaine" / "doc_empty.reason.txt").read_text() assert "preflight_text_too_short" in reason assert "Caractères extraits" in reason assert "10" in reason assert "Horodatage" in reason assert "0.11.0" in reason assert "abc1234" in reason assert "standard_local" in reason # === Tests Q-1 : quarantaine différentielle ===================== class TestRedactionFailure: """Q-1 — Rédaction PDF échoue → texte livré, PDF en quarantaine.""" def test_redaction_failure_text_still_outputs(self, tmp_path: Path) -> None: """Si la rédaction PDF échoue mais que l'anonymisation texte réussit : - le .pseudonymise.txt sort normalement - le PDF va en quarantaine avec flag pdf_redaction_failed """ from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() pdf = out / "doc_redact_fail.pdf" pdf.write_bytes(b"%PDF-1.4\n%redact_fails\n") # Simule le comportement de process_pdf quand vector échoue mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234") # Texte anonymisé produit txt = out / "doc_redact_fail.pseudonymise.txt" txt.write_text("Patient [NOM] présenté le [DATE].\n") audit = out / "doc_redact_fail.audit.jsonl" audit.write_text('{"type": "mask", "label": "NOM"}\n') # Vector échoue → flag partial mgr.flag( doc_name="doc_redact_fail", reason="pdf_redaction_failed", detail="vector failed (fitz.ApplyRedactionException); raster also failed (OOM)", severity="partial", ) assert txt.exists() assert audit.exists() reason = (out / "quarantaine" / "doc_redact_fail.reason.txt").read_text() assert "pdf_redaction_failed" in reason assert "partial" in reason def test_no_silent_failure_on_redaction(self, tmp_path: Path) -> None: """Toute exception sur la rédaction DOIT être logguée (warning minimum). Pas de `except Exception: pass` silencieux.""" import logging # On teste que _append_errors_log ne mute pas les erreurs # (le vrai comportement est testé par le test de symlink ci-dessous) from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out) # Flag avec exception — vérifie que la stacktrace est capturée try: raise ValueError("ApplyRedactionException: invalid rect") except ValueError as e: mgr.flag(doc_name="doc1", reason="pdf_redaction_failed", detail="vector failed", severity="partial", exc=e) errors_log = out / "errors.log" assert errors_log.exists() lines = errors_log.read_text().splitlines() assert len(lines) == 1 entry = json.loads(lines[0]) assert "pdf_redaction_failed" in entry["category"] or "pdf" in entry["category"] # === Tests F : rescan résiduel (M5) ============================= class TestRescanQuarantine: """F / M5 — Rescan post-nettoyage détecte PII résiduelles → quarantaine full.""" def test_rescan_detects_residual_pii_triggers_quarantine(self, tmp_path: Path) -> None: """Si le rescan détecte des PII résiduelles > seuil (0 par défaut), AUCUN fichier de sortie n'est livré — quarantaine full.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234") mgr.flag( doc_name="doc_leak", reason="rescan_residual_pii", detail="2 residual PII after all cleaning passes (seuil=0)", severity="full", ) # Le texte NE doit PAS être livré assert not (out / "doc_leak.pseudonymise.txt").exists() assert (out / "quarantaine" / "doc_leak.reason.txt").exists() assert mgr.has_full_quarantine("doc_leak") # === Tests F4 : patterns résiduels gated par catégorie désactivée === class TestResidualPatternsGating: """F-4 (P1-2) — `_build_residual_patterns(disabled)` : une catégorie décochée ne doit pas déclencher la quarantaine résiduelle, ni directement, ni via le pattern résiduel d'une autre catégorie (piège NIR ⇄ TEL).""" def _labels(self, patterns): return {label for _pat, label in patterns} @staticmethod def _residual_count(text, disabled): """Reproduit EXACTEMENT le calcul du call-site (process_pdf) : seul le scan TEL voit le texte pré-masqué ; EMAIL/IBAN/NIR voient le texte ORIGINAL.""" from anonymizer_core_refactored_onnx import ( _build_residual_patterns, _residual_premask_text, ) patterns = _build_residual_patterns(disabled) tel_text = _residual_premask_text(text, disabled) total = 0 for pat, label in patterns: scan = tel_text if label == "TEL" else text total += len(pat.findall(scan)) return total def test_default_set_includes_all_labels(self) -> None: """Aucune catégorie désactivée → NIR, EMAIL, IBAN, TEL tous présents.""" from anonymizer_core_refactored_onnx import _build_residual_patterns labels = self._labels(_build_residual_patterns(set())) assert {"NIR", "EMAIL", "IBAN", "TEL"}.issubset(labels) def test_nir_disabled_drops_nir_keeps_others(self) -> None: """NIR décoché → NIR absent, EMAIL/IBAN toujours présents.""" from anonymizer_core_refactored_onnx import _build_residual_patterns labels = self._labels(_build_residual_patterns({"NIR"})) assert "NIR" not in labels assert "EMAIL" in labels assert "IBAN" in labels def test_tel_disabled_drops_tel(self) -> None: """TEL décoché → TEL absent.""" from anonymizer_core_refactored_onnx import _build_residual_patterns labels = self._labels(_build_residual_patterns({"TEL"})) assert "TEL" not in labels def test_nir_disabled_tel_does_not_match_nir_in_clear(self) -> None: """Piège F-4 : NIR décoché laissé en clair → le pré-masquage SCOPÉ-TEL empêche le pattern TEL de matcher le bloc central de chiffres du NIR. Le NIR-pattern est retiré du set et EMAIL/IBAN ne matchent pas des chiffres nus → décompte résiduel global == 0 pour ce NIR en clair.""" from anonymizer_core_refactored_onnx import ( _build_residual_patterns, _residual_premask_text, ) nir_en_clair = "1 85 05 74 123 456 78" disabled = {"NIR"} # Le pattern TEL appliqué au texte pré-masqué → 0 match. patterns = _build_residual_patterns(disabled) tel_pat = next(pat for pat, label in patterns if label == "TEL") premasked = _residual_premask_text(nir_en_clair, disabled) assert tel_pat.findall(premasked) == [] # Décompte résiduel global (logique call-site, TEL-scopé) == 0. total = self._residual_count(nir_en_clair, disabled) assert total == 0, ( f"NIR décoché ne doit pas déclencher la quarantaine, " f"or {total} match(s) résiduel(s) sur {nir_en_clair!r}" ) def test_nir_disabled_clear_iban_still_matches(self) -> None: """Fix 1 (régression) : le pré-masquage NIR est SCOPÉ au seul scan TEL. Un IBAN en clair, avec NIR décoché, DOIT toujours déclencher le filet IBAN résiduel — le pré-masquage ne doit PAS effacer ses groupes de chiffres (sinon le backstop IBAN, toujours actif, serait affaibli).""" from anonymizer_core_refactored_onnx import _build_residual_patterns iban_clair = "FR76 3000 1007 9412 3456 7890 185" disabled = {"NIR"} # Le pattern IBAN (scanné sur le texte ORIGINAL) matche toujours. patterns = _build_residual_patterns(disabled) iban_pat = next(pat for pat, label in patterns if label == "IBAN") assert iban_pat.findall(iban_clair), "le filet IBAN doit rester actif" # Décompte résiduel global (logique call-site) ≥ 1. total = self._residual_count(iban_clair, disabled) assert total >= 1, ( f"IBAN en clair doit déclencher la quarantaine même NIR décoché, " f"or {total} match(s)" ) def test_residual_threshold_is_strict_zero_regardless_of_disabled(self) -> None: """Fix 2 (régression) : le seuil résiduel reste STRICT (0) inconditionnellement. Un EMAIL en clair → 1 résidu, et 1 > 0 ⇒ quarantaine, même avec des catégories décochées (pas de relâchement à 1 qui laisserait passer une fuite EMAIL/IBAN).""" from anonymizer_core_refactored_onnx import SEUIL_RESCAN_RESIDUEL assert SEUIL_RESCAN_RESIDUEL == 0 email_clair = "a@b.fr" # Une catégorie est décochée mais le seuil effectif reste 0. for disabled in (set(), {"NIR"}, {"NIR", "TEL"}): total = self._residual_count(email_clair, disabled) assert total == 1, (disabled, total) # 1 résidu > seuil strict (0) ⇒ quarantaine déclenchée. assert total > SEUIL_RESCAN_RESIDUEL def test_nir_enabled_tel_behavior_unchanged(self) -> None: """Non-régression : NIR activé → le pré-masquage est l'identité et un vrai téléphone est toujours détecté par le pattern TEL.""" from anonymizer_core_refactored_onnx import ( _build_residual_patterns, _residual_premask_text, ) tel = "06 12 34 56 78" patterns = _build_residual_patterns(set()) text = _residual_premask_text(tel, set()) assert text == tel # identité quand rien n'est désactivé tel_pat = next(pat for pat, label in patterns if label == "TEL") assert tel_pat.findall(tel), "un vrai téléphone doit rester détecté" # === Tests A : INDEX.md et errors.log =========================== class TestQuarantineArtifacts: """A — Artifacts de quarantaine : INDEX.md, errors.log.""" def test_quarantine_index_md_format(self, tmp_path: Path) -> None: """INDEX.md doit lister tous les docs en quarantaine avec raison, caractères extraits, action recommandée.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234") mgr.flag( doc_name="doc_empty", reason="preflight_text_too_short", detail="Only 10 chars", severity="full", extracted_chars=10, ) mgr.flag( doc_name="doc_fail", reason="pdf_redaction_failed", detail="vector failed", severity="partial", ) mgr.finalize(total_docs_processed=5) index = out / "quarantaine" / "INDEX.md" assert index.exists() content = index.read_text() assert "doc_empty" in content assert "doc_fail" in content assert "Quarantaine totale" in content assert "Quarantaine partielle" in content assert "Taux" in content # 2 docs flaggés sur 5 traités = 40% assert "40.0%" in content def test_errors_log_json_lines(self, tmp_path: Path) -> None: """errors.log doit être un fichier JSON-lines valide, avec ts, doc, level, category, msg, severity.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234") mgr.flag( doc_name="doc1", reason="preflight_text_too_short", detail="Only 10 chars", severity="full", ) mgr.flag( doc_name="doc2", reason="pdf_redaction_failed", detail="vector failed", severity="partial", ) errors_log = out / "errors.log" assert errors_log.exists() # Vérifier permissions (0o600) mode = errors_log.stat().st_mode & 0o777 assert mode == 0o600, f"errors.log permissions should be 0600, got {oct(mode)}" lines = errors_log.read_text().splitlines() assert len(lines) == 2 for line in lines: entry = json.loads(line) # doit parser sans erreur assert "ts" in entry assert "doc" in entry assert "level" in entry assert "category" in entry assert "msg" in entry assert "severity" in entry assert lines[0].startswith("{") # JSON-lines format entry1 = json.loads(lines[0]) assert entry1["severity"] == "full" assert entry1["category"] == "preflight" entry2 = json.loads(lines[1]) assert entry2["severity"] == "partial" assert entry2["category"] == "pdf" # === Tests de non-régression ==================================== def test_happy_path_no_quarantine_created_if_no_failure(tmp_path: Path) -> None: """Non-régression — Sans flag, aucun dossier quarantaine/ créé.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out, app_version="0.11.0") # Aucun flag → pas de quarantine_dir créé assert not (out / "quarantaine").exists() # === Tests security : permissions + symlink ===================== class TestSecurity: """Tests des fixes sécurité (Criticals 1-2, M1-M2).""" def test_quarantine_dir_permissions(self, tmp_path: Path) -> None: """quarantine_dir doit avoir des permissions 0o700.""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() mgr = QuarantineManager(out) mgr.flag(doc_name="doc1", reason="test", detail="test", severity="full") qdir = out / "quarantaine" mode = qdir.stat().st_mode & 0o777 assert mode == 0o700, f"quarantine_dir should be 0700, got {oct(mode)}" def test_symlink_errors_log_refused(self, tmp_path: Path) -> None: """Si errors.log est un symlink, _append_errors_log doit refuser d'écrire (O_NOFOLLOW lève OSError).""" from quarantine import QuarantineManager out = tmp_path / "output" out.mkdir() target = tmp_path / "symlink_target.txt" target.write_text("innocent") (out / "errors.log").symlink_to(target) mgr = QuarantineManager(out) # O_NOFOLLOW lève OSError (ELOOP), pas RuntimeError with pytest.raises(OSError): mgr.flag(doc_name="doc1", reason="test", detail="test", severity="full") def test_o_nofollow_refuses_symlink_at_creation(self, tmp_path: Path) -> None: """os.open(O_NOFOLLOW) doit refuser la création via symlink.""" import os as _os target = tmp_path / "target.txt" target.write_text("innocent") link = tmp_path / "errors.log" link.symlink_to(target) with pytest.raises(OSError): fd = _os.open(str(link), _os.O_CREAT | _os.O_APPEND | _os.O_WRONLY | _os.O_NOFOLLOW, 0o600) _os.close(fd)