feat(core): garde-fou adresse burn + doc chemins conservateurs (P1-2/F-3)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -4812,9 +4812,14 @@ def _apply_pseudo_xmp_metadata(doc) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, ocr_word_map: OcrWordMap = None) -> None:
|
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, ocr_word_map: OcrWordMap = None, disabled_kinds: Optional[Set[str]] = None) -> None:
|
||||||
if fitz is None:
|
if fitz is None:
|
||||||
raise RuntimeError("PyMuPDF non disponible – installez pymupdf.")
|
raise RuntimeError("PyMuPDF non disponible – installez pymupdf.")
|
||||||
|
# Plan 1b (P1-2/F-3) — `disabled_kinds` = set des CATÉGORIES décochées
|
||||||
|
# (les 7 toggles). Sert UNIQUEMENT à gater les chemins de burn qui ne
|
||||||
|
# dérivent PAS de l'audit (le filtre audit de la Task 1 couvre déjà tout le
|
||||||
|
# reste). Vide/None ⇒ comportement par défaut byte-for-byte (non-régression).
|
||||||
|
disabled = disabled_kinds or set()
|
||||||
doc = fitz.open(str(original_pdf))
|
doc = fitz.open(str(original_pdf))
|
||||||
# index hits par page; page==-1 → rechercher sur toutes pages
|
# index hits par page; page==-1 → rechercher sur toutes pages
|
||||||
by_page: Dict[int, List[PiiHit]] = {}
|
by_page: Dict[int, List[PiiHit]] = {}
|
||||||
@@ -4822,6 +4827,14 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, oc
|
|||||||
by_page.setdefault(h.page, []).append(h)
|
by_page.setdefault(h.page, []).append(h)
|
||||||
# Kinds à ne pas chercher dans le PDF (dates masquées uniquement dans le texte,
|
# Kinds à ne pas chercher dans le PDF (dates masquées uniquement dans le texte,
|
||||||
# pas dans le PDF où elles rendent les tableaux illisibles)
|
# pas dans le PDF où elles rendent les tableaux illisibles)
|
||||||
|
# Plan 1b (P1-2/F-3) — _VECTOR_SKIP_KINDS : exclusion TOUJOURS appliquée
|
||||||
|
# (indépendante des toggles). Les dates EDS et la propagation globale DDN ne
|
||||||
|
# sont jamais brûlées dans le PDF (elles rendraient les tableaux illisibles),
|
||||||
|
# seulement masquées dans le texte. C'est un RETRAIT du burn, jamais un ajout :
|
||||||
|
# il ne peut donc pas réintroduire une catégorie décochée (déjà retirée de
|
||||||
|
# l'audit par _filter_audit_by_disabled / Task 1) ni faire brûler une catégorie
|
||||||
|
# cochée absente de ce set. Aucun conflit avec le toggle DATE_NAISSANCE :
|
||||||
|
# activer DATE_NAISSANCE ne fait PAS brûler ces kinds dans le PDF, c'est voulu.
|
||||||
_VECTOR_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL", "DATE_NAISSANCE_GLOBAL"}
|
_VECTOR_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL", "DATE_NAISSANCE_GLOBAL"}
|
||||||
# Kinds sensibles au substring matching : utiliser _search_whole_word
|
# Kinds sensibles au substring matching : utiliser _search_whole_word
|
||||||
_VECTOR_WHOLEWORD_KINDS = {"NOM", "NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
|
_VECTOR_WHOLEWORD_KINDS = {"NOM", "NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
|
||||||
@@ -4830,7 +4843,10 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, oc
|
|||||||
for pno in range(len(doc)):
|
for pno in range(len(doc)):
|
||||||
page = doc[pno]
|
page = doc[pno]
|
||||||
hits = by_page.get(pno, []) + by_page.get(-1, [])
|
hits = by_page.get(pno, []) + by_page.get(-1, [])
|
||||||
all_rects = _search_pdf_address_lines(page)
|
# Plan 1b (P1-2/F-3) — Chemin de burn INDÉPENDANT de l'audit : recherche
|
||||||
|
# géométrique des lignes d'adresse. Le filtre audit (Task 1) ne le couvre
|
||||||
|
# pas, donc on le gate explicitement sous la catégorie ADRESSE.
|
||||||
|
all_rects = [] if "ADRESSE" in disabled else _search_pdf_address_lines(page)
|
||||||
if not hits and not all_rects:
|
if not hits and not all_rects:
|
||||||
continue
|
continue
|
||||||
# Dédupliquer les tokens : (token, kind) → rechercher une seule fois par page
|
# Dédupliquer les tokens : (token, kind) → rechercher une seule fois par page
|
||||||
@@ -4929,6 +4945,13 @@ def _rasterize_page(args):
|
|||||||
if rx1 > rx0:
|
if rx1 > rx0:
|
||||||
draw.rectangle([rx0, ry0, rx1, ry1], fill=(0, 0, 0))
|
draw.rectangle([rx0, ry0, rx1, ry1], fill=(0, 0, 0))
|
||||||
# Noircir les images embarquées (logos, signatures, captures d'écran)
|
# Noircir les images embarquées (logos, signatures, captures d'écran)
|
||||||
|
# Plan 1b (P1-2/F-3) — Choix DÉLIBÉRÉ : les blackouts d'images et de
|
||||||
|
# codes-barres/QR (ci-dessous) restent TOUJOURS actifs, NON toggleables.
|
||||||
|
# Une image ou un code-barres peut encoder n'importe quelle PII (nom dans un
|
||||||
|
# logo signé, NIR/IPP dans un code-barres). Ces masques sont conservateurs :
|
||||||
|
# ils peuvent sur-masquer, mais ne doivent JAMAIS laisser fuir. Les gater
|
||||||
|
# sous une catégorie (NOM/ADRESSE/…) serait impossible à décider de façon
|
||||||
|
# sûre depuis la seule géométrie → on les garde inconditionnels.
|
||||||
for (x0, y0, x1, y1) in image_rects_tuples:
|
for (x0, y0, x1, y1) in image_rects_tuples:
|
||||||
rx0 = x0 * zoom
|
rx0 = x0 * zoom
|
||||||
ry0 = y0 * zoom
|
ry0 = y0 * zoom
|
||||||
@@ -4976,11 +4999,18 @@ def _rasterize_page(args):
|
|||||||
return pno, buf.getvalue(), rect_w, rect_h
|
return pno, buf.getvalue(), rect_w, rect_h
|
||||||
|
|
||||||
|
|
||||||
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 120, ogc_label: Optional[str] = None, ocr_word_map: OcrWordMap = None, jpeg_quality: int = 80) -> None:
|
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 120, ogc_label: Optional[str] = None, ocr_word_map: OcrWordMap = None, jpeg_quality: int = 80, disabled_kinds: Optional[Set[str]] = None) -> None:
|
||||||
if fitz is None:
|
if fitz is None:
|
||||||
raise RuntimeError("PyMuPDF non disponible – installez pymupdf.")
|
raise RuntimeError("PyMuPDF non disponible – installez pymupdf.")
|
||||||
|
# Plan 1b (P1-2/F-3) — voir redact_pdf_vector : `disabled_kinds` gate les
|
||||||
|
# chemins de burn hors-audit. Vide/None ⇒ comportement par défaut.
|
||||||
|
disabled = disabled_kinds or set()
|
||||||
doc = fitz.open(str(original_pdf))
|
doc = fitz.open(str(original_pdf))
|
||||||
all_rects: Dict[int, List["fitz.Rect"]] = {}
|
all_rects: Dict[int, List["fitz.Rect"]] = {}
|
||||||
|
# Plan 1b (P1-2/F-3) — _RASTER_SKIP_KINDS : exclusion TOUJOURS appliquée
|
||||||
|
# (cf. _VECTOR_SKIP_KINDS). Retrait du burn uniquement, jamais d'ajout :
|
||||||
|
# aucun conflit avec le toggle DATE_NAISSANCE (ces kinds restent hors PDF
|
||||||
|
# quel que soit l'état du toggle, c'est voulu — dates illisibles en tableau).
|
||||||
_RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL", "DATE_NAISSANCE_GLOBAL"}
|
_RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL", "DATE_NAISSANCE_GLOBAL"}
|
||||||
# Kinds sensibles au substring matching : utiliser _search_whole_word
|
# Kinds sensibles au substring matching : utiliser _search_whole_word
|
||||||
_RASTER_WHOLEWORD_KINDS = {"NOM", "NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
|
_RASTER_WHOLEWORD_KINDS = {"NOM", "NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
|
||||||
@@ -5002,7 +5032,9 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
|
|||||||
rects.append(fitz.Rect(margin, margin, page.rect.width - margin, page.rect.height - margin))
|
rects.append(fitz.Rect(margin, margin, page.rect.width - margin, page.rect.height - margin))
|
||||||
all_rects[pno] = rects
|
all_rects[pno] = rects
|
||||||
continue
|
continue
|
||||||
rects = _search_pdf_address_lines(page)
|
# Plan 1b (P1-2/F-3) — Chemin de burn INDÉPENDANT de l'audit (recherche
|
||||||
|
# géométrique d'adresse), gaté sous la catégorie ADRESSE comme en vector.
|
||||||
|
rects = [] if "ADRESSE" in disabled else _search_pdf_address_lines(page)
|
||||||
for h in hits:
|
for h in hits:
|
||||||
token = h.original.strip()
|
token = h.original.strip()
|
||||||
if not token or h.kind in _RASTER_SKIP_KINDS:
|
if not token or h.kind in _RASTER_SKIP_KINDS:
|
||||||
@@ -5867,7 +5899,7 @@ def process_pdf(
|
|||||||
if make_vector_redaction and fitz is not None:
|
if make_vector_redaction and fitz is not None:
|
||||||
vec_path = out_dir / f"{base}.redacted_vector.pdf"
|
vec_path = out_dir / f"{base}.redacted_vector.pdf"
|
||||||
try:
|
try:
|
||||||
redact_pdf_vector(pdf_path, anon.audit, vec_path, ocr_word_map=ocr_word_map)
|
redact_pdf_vector(pdf_path, anon.audit, vec_path, ocr_word_map=ocr_word_map, disabled_kinds=_disabled)
|
||||||
outputs["pdf_vector"] = str(vec_path)
|
outputs["pdf_vector"] = str(vec_path)
|
||||||
_perf_mark("pdf_vector")
|
_perf_mark("pdf_vector")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -5883,6 +5915,7 @@ def process_pdf(
|
|||||||
redact_pdf_raster(
|
redact_pdf_raster(
|
||||||
pdf_path, anon.audit, ras_fb_path,
|
pdf_path, anon.audit, ras_fb_path,
|
||||||
ogc_label=ogc_label, ocr_word_map=ocr_word_map,
|
ogc_label=ogc_label, ocr_word_map=ocr_word_map,
|
||||||
|
disabled_kinds=_disabled,
|
||||||
)
|
)
|
||||||
outputs["pdf_raster"] = str(ras_fb_path)
|
outputs["pdf_raster"] = str(ras_fb_path)
|
||||||
raster_fallback_ok = True
|
raster_fallback_ok = True
|
||||||
@@ -5924,7 +5957,7 @@ def process_pdf(
|
|||||||
# S5 : ne pas refaire le raster si le fallback vector→raster l'a déjà produit
|
# S5 : ne pas refaire le raster si le fallback vector→raster l'a déjà produit
|
||||||
if "pdf_raster" not in outputs:
|
if "pdf_raster" not in outputs:
|
||||||
ras_path = out_dir / f"{base}.redacted_raster.pdf"
|
ras_path = out_dir / f"{base}.redacted_raster.pdf"
|
||||||
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map)
|
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map, disabled_kinds=_disabled)
|
||||||
outputs["pdf_raster"] = str(ras_path)
|
outputs["pdf_raster"] = str(ras_path)
|
||||||
_perf_mark("pdf_raster")
|
_perf_mark("pdf_raster")
|
||||||
log.info("PERF %s: done total=%.2fs outputs=%s", pdf_path.name, time.perf_counter() - perf_t0, sorted(outputs.keys()))
|
log.info("PERF %s: done total=%.2fs outputs=%s", pdf_path.name, time.perf_counter() - perf_t0, sorted(outputs.keys()))
|
||||||
|
|||||||
160
tests/unit/test_core_address_burn_guard.py
Normal file
160
tests/unit/test_core_address_burn_guard.py
Normal file
@@ -0,0 +1,160 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Plan 1b — Task 4 (P1-2/F-3) : garde-fou du burn adresse géométrique.
|
||||||
|
|
||||||
|
`_search_pdf_address_lines` est un chemin de caviardage INDÉPENDANT de
|
||||||
|
l'audit : il noircit directement les lignes d'adresse trouvées
|
||||||
|
géométriquement sur la page (cf. `test_pdf_redaction_directly_masks_finess_address_range`).
|
||||||
|
Le filtre d'audit de la Task 1 ne le couvre donc PAS.
|
||||||
|
|
||||||
|
Ces tests vérifient que ce chemin est gaté sous la catégorie ADRESSE :
|
||||||
|
- ADRESSE désactivée → `_search_pdf_address_lines` n'est PAS appliqué ;
|
||||||
|
- ADRESSE activée (ou disabled vide) → il est appelé comme avant.
|
||||||
|
"""
|
||||||
|
import anonymizer_core_refactored_onnx as core
|
||||||
|
from anonymizer_core_refactored_onnx import (
|
||||||
|
PiiHit,
|
||||||
|
fitz,
|
||||||
|
redact_pdf_raster,
|
||||||
|
redact_pdf_vector,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_address_pdf(tmp_path):
|
||||||
|
source = tmp_path / "addr.pdf"
|
||||||
|
doc = fitz.open()
|
||||||
|
page = doc.new_page()
|
||||||
|
page.insert_text((72, 72), "15 à 35 rue Claude Boucher Bordeaux Cedex")
|
||||||
|
page.insert_text((72, 108), "Motif d'hospitalisation : contrôle clinique.")
|
||||||
|
doc.save(source)
|
||||||
|
doc.close()
|
||||||
|
return source
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VECTOR
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def test_vector_address_search_called_when_adresse_enabled(tmp_path, monkeypatch):
|
||||||
|
if fitz is None:
|
||||||
|
return
|
||||||
|
source = _make_address_pdf(tmp_path)
|
||||||
|
output = tmp_path / "addr.enabled.pdf"
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
real = core._search_pdf_address_lines
|
||||||
|
|
||||||
|
def _spy(page):
|
||||||
|
calls.append(page.number)
|
||||||
|
return real(page)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_search_pdf_address_lines", _spy)
|
||||||
|
|
||||||
|
# disabled vide → comportement par défaut (adresse cherchée)
|
||||||
|
redact_pdf_vector(source, [], output, disabled_kinds=set())
|
||||||
|
|
||||||
|
assert calls, "ADRESSE activée : _search_pdf_address_lines doit être appelé"
|
||||||
|
redacted = fitz.open(output)
|
||||||
|
text = redacted[0].get_text()
|
||||||
|
redacted.close()
|
||||||
|
# L'adresse a bien été caviardée (le burn géométrique s'applique)
|
||||||
|
assert "rue Claude Boucher" not in text
|
||||||
|
# La ligne clinique reste lisible
|
||||||
|
assert "Motif d'hospitalisation" in text
|
||||||
|
|
||||||
|
|
||||||
|
def test_vector_address_search_not_applied_when_adresse_disabled(tmp_path, monkeypatch):
|
||||||
|
if fitz is None:
|
||||||
|
return
|
||||||
|
source = _make_address_pdf(tmp_path)
|
||||||
|
output = tmp_path / "addr.disabled.pdf"
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
real = core._search_pdf_address_lines
|
||||||
|
|
||||||
|
def _spy(page):
|
||||||
|
calls.append(page.number)
|
||||||
|
return real(page)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_search_pdf_address_lines", _spy)
|
||||||
|
|
||||||
|
redact_pdf_vector(source, [], output, disabled_kinds={"ADRESSE"})
|
||||||
|
|
||||||
|
# Le burn géométrique d'adresse ne doit PAS être appliqué.
|
||||||
|
assert not calls, (
|
||||||
|
"ADRESSE désactivée : _search_pdf_address_lines ne doit pas être appliqué"
|
||||||
|
)
|
||||||
|
redacted = fitz.open(output)
|
||||||
|
text = redacted[0].get_text()
|
||||||
|
redacted.close()
|
||||||
|
# L'adresse reste lisible puisque la catégorie est décochée.
|
||||||
|
assert "rue Claude Boucher" in text
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# RASTER
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def test_raster_address_search_called_when_adresse_enabled(tmp_path, monkeypatch):
|
||||||
|
if fitz is None:
|
||||||
|
return
|
||||||
|
source = _make_address_pdf(tmp_path)
|
||||||
|
output = tmp_path / "addr.raster.enabled.pdf"
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
real = core._search_pdf_address_lines
|
||||||
|
|
||||||
|
def _spy(page):
|
||||||
|
calls.append(page.number)
|
||||||
|
return real(page)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_search_pdf_address_lines", _spy)
|
||||||
|
|
||||||
|
redact_pdf_raster(source, [], output, disabled_kinds=set())
|
||||||
|
|
||||||
|
assert calls, "ADRESSE activée (raster) : _search_pdf_address_lines doit être appelé"
|
||||||
|
|
||||||
|
|
||||||
|
def test_raster_address_search_not_applied_when_adresse_disabled(tmp_path, monkeypatch):
|
||||||
|
if fitz is None:
|
||||||
|
return
|
||||||
|
source = _make_address_pdf(tmp_path)
|
||||||
|
output = tmp_path / "addr.raster.disabled.pdf"
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
real = core._search_pdf_address_lines
|
||||||
|
|
||||||
|
def _spy(page):
|
||||||
|
calls.append(page.number)
|
||||||
|
return real(page)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_search_pdf_address_lines", _spy)
|
||||||
|
|
||||||
|
redact_pdf_raster(source, [], output, disabled_kinds={"ADRESSE"})
|
||||||
|
|
||||||
|
assert not calls, (
|
||||||
|
"ADRESSE désactivée (raster) : _search_pdf_address_lines ne doit pas être appliqué"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Non-régression : signature positionnelle d'origine + défaut byte-for-byte
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def test_vector_default_signature_still_calls_address_search(tmp_path, monkeypatch):
|
||||||
|
"""Sans disabled_kinds (appel positionnel d'origine), le burn adresse
|
||||||
|
reste actif — non-régression stricte."""
|
||||||
|
if fitz is None:
|
||||||
|
return
|
||||||
|
source = _make_address_pdf(tmp_path)
|
||||||
|
output = tmp_path / "addr.default.pdf"
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
real = core._search_pdf_address_lines
|
||||||
|
|
||||||
|
def _spy(page):
|
||||||
|
calls.append(page.number)
|
||||||
|
return real(page)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_search_pdf_address_lines", _spy)
|
||||||
|
|
||||||
|
# Appel d'origine : aucun argument disabled.
|
||||||
|
redact_pdf_vector(source, [PiiHit(0, "OGC", "14", "[OGC]")], output)
|
||||||
|
|
||||||
|
assert calls, "Défaut (pas de disabled) : burn adresse doit rester actif"
|
||||||
Reference in New Issue
Block a user