feat(core): _category_of dérivé (anti-dérive) + filtre audit Tier 1 (P1-2/F-1)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -36,7 +36,7 @@ for _env in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Dict, Tuple, Optional, Any
|
from typing import List, Dict, Tuple, Optional, Any, Set
|
||||||
|
|
||||||
|
|
||||||
def _bundle_root() -> Path:
|
def _bundle_root() -> Path:
|
||||||
@@ -609,6 +609,85 @@ PLACEHOLDERS = {
|
|||||||
|
|
||||||
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"}
|
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"}
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
# Per-category gating (Plan 1b — P1-2/F-1)
# ---------------------------------------------------------------------------
# Maps a placeholder type to one of the 7 categories that can be toggled from
# the GUI. Any placeholder missing from this table resolves to None, i.e.
# default-deny: it ALWAYS stays masked. Anti-drift derivation: the source maps
# (VLM/EDS) are read at lookup time instead of being copied into a frozen table.
_PLACEHOLDER_TO_CATEGORY = {
    "NOM": "NOM",
    "DATE_NAISSANCE": "DATE_NAISSANCE",
    "ETAB": "ETAB",
    "ADRESSE": "ADRESSE",
    "NIR": "NIR",
    "TEL": "TEL",
    "ADHERENT": "ADHERENT",
    # Decision (Dom, 2026-06-26): postal codes follow the "Adresses" toggle.
    "CODE_POSTAL": "ADRESSE",
}
|
||||||
|
# Regex/inline kinds cannot be derived from a source map, so their category is
# spelled out explicitly here.
# WARNING: hand-maintained table. Every new regex/inline kind that belongs to
# one of the 7 categories must be added here; the anti-drift test only checks
# the VLM/EDS kinds and will not catch an omission.
_EXPLICIT_KIND_CATEGORY = {
    # Person names
    "NOM_FORCE": "NOM",
    "NOM_EXTRACTED": "NOM",
    "NOM_INITIAL": "NOM",
    # NER entities
    "NER_PER": "NOM",
    "NER_ORG": "ETAB",
    # Establishments
    "ETAB_FINESS": "ETAB",
    "ETAB_SPACED": "ETAB",
    # Addresses
    "ADDR_FINESS": "ADRESSE",
}
|
||||||
|
|
||||||
|
|
||||||
|
def _placeholder_to_category(placeholder: Optional[str]) -> Optional[str]:
    """Return the toggleable category of a placeholder type, or ``None``.

    The placeholder may be given bare (``"NOM"``) or bracketed (``"[NOM]"``):
    leading/trailing square brackets are stripped and the name is upper-cased
    before the lookup in ``_PLACEHOLDER_TO_CATEGORY``.

    Args:
        placeholder: Placeholder type. ``None`` or an empty value means
            "no placeholder".

    Returns:
        The category name, or ``None`` when the placeholder is missing or is
        not listed in ``_PLACEHOLDER_TO_CATEGORY``.
    """
    # Explicit guard: without it ``None`` would be stringified to "NONE" and
    # would only miss the table by accident.
    if not placeholder:
        return None
    return _PLACEHOLDER_TO_CATEGORY.get(str(placeholder).strip("[]").upper())
|
||||||
|
|
||||||
|
|
||||||
|
def _category_of(kind: Optional[str]) -> Optional[str]:
    """Return the toggleable category of an audit kind, or ``None``.

    ``None`` means default-deny: the kind is not tied to any toggle and is
    therefore kept masked.

    Anti-drift resolution order (first match wins):
      1. a ``_GLOBAL`` suffix is stripped and the base kind is resolved;
      2. explicit table of regex/inline kinds (``_EXPLICIT_KIND_CATEGORY``);
      3. the kind is itself a toggleable placeholder;
      4. ``VLM_*``: placeholder looked up through the inverse of
         ``vlm_manager.VLM_CATEGORY_MAP``;
      5. ``EDS_*``: label translated to a placeholder through
         ``eds_pseudo_manager.EDS_LABEL_MAP`` (the label itself is used when
         it has no entry);
      6. otherwise ``None``.

    Args:
        kind: Audit kind of a hit. Empty, ``None`` or non-string values are
            treated as uncategorised.

    Returns:
        The category name, or ``None`` when the kind cannot be categorised.
    """
    # Empty, missing or non-string kinds cannot be categorised. Returning None
    # (instead of raising on ``.endswith``) keeps a single malformed hit from
    # aborting the whole audit filter. This also terminates the recursion
    # below once the suffix has been stripped down to an empty string.
    if not isinstance(kind, str) or not kind:
        return None
    if kind.endswith("_GLOBAL"):
        return _category_of(kind[: -len("_GLOBAL")])
    if kind in _EXPLICIT_KIND_CATEGORY:
        return _EXPLICIT_KIND_CATEGORY[kind]
    if kind in _PLACEHOLDER_TO_CATEGORY:
        return _PLACEHOLDER_TO_CATEGORY[kind]
    if kind.startswith("VLM_"):
        try:
            # Imported lazily and read on every call so the result always
            # reflects the current source map.
            import vlm_manager

            # Map values are (kind, placeholder) pairs: invert to kind -> placeholder.
            rev = {k: ph for (k, ph) in vlm_manager.VLM_CATEGORY_MAP.values()}
            return _placeholder_to_category(rev.get(kind))
        except Exception:
            # Best effort: any failure falls back to default-deny, but leave a
            # trace so a broken or missing map does not go unnoticed.
            log.debug("VLM category lookup failed for kind %r", kind, exc_info=True)
            return None
    if kind.startswith("EDS_"):
        try:
            import eds_pseudo_manager

            label = kind[len("EDS_"):]
            ph = eds_pseudo_manager.EDS_LABEL_MAP.get(label, label)
            return _placeholder_to_category(ph)
        except Exception:
            # Same best-effort fallback as the VLM branch.
            log.debug("EDS category lookup failed for kind %r", kind, exc_info=True)
            return None
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _filter_audit_by_disabled(audit, disabled_kinds):
    """Drop from the audit every hit whose CATEGORY is disabled.

    The category of a hit is the output of ``_category_of(hit.kind)``.

    ``disabled_kinds`` is the set of disabled CATEGORIES (the 7 toggles:
    "NOM", "DATE_NAISSANCE", "ETAB", "ADRESSE", "NIR", "TEL", "ADHERENT"),
    NOT raw kinds. The parameter keeps the name ``disabled_kinds`` for
    consistency with the plan / the GUI.

    When ``disabled_kinds`` is empty or ``None`` the input ``audit`` object is
    returned unchanged (non-regression guarantee); otherwise a new list is
    built.
    """
    if disabled_kinds:
        kept = []
        for hit in audit:
            if _category_of(hit.kind) in disabled_kinds:
                continue
            kept.append(hit)
        return kept
    return audit
|
||||||
|
|
||||||
|
|
||||||
# Baseline regex
|
# Baseline regex
|
||||||
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
|
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
|
||||||
RE_URL = re.compile(r"https?://[A-Za-z0-9._~:/?#\[\]@!$&'()*+,;=\-%]+", re.IGNORECASE)
|
RE_URL = re.compile(r"https?://[A-Za-z0-9._~:/?#\[\]@!$&'()*+,;=\-%]+", re.IGNORECASE)
|
||||||
@@ -4984,6 +5063,7 @@ def process_pdf(
|
|||||||
gliner_manager=None,
|
gliner_manager=None,
|
||||||
camembert_manager=None,
|
camembert_manager=None,
|
||||||
quarantine_mgr: Optional["QuarantineManager"] = None,
|
quarantine_mgr: Optional["QuarantineManager"] = None,
|
||||||
|
disabled_kinds: Optional[Set[str]] = None,
|
||||||
) -> Dict[str, str]:
|
) -> Dict[str, str]:
|
||||||
perf_t0 = time.perf_counter()
|
perf_t0 = time.perf_counter()
|
||||||
last_mark = perf_t0
|
last_mark = perf_t0
|
||||||
@@ -5000,6 +5080,10 @@ def process_pdf(
|
|||||||
_log_env_banner()
|
_log_env_banner()
|
||||||
out_dir.mkdir(parents=True, exist_ok=True)
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
cfg = load_dictionaries(config_path)
|
cfg = load_dictionaries(config_path)
|
||||||
|
# Plan 1b (P1-2/F-1) : catégories désactivées dans la GUI. Vide par défaut
|
||||||
|
# ⇒ no-op (aucun changement de comportement vs aujourd'hui).
|
||||||
|
# NB: catégories (sortie de _category_of), pas kinds bruts.
|
||||||
|
cfg["disabled_kinds"] = set(disabled_kinds or ())
|
||||||
_perf_mark("load_config")
|
_perf_mark("load_config")
|
||||||
pages_text, tables_lines, ocr_used, ocr_word_map = extract_text_with_fallback_ocr(pdf_path)
|
pages_text, tables_lines, ocr_used, ocr_word_map = extract_text_with_fallback_ocr(pdf_path)
|
||||||
_perf_mark("extract_text_ocr")
|
_perf_mark("extract_text_ocr")
|
||||||
@@ -5518,6 +5602,15 @@ def process_pdf(
|
|||||||
return {"status": "quarantined", "reason": "rescan_residual_pii",
|
return {"status": "quarantined", "reason": "rescan_residual_pii",
|
||||||
"residual_count": residual_count, "text": "", "audit": ""}
|
"residual_count": residual_count, "text": "", "audit": ""}
|
||||||
|
|
||||||
|
# Plan 1b (P1-2/F-1) — Filtre Tier 1 : retire de l'audit les hits dont la
|
||||||
|
# catégorie est désactivée, JUSTE AVANT le burn PDF et l'écriture de l'audit.
|
||||||
|
# Comme le burn PDF (vector + raster) et le .audit.jsonl dérivent tous de
|
||||||
|
# anon.audit, cette mutation unique en place sécurise le livrable PDF et la
|
||||||
|
# piste d'audit. No-op si disabled_kinds est vide (non-régression).
|
||||||
|
_disabled = cfg.get("disabled_kinds") or set()
|
||||||
|
if _disabled:
|
||||||
|
anon.audit = _filter_audit_by_disabled(anon.audit, _disabled)
|
||||||
|
|
||||||
# Sauvegardes
|
# Sauvegardes
|
||||||
base = pdf_path.stem
|
base = pdf_path.stem
|
||||||
txt_path = out_dir / f"{base}.pseudonymise.txt"
|
txt_path = out_dir / f"{base}.pseudonymise.txt"
|
||||||
|
|||||||
47
tests/unit/test_core_category_gating.py
Normal file
47
tests/unit/test_core_category_gating.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
import anonymizer_core_refactored_onnx as core
|
||||||
|
|
||||||
|
|
||||||
|
def test_category_of_each_source():
    """Each derivation path of ``_category_of`` yields the expected category."""
    expected = {
        "NOM_FORCE": "NOM",             # explicit regex/inline table
        "NIR": "NIR",                   # kind is the placeholder itself
        "NIR_GLOBAL": "NIR",            # _GLOBAL suffix
        "ADHERENT_GLOBAL": "ADHERENT",
        "VLM_NOM": "NOM",               # derived from the VLM map
        "VLM_ETAB": "ETAB",
        "EDS_SECU": "NIR",              # derived from the EDS map (SECU -> NIR)
        "EDS_HOPITAL": "ETAB",
        "VLM_CP": "ADRESSE",            # postal code follows "Adresses" (Dom 2026-06-26)
        "EDS_ZIP": "ADRESSE",
    }
    for kind, category in expected.items():
        assert core._category_of(kind) == category
|
||||||
|
|
||||||
|
|
||||||
|
def test_category_of_default_deny():
    """Non-toggleable kinds resolve to ``None`` (they ALWAYS stay masked)."""
    # Safety net. NB: VILLE stays masked; only CODE_POSTAL (VLM_CP/EDS_ZIP)
    # was moved under ADRESSE.
    never_toggleable = (
        "EMAIL", "IBAN", "IPP", "VILLE", "FAX",
        "VLM_VILLE", "EMAIL_GLOBAL", "INCONNU_XYZ",
    )
    for kind in never_toggleable:
        assert core._category_of(kind) is None, kind
    # Termination guard of the _GLOBAL-stripping recursion: empty inputs.
    for empty in (None, ""):
        assert core._category_of(empty) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_toggleable_vlm_or_eds_kind_is_uncategorised():
    """ANTI-DRIFT: VLM/EDS kinds with a toggleable placeholder are categorised.

    Every VLM/EDS kind whose placeholder belongs to one of the 7 categories
    MUST resolve to a category, otherwise the toggle is wrong on that path.
    """
    import eds_pseudo_manager
    import vlm_manager

    seven = {"NOM", "DATE_NAISSANCE", "ETAB", "ADRESSE", "NIR", "TEL", "ADHERENT"}

    # VLM map values are (kind, placeholder) pairs; the key is not needed here.
    for kind, placeholder in vlm_manager.VLM_CATEGORY_MAP.values():
        if core._placeholder_to_category(placeholder) not in seven:
            continue
        assert core._category_of(kind) is not None, f"VLM {kind} non catégorisé"

    for label, placeholder in eds_pseudo_manager.EDS_LABEL_MAP.items():
        if core._placeholder_to_category(placeholder) not in seven:
            continue
        assert core._category_of(f"EDS_{label}") is not None, f"EDS_{label} non catégorisé"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_audit_drops_only_disabled():
    """Disabling a category removes only the hits of that category."""
    hits = [
        core.PiiHit(1, "NOM", "Dupont", "[NOM]"),
        core.PiiHit(1, "NIR", "1850574", "[NIR]"),
        core.PiiHit(1, "EMAIL", "x@y.fr", "[EMAIL]"),
        core.PiiHit(1, "NIR_GLOBAL", "1850574", "[NIR]"),
    ]
    remaining = {hit.kind for hit in core._filter_audit_by_disabled(hits, {"NIR"})}
    # NIR and its _GLOBAL propagation are removed...
    assert "NIR" not in remaining and "NIR_GLOBAL" not in remaining
    # ...while every other hit is kept.
    assert "NOM" in remaining and "EMAIL" in remaining
|
||||||
Reference in New Issue
Block a user