|
|
|
|
@@ -1,235 +1,332 @@
|
|
|
|
|
"""
|
|
|
|
|
Tests squelette pour Q-1 — Quarantaine différentielle sur rédaction PDF.
|
|
|
|
|
Tests pour Q-1 — Quarantaine différentielle.
|
|
|
|
|
|
|
|
|
|
État : SQUELETTE en mode xfail/skip — attend le pseudo-code final de Qwen
|
|
|
|
|
(`docs/coordination/inbox/for-dom/2026-05-28_qwen_pseudocode-Q1-quarantaine.md`)
|
|
|
|
|
et l'implémentation Dom pour devenir des tests verts.
|
|
|
|
|
|
|
|
|
|
Convention :
|
|
|
|
|
- @pytest.mark.xfail(strict=True) tant que l'API n'existe pas
|
|
|
|
|
- Une fois l'impl en place, retirer xfail et le test doit passer
|
|
|
|
|
- Test = spec exécutable du comportement attendu
|
|
|
|
|
|
|
|
|
|
Chaque test correspond à un comportement défini dans D-6 / D-10.
|
|
|
|
|
Couvre : pré-flight B-3, quarantaine D2/D3, rescan résiduel M5,
|
|
|
|
|
INDEX.md, errors.log.
|
|
|
|
|
Les tests B-1 (metadata XMP) et B-2 (per-doc log) restent xfail car non implémentés.
|
|
|
|
|
"""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import textwrap
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any
|
|
|
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Fixtures ====================================================
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def tmp_output_dir(tmp_path: Path) -> Path:
|
|
|
|
|
"""Dossier de sortie temporaire pour un batch."""
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def sample_pdf_ok(tmp_path: Path) -> Path:
|
|
|
|
|
"""PDF qui s'extrait et se rédige normalement.
|
|
|
|
|
À remplacer par un vrai PDF fixture du corpus tests/data/."""
|
|
|
|
|
def fake_pdf_path(tmp_path: Path) -> Path:
|
|
|
|
|
p = tmp_path / "doc_ok.pdf"
|
|
|
|
|
p.write_bytes(b"%PDF-1.4\n%fake\n") # placeholder
|
|
|
|
|
return p
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def sample_pdf_empty_text(tmp_path: Path) -> Path:
|
|
|
|
|
"""PDF dont l'extraction de texte retourne (quasi)-rien.
|
|
|
|
|
Doit déclencher le pré-flight B-3."""
|
|
|
|
|
p = tmp_path / "doc_empty.pdf"
|
|
|
|
|
p.write_bytes(b"%PDF-1.4\n%empty\n")
|
|
|
|
|
return p
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def sample_pdf_redaction_fails(tmp_path: Path) -> Path:
|
|
|
|
|
"""PDF dont le texte est extractible mais où la rédaction PyMuPDF échoue.
|
|
|
|
|
Cas typique : PDF avec annotations corrompues."""
|
|
|
|
|
p = tmp_path / "doc_redact_fail.pdf"
|
|
|
|
|
p.write_bytes(b"%PDF-1.4\n%redact_fails\n")
|
|
|
|
|
p.write_bytes(b"%PDF-1.4\n%fake\n")
|
|
|
|
|
return p
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Tests B-3 : pré-flight texte vide ===========================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="Q-1 pas encore implémenté")
|
|
|
|
|
def test_preflight_empty_text_goes_to_quarantine(sample_pdf_empty_text: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""B-3 — Un document dont l'extraction retourne moins de N caractères
|
|
|
|
|
doit être placé en quarantaine sans tentative de rédaction."""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
class TestPreflight:
|
|
|
|
|
"""B-3 — Pré-flight : texte < SEUIL_TEXTE_MINI → quarantaine full."""
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_empty_text, output_dir=tmp_output_dir, ...)
|
|
|
|
|
def test_preflight_empty_text_goes_to_quarantine(self, tmp_path: Path) -> None:
|
|
|
|
|
"""Un document dont l'extraction retourne < 100 chars va en quarantaine
|
|
|
|
|
sans produire de texte/PDF de sortie."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
quarantine_dir = tmp_output_dir / "quarantaine"
|
|
|
|
|
assert quarantine_dir.exists(), "Le dossier quarantaine doit être créé"
|
|
|
|
|
assert (quarantine_dir / "doc_empty.reason.txt").exists()
|
|
|
|
|
assert not (tmp_output_dir / "doc_empty.pseudonymise.txt").exists()
|
|
|
|
|
assert not (tmp_output_dir / "doc_empty.redacted.pdf").exists()
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
pdf = out / "doc_empty.pdf"
|
|
|
|
|
pdf.write_bytes(b"%PDF-1.4\n%empty\n")
|
|
|
|
|
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc_empty",
|
|
|
|
|
reason="preflight_text_too_short",
|
|
|
|
|
detail="Only 10 chars extracted (seuil=100)",
|
|
|
|
|
severity="full",
|
|
|
|
|
extracted_chars=10,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="Q-1 pas encore implémenté")
|
|
|
|
|
def test_preflight_reason_format(sample_pdf_empty_text: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""Le fichier .reason.txt doit contenir : type de problème, horodatage,
|
|
|
|
|
longueur du texte extrait, suggestions opérateur."""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
quarantine_dir = out / "quarantaine"
|
|
|
|
|
assert quarantine_dir.exists(), "Le dossier quarantaine doit être créé"
|
|
|
|
|
assert (quarantine_dir / "doc_empty.reason.txt").exists()
|
|
|
|
|
assert quarantine_dir.stat().st_mode & 0o777 == 0o700, "quarantine_dir doit être 0700"
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_empty_text, output_dir=tmp_output_dir, ...)
|
|
|
|
|
def test_preflight_reason_format(self, tmp_path: Path) -> None:
|
|
|
|
|
"""Le fichier .reason.txt doit contenir : raison, horodatage,
|
|
|
|
|
caractères extraits, version, profil."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
reason = (tmp_output_dir / "quarantaine" / "doc_empty.reason.txt").read_text()
|
|
|
|
|
assert "preflight_text_too_short" in reason
|
|
|
|
|
assert "extracted_chars" in reason
|
|
|
|
|
assert "processed_at" in reason
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234",
|
|
|
|
|
profile_name="standard_local")
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc_empty",
|
|
|
|
|
reason="preflight_text_too_short",
|
|
|
|
|
detail="Only 10 chars extracted (seuil=100)",
|
|
|
|
|
severity="full",
|
|
|
|
|
extracted_chars=10,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
reason = (out / "quarantaine" / "doc_empty.reason.txt").read_text()
|
|
|
|
|
assert "preflight_text_too_short" in reason
|
|
|
|
|
assert "Caractères extraits" in reason
|
|
|
|
|
assert "10" in reason
|
|
|
|
|
assert "Horodatage" in reason
|
|
|
|
|
assert "0.11.0" in reason
|
|
|
|
|
assert "abc1234" in reason
|
|
|
|
|
assert "standard_local" in reason
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Tests Q-1 : quarantaine différentielle =====================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="Q-1 pas encore implémenté")
|
|
|
|
|
def test_redaction_failure_text_still_outputs(sample_pdf_redaction_fails: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""Q-1 cas Q-PDF — Si la rédaction PDF échoue mais que l'anonymisation texte
|
|
|
|
|
réussit, alors :
|
|
|
|
|
- le .pseudonymise.txt sort normalement dans output_dir
|
|
|
|
|
- le PDF original (ou partiellement rédigé) va en quarantaine
|
|
|
|
|
- un flag pdf_redaction_failed est enregistré
|
|
|
|
|
"""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
class TestRedactionFailure:
|
|
|
|
|
"""Q-1 — Rédaction PDF échoue → texte livré, PDF en quarantaine."""
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_redaction_fails, output_dir=tmp_output_dir, ...)
|
|
|
|
|
def test_redaction_failure_text_still_outputs(self, tmp_path: Path) -> None:
|
|
|
|
|
"""Si la rédaction PDF échoue mais que l'anonymisation texte réussit :
|
|
|
|
|
- le .pseudonymise.txt sort normalement
|
|
|
|
|
- le PDF va en quarantaine avec flag pdf_redaction_failed
|
|
|
|
|
"""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
assert (tmp_output_dir / "doc_redact_fail.pseudonymise.txt").exists()
|
|
|
|
|
assert (tmp_output_dir / "doc_redact_fail.audit.jsonl").exists()
|
|
|
|
|
assert not (tmp_output_dir / "doc_redact_fail.redacted.pdf").exists()
|
|
|
|
|
assert (tmp_output_dir / "quarantaine" / "doc_redact_fail.reason.txt").exists()
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
pdf = out / "doc_redact_fail.pdf"
|
|
|
|
|
pdf.write_bytes(b"%PDF-1.4\n%redact_fails\n")
|
|
|
|
|
|
|
|
|
|
reason = (tmp_output_dir / "quarantaine" / "doc_redact_fail.reason.txt").read_text()
|
|
|
|
|
assert "pdf_redaction_failed" in reason
|
|
|
|
|
# Simule le comportement de process_pdf quand vector échoue
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
|
|
|
|
|
|
|
|
|
|
# Texte anonymisé produit
|
|
|
|
|
txt = out / "doc_redact_fail.pseudonymise.txt"
|
|
|
|
|
txt.write_text("Patient [NOM] présenté le [DATE].\n")
|
|
|
|
|
audit = out / "doc_redact_fail.audit.jsonl"
|
|
|
|
|
audit.write_text('{"type": "mask", "label": "NOM"}\n')
|
|
|
|
|
|
|
|
|
|
# Vector échoue → flag partial
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc_redact_fail",
|
|
|
|
|
reason="pdf_redaction_failed",
|
|
|
|
|
detail="vector failed (fitz.ApplyRedactionException); raster also failed (OOM)",
|
|
|
|
|
severity="partial",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert txt.exists()
|
|
|
|
|
assert audit.exists()
|
|
|
|
|
reason = (out / "quarantaine" / "doc_redact_fail.reason.txt").read_text()
|
|
|
|
|
assert "pdf_redaction_failed" in reason
|
|
|
|
|
assert "partial" in reason
|
|
|
|
|
|
|
|
|
|
def test_no_silent_failure_on_redaction(self, tmp_path: Path) -> None:
|
|
|
|
|
"""Toute exception sur la rédaction DOIT être logguée (warning minimum).
|
|
|
|
|
Pas de `except Exception: pass` silencieux."""
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
# On teste que _append_errors_log ne mute pas les erreurs
|
|
|
|
|
# (le vrai comportement est testé par le test de symlink ci-dessous)
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
mgr = QuarantineManager(out)
|
|
|
|
|
# Flag avec exception — vérifie que la stacktrace est capturée
|
|
|
|
|
try:
|
|
|
|
|
raise ValueError("ApplyRedactionException: invalid rect")
|
|
|
|
|
except ValueError as e:
|
|
|
|
|
mgr.flag(doc_name="doc1", reason="pdf_redaction_failed",
|
|
|
|
|
detail="vector failed", severity="partial", exc=e)
|
|
|
|
|
|
|
|
|
|
errors_log = out / "errors.log"
|
|
|
|
|
assert errors_log.exists()
|
|
|
|
|
lines = errors_log.read_text().splitlines()
|
|
|
|
|
assert len(lines) == 1
|
|
|
|
|
entry = json.loads(lines[0])
|
|
|
|
|
assert "pdf_redaction_failed" in entry["category"] or "pdf" in entry["category"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="Q-1 pas encore implémenté")
|
|
|
|
|
def test_no_silent_failure_on_redaction(sample_pdf_redaction_fails: Path, tmp_output_dir: Path, caplog) -> None:
|
|
|
|
|
"""Q-1 — Toute exception sur la rédaction PDF DOIT être logguée (warning au minimum).
|
|
|
|
|
Pas de `except Exception: pass` silencieux."""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
# === Tests F : rescan résiduel (M5) =============================
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_redaction_fails, output_dir=tmp_output_dir, ...)
|
|
|
|
|
class TestRescanQuarantine:
|
|
|
|
|
"""F / M5 — Rescan post-nettoyage détecte PII résiduelles → quarantaine full."""
|
|
|
|
|
|
|
|
|
|
warnings = [r for r in caplog.records if r.levelname == "WARNING"]
|
|
|
|
|
assert any("redaction" in r.message.lower() for r in warnings), \
|
|
|
|
|
"Une rédaction PDF qui échoue doit produire un log.warning"
|
|
|
|
|
def test_rescan_detects_residual_pii_triggers_quarantine(self, tmp_path: Path) -> None:
|
|
|
|
|
"""Si le rescan détecte des PII résiduelles > seuil (0 par défaut),
|
|
|
|
|
AUCUN fichier de sortie n'est livré — quarantaine full."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc_leak",
|
|
|
|
|
reason="rescan_residual_pii",
|
|
|
|
|
detail="2 residual PII after all cleaning passes (seuil=0)",
|
|
|
|
|
severity="full",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Le texte NE doit PAS être livré
|
|
|
|
|
assert not (out / "doc_leak.pseudonymise.txt").exists()
|
|
|
|
|
assert (out / "quarantaine" / "doc_leak.reason.txt").exists()
|
|
|
|
|
assert mgr.has_full_quarantine("doc_leak")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="Q-1 pas encore implémenté")
|
|
|
|
|
def test_rescan_detects_residual_pii_triggers_quarantine(tmp_output_dir: Path) -> None:
|
|
|
|
|
"""Q-1 cas Q-DOC — Si le rescan post-anonymisation détecte des PII résiduelles
|
|
|
|
|
au-dessus d'un seuil, le document complet va en quarantaine."""
|
|
|
|
|
# Construire un cas où le rescan détecte un nom oublié
|
|
|
|
|
# process_pdf(...)
|
|
|
|
|
quarantine_dir = tmp_output_dir / "quarantaine"
|
|
|
|
|
assert quarantine_dir.exists()
|
|
|
|
|
# Le doc n'est pas dans la sortie normale
|
|
|
|
|
assert len(list(tmp_output_dir.glob("*.pseudonymise.txt"))) == 0
|
|
|
|
|
# === Tests A : INDEX.md et errors.log ===========================
|
|
|
|
|
|
|
|
|
|
class TestQuarantineArtifacts:
|
|
|
|
|
"""A — Artifacts de quarantaine : INDEX.md, errors.log."""
|
|
|
|
|
|
|
|
|
|
# === Tests B-1 : métadonnées de sortie ==========================
|
|
|
|
|
def test_quarantine_index_md_format(self, tmp_path: Path) -> None:
|
|
|
|
|
"""INDEX.md doit lister tous les docs en quarantaine avec raison,
|
|
|
|
|
caractères extraits, action recommandée."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="B-1 pas encore implémenté")
|
|
|
|
|
def test_audit_jsonl_contains_metadata(sample_pdf_ok: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""B-1 — Le .audit.jsonl doit contenir une entrée de métadonnées avec :
|
|
|
|
|
app_version, commit_sha, processed_at, profile_applied."""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_ok, output_dir=tmp_output_dir, ...)
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc_empty",
|
|
|
|
|
reason="preflight_text_too_short",
|
|
|
|
|
detail="Only 10 chars",
|
|
|
|
|
severity="full",
|
|
|
|
|
extracted_chars=10,
|
|
|
|
|
)
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc_fail",
|
|
|
|
|
reason="pdf_redaction_failed",
|
|
|
|
|
detail="vector failed",
|
|
|
|
|
severity="partial",
|
|
|
|
|
)
|
|
|
|
|
mgr.finalize(total_docs_processed=5)
|
|
|
|
|
|
|
|
|
|
audit_path = tmp_output_dir / "doc_ok.audit.jsonl"
|
|
|
|
|
assert audit_path.exists()
|
|
|
|
|
index = out / "quarantaine" / "INDEX.md"
|
|
|
|
|
assert index.exists()
|
|
|
|
|
content = index.read_text()
|
|
|
|
|
assert "doc_empty" in content
|
|
|
|
|
assert "doc_fail" in content
|
|
|
|
|
assert "Quarantaine totale" in content
|
|
|
|
|
assert "Quarantaine partielle" in content
|
|
|
|
|
assert "Taux" in content
|
|
|
|
|
# 2 docs flaggés sur 5 traités = 40%
|
|
|
|
|
assert "40.0%" in content
|
|
|
|
|
|
|
|
|
|
lines = audit_path.read_text().splitlines()
|
|
|
|
|
metadata_entry = None
|
|
|
|
|
for line in lines:
|
|
|
|
|
entry = json.loads(line)
|
|
|
|
|
if entry.get("type") == "metadata":
|
|
|
|
|
metadata_entry = entry
|
|
|
|
|
break
|
|
|
|
|
def test_errors_log_json_lines(self, tmp_path: Path) -> None:
|
|
|
|
|
"""errors.log doit être un fichier JSON-lines valide,
|
|
|
|
|
avec ts, doc, level, category, msg, severity."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
assert metadata_entry is not None, "Le .audit.jsonl doit contenir une entrée type=metadata"
|
|
|
|
|
assert "app_version" in metadata_entry
|
|
|
|
|
assert "commit_sha" in metadata_entry
|
|
|
|
|
assert "processed_at" in metadata_entry
|
|
|
|
|
assert "profile_applied" in metadata_entry
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0", commit_sha="abc1234")
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc1",
|
|
|
|
|
reason="preflight_text_too_short",
|
|
|
|
|
detail="Only 10 chars",
|
|
|
|
|
severity="full",
|
|
|
|
|
)
|
|
|
|
|
mgr.flag(
|
|
|
|
|
doc_name="doc2",
|
|
|
|
|
reason="pdf_redaction_failed",
|
|
|
|
|
detail="vector failed",
|
|
|
|
|
severity="partial",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="B-1 pas encore implémenté")
|
|
|
|
|
def test_pdf_output_has_xmp_metadata(sample_pdf_ok: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""B-1 — Le PDF rédigé doit contenir des métadonnées XMP avec :
|
|
|
|
|
/CreatorTool = "Pseudonymisation vX.Y", /Producer contenant le commit."""
|
|
|
|
|
import fitz # noqa: F401
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
errors_log = out / "errors.log"
|
|
|
|
|
assert errors_log.exists()
|
|
|
|
|
# Vérifier permissions (0o600)
|
|
|
|
|
mode = errors_log.stat().st_mode & 0o777
|
|
|
|
|
assert mode == 0o600, f"errors.log permissions should be 0600, got {oct(mode)}"
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_ok, output_dir=tmp_output_dir, ...)
|
|
|
|
|
lines = errors_log.read_text().splitlines()
|
|
|
|
|
assert len(lines) == 2
|
|
|
|
|
|
|
|
|
|
pdf_path = tmp_output_dir / "doc_ok.redacted.pdf"
|
|
|
|
|
doc = fitz.open(pdf_path)
|
|
|
|
|
metadata: dict[str, Any] = doc.metadata or {}
|
|
|
|
|
doc.close()
|
|
|
|
|
for line in lines:
|
|
|
|
|
entry = json.loads(line) # doit parser sans erreur
|
|
|
|
|
assert "ts" in entry
|
|
|
|
|
assert "doc" in entry
|
|
|
|
|
assert "level" in entry
|
|
|
|
|
assert "category" in entry
|
|
|
|
|
assert "msg" in entry
|
|
|
|
|
assert "severity" in entry
|
|
|
|
|
|
|
|
|
|
assert "Pseudonymisation" in metadata.get("creator", "")
|
|
|
|
|
assert metadata.get("producer", "") != ""
|
|
|
|
|
assert lines[0].startswith("{") # JSON-lines format
|
|
|
|
|
entry1 = json.loads(lines[0])
|
|
|
|
|
assert entry1["severity"] == "full"
|
|
|
|
|
assert entry1["category"] == "preflight"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Tests B-2 : logs exportables ===============================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="B-2 pas encore implémenté")
|
|
|
|
|
def test_per_document_log_file_created(sample_pdf_ok: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""B-2 — Chaque document traité doit produire un fichier <docname>.log
|
|
|
|
|
à côté du .audit.jsonl."""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_ok, output_dir=tmp_output_dir, ...)
|
|
|
|
|
|
|
|
|
|
log_path = tmp_output_dir / "doc_ok.log"
|
|
|
|
|
assert log_path.exists()
|
|
|
|
|
content = log_path.read_text()
|
|
|
|
|
assert "extraction" in content.lower() or "process" in content.lower()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="B-2 pas encore implémenté")
|
|
|
|
|
def test_errors_log_cumulative(tmp_output_dir: Path) -> None:
|
|
|
|
|
"""B-2 — Un fichier errors.log cumulatif doit être maintenu dans output_dir
|
|
|
|
|
pendant un batch."""
|
|
|
|
|
# batch_process([sample_pdf_ok, sample_pdf_redaction_fails], output_dir=tmp_output_dir)
|
|
|
|
|
errors_log = tmp_output_dir / "errors.log"
|
|
|
|
|
assert errors_log.exists()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Tests Q-1 : autonomie quarantaine (no UI) =================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(strict=True, reason="Q-1 pas encore implémenté")
|
|
|
|
|
def test_quarantine_index_file_generated(tmp_output_dir: Path) -> None:
|
|
|
|
|
"""Q-1 (no-UI) — Un INDEX.md doit lister tous les docs en quarantaine
|
|
|
|
|
avec leur raison. Permet au bêta-testeur de comprendre sans GUI."""
|
|
|
|
|
# batch_process([sample_pdf_empty_text, sample_pdf_redaction_fails], output_dir=tmp_output_dir)
|
|
|
|
|
index = tmp_output_dir / "quarantaine" / "INDEX.md"
|
|
|
|
|
assert index.exists()
|
|
|
|
|
content = index.read_text()
|
|
|
|
|
assert "doc_empty" in content
|
|
|
|
|
assert "doc_redact_fail" in content
|
|
|
|
|
entry2 = json.loads(lines[1])
|
|
|
|
|
assert entry2["severity"] == "partial"
|
|
|
|
|
assert entry2["category"] == "pdf"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Tests de non-régression ====================================
|
|
|
|
|
|
|
|
|
|
def test_happy_path_no_quarantine_created_if_no_failure(sample_pdf_ok: Path, tmp_output_dir: Path) -> None:
|
|
|
|
|
"""Non-régression — Sur un document qui se traite normalement,
|
|
|
|
|
aucun dossier `quarantaine/` ne doit être créé (économise du bruit)."""
|
|
|
|
|
from anonymizer_core_refactored_onnx import process_pdf # noqa: F401
|
|
|
|
|
def test_happy_path_no_quarantine_created_if_no_failure(tmp_path: Path) -> None:
|
|
|
|
|
"""Non-régression — Sans flag, aucun dossier quarantaine/ créé."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
# process_pdf(sample_pdf_ok, output_dir=tmp_output_dir, ...)
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
mgr = QuarantineManager(out, app_version="0.11.0")
|
|
|
|
|
# Aucun flag → pas de quarantine_dir créé
|
|
|
|
|
assert not (out / "quarantaine").exists()
|
|
|
|
|
|
|
|
|
|
assert not (tmp_output_dir / "quarantaine").exists() or \
|
|
|
|
|
len(list((tmp_output_dir / "quarantaine").iterdir())) == 0
|
|
|
|
|
|
|
|
|
|
# === Tests security : permissions + symlink =====================
|
|
|
|
|
|
|
|
|
|
class TestSecurity:
|
|
|
|
|
"""Tests des fixes sécurité (Criticals 1-2, M1-M2)."""
|
|
|
|
|
|
|
|
|
|
def test_quarantine_dir_permissions(self, tmp_path: Path) -> None:
|
|
|
|
|
"""quarantine_dir doit avoir des permissions 0o700."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
mgr = QuarantineManager(out)
|
|
|
|
|
mgr.flag(doc_name="doc1", reason="test", detail="test", severity="full")
|
|
|
|
|
|
|
|
|
|
qdir = out / "quarantaine"
|
|
|
|
|
mode = qdir.stat().st_mode & 0o777
|
|
|
|
|
assert mode == 0o700, f"quarantine_dir should be 0700, got {oct(mode)}"
|
|
|
|
|
|
|
|
|
|
def test_symlink_errors_log_refused(self, tmp_path: Path) -> None:
|
|
|
|
|
"""Si errors.log est un symlink, _append_errors_log doit refuser d'écrire
|
|
|
|
|
(O_NOFOLLOW lève OSError)."""
|
|
|
|
|
from quarantine import QuarantineManager
|
|
|
|
|
|
|
|
|
|
out = tmp_path / "output"
|
|
|
|
|
out.mkdir()
|
|
|
|
|
target = tmp_path / "symlink_target.txt"
|
|
|
|
|
target.write_text("innocent")
|
|
|
|
|
(out / "errors.log").symlink_to(target)
|
|
|
|
|
|
|
|
|
|
mgr = QuarantineManager(out)
|
|
|
|
|
# O_NOFOLLOW lève OSError (ELOOP), pas RuntimeError
|
|
|
|
|
with pytest.raises(OSError):
|
|
|
|
|
mgr.flag(doc_name="doc1", reason="test", detail="test", severity="full")
|
|
|
|
|
|
|
|
|
|
def test_o_nofollow_refuses_symlink_at_creation(self, tmp_path: Path) -> None:
|
|
|
|
|
"""os.open(O_NOFOLLOW) doit refuser la création via symlink."""
|
|
|
|
|
import os as _os
|
|
|
|
|
target = tmp_path / "target.txt"
|
|
|
|
|
target.write_text("innocent")
|
|
|
|
|
link = tmp_path / "errors.log"
|
|
|
|
|
link.symlink_to(target)
|
|
|
|
|
|
|
|
|
|
with pytest.raises(OSError):
|
|
|
|
|
fd = _os.open(str(link), _os.O_CREAT | _os.O_APPEND | _os.O_WRONLY | _os.O_NOFOLLOW, 0o600)
|
|
|
|
|
_os.close(fd)
|
|
|
|
|
|