feat(core): gates texte par catégorie sur toutes les passes (P1-2/F-2/F-5)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-26 10:43:01 +02:00
parent dd392c4a50
commit a02bca516d
2 changed files with 527 additions and 147 deletions

View File

@@ -1829,6 +1829,12 @@ def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[s
def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
# Plan 1b (P1-2/F-2/F-5) — catégories décochées (les 7 toggles). Vide ⇒ no-op
# byte-for-byte. Chaque sous-bloc d'une catégorie toggleable est sauté si sa
# catégorie est désactivée (la valeur reste alors EN CLAIR dans le texte).
# Les kinds non toggleables (EMAIL, IBAN, FINESS, IPP, VILLE, …) → None ⇒
# default-deny ⇒ TOUJOURS masqués.
disabled = cfg.get("disabled_kinds") or set()
# EMAIL avant les overrides : les force_terms (ex: CHUXX) casseraient sinon l'adresse
def _repl_email(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
@@ -1857,14 +1863,15 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
return raw # faux positif, on ne masque pas
audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"]))
return PLACEHOLDERS["NIR"]
line = RE_NIR.sub(_repl_nir, line)
# NIR 13 chiffres sans clé, STRICTEMENT après label (pas de validation modulo
# possible sans la clé ; l'ancre label suffit à éviter les faux positifs).
def _repl_nir_no_key(m: re.Match) -> str:
val = m.group(1)
audit.append(PiiHit(page_idx, "NIR", val, PLACEHOLDERS["NIR"]))
return m.group(0).replace(val, PLACEHOLDERS["NIR"])
line = RE_NIR_NO_KEY.sub(_repl_nir_no_key, line)
if "NIR" not in disabled:
line = RE_NIR.sub(_repl_nir, line)
line = RE_NIR_NO_KEY.sub(_repl_nir_no_key, line)
# FAX (label-ancré) AVANT TEL : un numéro de fax doit devenir [FAX], pas [TEL].
def _repl_fax(m: re.Match) -> str:
@@ -1877,9 +1884,10 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
def _repl_tel(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
line = RE_TEL_SLASH.sub(_repl_tel, line) # slash d'abord (plus spécifique)
line = RE_TEL.sub(_repl_tel, line)
line = RE_TEL_COMPACT.sub(_repl_tel, line)
if "TEL" not in disabled:
line = RE_TEL_SLASH.sub(_repl_tel, line) # slash d'abord (plus spécifique)
line = RE_TEL.sub(_repl_tel, line)
line = RE_TEL_COMPACT.sub(_repl_tel, line)
# IBAN
def _repl_iban(m: re.Match) -> str:
@@ -1905,13 +1913,14 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
def _repl_date_naissance(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"]))
return PLACEHOLDERS["DATE_NAISSANCE"]
line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line)
# « Né en 1972 » (année seule de naissance) → [DATE_NAISSANCE]
def _repl_date_naissance_annee(m: re.Match) -> str:
val = m.group(1)
audit.append(PiiHit(page_idx, "DATE_NAISSANCE", val, PLACEHOLDERS["DATE_NAISSANCE"]))
return m.group(0).replace(val, PLACEHOLDERS["DATE_NAISSANCE"])
line = RE_DATE_NAISSANCE_ANNEE.sub(_repl_date_naissance_annee, line)
if "DATE_NAISSANCE" not in disabled:
line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line)
line = RE_DATE_NAISSANCE_ANNEE.sub(_repl_date_naissance_annee, line)
# DATE générique — désactivé : seules les dates de naissance sont masquées
# def _repl_date(m: re.Match) -> str:
@@ -1919,23 +1928,23 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
# return PLACEHOLDERS["DATE"]
# line = RE_DATE.sub(_repl_date, line)
# ADRESSE
# ADRESSE — la catégorie « Adresses » couvre voie, BP et code postal
# (décision Dom 2026-06-26 : CODE_POSTAL suit le toggle ADRESSE).
def _repl_adresse(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
return PLACEHOLDERS["ADRESSE"]
line = RE_ADRESSE.sub(_repl_adresse, line)
# BOITE POSTALE (BP)
def _repl_bp(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
return PLACEHOLDERS["ADRESSE"]
line = RE_BP.sub(_repl_bp, line)
# CODE_POSTAL
def _repl_code_postal(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"]))
return PLACEHOLDERS["CODE_POSTAL"]
line = RE_CODE_POSTAL.sub(_repl_code_postal, line)
if "ADRESSE" not in disabled:
line = RE_ADRESSE.sub(_repl_adresse, line)
line = RE_BP.sub(_repl_bp, line)
line = RE_CODE_POSTAL.sub(_repl_code_postal, line)
# AGE
def _repl_age(m: re.Match) -> str:
@@ -1959,13 +1968,13 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
def _repl_lieu_dit(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
return PLACEHOLDERS["ADRESSE"]
line = RE_ADRESSE_LIEU_DIT.sub(_repl_lieu_dit, line)
# Lieux-dits courants seuls sur une ligne (ex: "Le BOURG", "Le Village")
line = RE_LIEU_DIT_SEUL.sub(
lambda m: (audit.append(PiiHit(page_idx, "ADRESSE", m.group(1), PLACEHOLDERS["ADRESSE"])) or PLACEHOLDERS["ADRESSE"]),
line,
)
if "ADRESSE" not in disabled:
line = RE_ADRESSE_LIEU_DIT.sub(_repl_lieu_dit, line)
# Lieux-dits courants seuls sur une ligne (ex: "Le BOURG", "Le Village")
line = RE_LIEU_DIT_SEUL.sub(
lambda m: (audit.append(PiiHit(page_idx, "ADRESSE", m.group(1), PLACEHOLDERS["ADRESSE"])) or PLACEHOLDERS["ADRESSE"]),
line,
)
# N° EPISODE / Episode N. (pieds de page Trackare)
def _repl_episode(m: re.Match) -> str:
@@ -1990,26 +1999,29 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
audit.append(PiiHit(page_idx, "ADHERENT", val, PLACEHOLDERS["ADHERENT"]))
full = m.group(0)
return full[:full.find(val)] + PLACEHOLDERS["ADHERENT"]
line = RE_NUM_ADHERENT.sub(_repl_adherent, line)
line = RE_NUM_MUTUELLE.sub(_repl_adherent, line)
if "ADHERENT" not in disabled:
line = RE_NUM_ADHERENT.sub(_repl_adherent, line)
line = RE_NUM_MUTUELLE.sub(_repl_adherent, line)
# Établissements de santé (EHPAD Chicago, SSR Anonyme, Hôpital de Chicago, etc.)
def _repl_etab(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ETAB", m.group(0), PLACEHOLDERS["ETAB"]))
return PLACEHOLDERS["ETAB"]
line = RE_ETABLISSEMENT.sub(_repl_etab, line)
line = RE_HOPITAL_VILLE.sub(_repl_etab, line)
if "ETAB" not in disabled:
line = RE_ETABLISSEMENT.sub(_repl_etab, line)
line = RE_HOPITAL_VILLE.sub(_repl_etab, line)
# Établissements par gazetteer Aho-Corasick FINESS (116K noms distinctifs)
# Note: _mask_finess_establishments() construit l'automate en lazy au premier appel
line, finess_matched = _mask_finess_establishments(line, return_matched_names=True)
for matched_name in finess_matched:
audit.append(PiiHit(page_idx, "ETAB_FINESS", matched_name, PLACEHOLDERS["ETAB"]))
# Établissements par gazetteer Aho-Corasick FINESS (116K noms distinctifs)
# Note: _mask_finess_establishments() construit l'automate en lazy au premier appel
line, finess_matched = _mask_finess_establishments(line, return_matched_names=True)
for matched_name in finess_matched:
audit.append(PiiHit(page_idx, "ETAB_FINESS", matched_name, PLACEHOLDERS["ETAB"]))
# Adresses par gazetteer Aho-Corasick FINESS (28K noms de voie)
line, addr_matched = _mask_finess_addresses(line, return_matched_names=True)
for matched_addr in addr_matched:
audit.append(PiiHit(page_idx, "ADDR_FINESS", matched_addr, PLACEHOLDERS["ADRESSE"]))
if "ADRESSE" not in disabled:
line, addr_matched = _mask_finess_addresses(line, return_matched_names=True)
for matched_addr in addr_matched:
audit.append(PiiHit(page_idx, "ADDR_FINESS", matched_addr, PLACEHOLDERS["ADRESSE"]))
# Texte espacé d'en-tête : "C E N T R E H O S P I T A L I E R D E ..."
# Les lettres majuscules séparées par des espaces échappent à toute détection normale.
@@ -2028,7 +2040,7 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
"CENTRE", "ETABLISSEMENT", "MAISON", "RESIDENCE",
"EHPAD", "SSR", "USLD", "CHU", "CHRU",
}
spaced_matches = list(_RE_SPACED_TEXT.finditer(line))
spaced_matches = list(_RE_SPACED_TEXT.finditer(line)) if "ETAB" not in disabled else []
if spaced_matches:
# Vérifier si au moins un segment contient un mot-clé d'établissement
has_etab_keyword = False
@@ -2068,7 +2080,8 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
return full_match
audit.append(PiiHit(page_idx, "ETAB", full_match, PLACEHOLDERS["MASK"]))
return PLACEHOLDERS["MASK"]
line = RE_SERVICE.sub(_repl_service, line)
if "ETAB" not in disabled:
line = RE_SERVICE.sub(_repl_service, line)
# Ville en en-tête de courrier : "Chicago, le 12/03/2024" → masquer la ville
# Le contexte "Mot, le [date]" est fiable (virgule obligatoire)
@@ -2128,49 +2141,57 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
audit.append(PiiHit(page_idx, "NOM", cleaned, PLACEHOLDERS["NOM"]))
return raw.replace(cleaned, PLACEHOLDERS["NOM"])
line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line)
# Mr/Mme + initiale isolée : "Mme Z", "Mr R" → masquer la lettre
def _repl_civilite_init(m: re.Match) -> str:
prefix = m.group(1)
lettre = m.group(2)
audit.append(PiiHit(page_idx, "NOM", lettre, PLACEHOLDERS["NOM"]))
return prefix + PLACEHOLDERS["NOM"]
line = RE_CIVILITE_INITIALE.sub(_repl_civilite_init, line)
# Passe supplémentaire : noms dans des listes virgulées après "Dr"
# ex: "le Dr DUVAL, MACHELART, LAZARO" → masquer chaque nom
for m in RE_DR_COMMA_LIST.finditer(line):
fragment = m.group(0)
# Extraire les segments séparés par des virgules (sauf le premier qui inclut "Dr")
parts = [p.strip() for p in fragment.split(",")]
for part in parts:
# Extraire les tokens nom de chaque segment
for tok in _NAME_TOKEN_RE.findall(part):
if tok in wl_sections or len(tok) <= 3:
continue
if _stop_rx.fullmatch(tok):
continue
if tok not in line:
continue
# Vérifier qu'il n'est pas déjà masqué
if f"[{tok}]" in line or tok in {v for v in PLACEHOLDERS.values()}:
continue
audit.append(PiiHit(page_idx, "NOM", tok, PLACEHOLDERS["NOM"]))
line = re.sub(rf"\b{re.escape(tok)}\b", PLACEHOLDERS["NOM"], line)
if "NOM" not in disabled:
line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line)
line = RE_CIVILITE_INITIALE.sub(_repl_civilite_init, line)
# Passe supplémentaire : noms dans des listes virgulées après "Dr"
# ex: "le Dr DUVAL, MACHELART, LAZARO" → masquer chaque nom
for m in RE_DR_COMMA_LIST.finditer(line):
fragment = m.group(0)
# Extraire les segments séparés par des virgules (sauf le premier qui inclut "Dr")
parts = [p.strip() for p in fragment.split(",")]
for part in parts:
# Extraire les tokens nom de chaque segment
for tok in _NAME_TOKEN_RE.findall(part):
if tok in wl_sections or len(tok) <= 3:
continue
if _stop_rx.fullmatch(tok):
continue
if tok not in line:
continue
# Vérifier qu'il n'est pas déjà masqué
if f"[{tok}]" in line or tok in {v for v in PLACEHOLDERS.values()}:
continue
audit.append(PiiHit(page_idx, "NOM", tok, PLACEHOLDERS["NOM"]))
line = re.sub(rf"\b{re.escape(tok)}\b", PLACEHOLDERS["NOM"], line)
return line
def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int) -> str:
def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int,
disabled: Optional[Set[str]] = None) -> str:
"""Masque les TEL, EMAIL, ADRESSE, CODE_POSTAL même dans la partie 'clé' d'une ligne clé:valeur.
Nécessaire car des lignes comme '13 avenue ... CHICAGO - Tel : 0XXX' sont splitées sur ':'."""
Nécessaire car des lignes comme '13 avenue ... CHICAGO - Tel : 0XXX' sont splitées sur ':'.
Plan 1b (P1-2/F-2) : TEL et ADRESSE sont gatés par catégorie (EMAIL → toujours masqué).
FAX (non toggleable) est masqué+audité INCONDITIONNELLEMENT, hors gate TEL."""
disabled = disabled or set()
# FAX d'abord et SANS condition : si le numéro+libellé fax atterrit côté clé.
key = _mask_fax_unconditional(key, audit, page_idx)
def _repl_tel(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
key = RE_TEL_SLASH.sub(_repl_tel, key)
key = RE_TEL.sub(_repl_tel, key)
key = RE_TEL_COMPACT.sub(_repl_tel, key)
if "TEL" not in disabled:
key = RE_TEL_SLASH.sub(_repl_tel, key)
key = RE_TEL.sub(_repl_tel, key)
key = RE_TEL_COMPACT.sub(_repl_tel, key)
def _repl_email(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
return PLACEHOLDERS["EMAIL"]
@@ -2179,16 +2200,17 @@ def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int) -> str:
def _repl_adresse(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
return PLACEHOLDERS["ADRESSE"]
key = RE_ADRESSE.sub(_repl_adresse, key)
# CODE_POSTAL (inclut la ville)
def _repl_cp(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"]))
return PLACEHOLDERS["CODE_POSTAL"]
key = RE_CODE_POSTAL.sub(_repl_cp, key)
# FINESS adresses Aho-Corasick
key, addr_matched = _mask_finess_addresses(key, return_matched_names=True)
for matched_addr in addr_matched:
audit.append(PiiHit(page_idx, "ADDR_FINESS", matched_addr, PLACEHOLDERS["ADRESSE"]))
if "ADRESSE" not in disabled:
key = RE_ADRESSE.sub(_repl_adresse, key)
key = RE_CODE_POSTAL.sub(_repl_cp, key)
# FINESS adresses Aho-Corasick
key, addr_matched = _mask_finess_addresses(key, return_matched_names=True)
for matched_addr in addr_matched:
audit.append(PiiHit(page_idx, "ADDR_FINESS", matched_addr, PLACEHOLDERS["ADRESSE"]))
return key
@@ -2200,8 +2222,12 @@ def _replace_captured_value(full_match: str, captured_value: str, placeholder: s
return full_match[:start] + placeholder + full_match[end:]
def _mask_structured_line(line: str, audit: List[PiiHit], page_idx: int) -> str:
"""Masque les champs structurés dont la détection dépend du libellé de la ligne."""
def _mask_structured_line(line: str, audit: List[PiiHit], page_idx: int,
disabled: Optional[Set[str]] = None) -> str:
"""Masque les champs structurés dont la détection dépend du libellé de la ligne.
Plan 1b (P1-2/F-2) : CODE_POSTAL→ADRESSE, ADHERENT, et les libellés NOM sont
gatés par catégorie. DOSSIER/NDA/VILLE → toujours masqués (non toggleables)."""
disabled = disabled or set()
def _repl_code_postal(m: re.Match) -> str:
original = m.group(1) or m.group(2) or m.group(0)
@@ -2250,27 +2276,54 @@ def _mask_structured_line(line: str, audit: List[PiiHit], page_idx: int) -> str:
audit.append(PiiHit(page_idx, "NOM_INITIAL", m.group(3), PLACEHOLDERS["NOM"]))
return m.group(1) + PLACEHOLDERS["NOM"] + "/" + PLACEHOLDERS["NOM"]
masked = RE_CODE_POSTAL.sub(_repl_code_postal, line)
# CODE_POSTAL → catégorie ADRESSE (décision Dom 2026-06-26).
masked = line
if "ADRESSE" not in disabled:
masked = RE_CODE_POSTAL.sub(_repl_code_postal, masked)
# DOSSIER / NDA → toujours masqués (non toggleables).
masked = RE_NUM_EXAMEN_PATIENT.sub(_repl_num_examen, masked)
masked = RE_NUMERO_DOSSIER.sub(_repl_dossier, masked)
masked = RE_VENUE_SEJOUR.sub(_repl_venue, masked)
masked = RE_NUM_ADHERENT.sub(_repl_adherent, masked)
masked = RE_NUM_MUTUELLE.sub(_repl_adherent, masked)
masked = RE_LABEL_NOM_VARIANTES.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_LABEL_PRENOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_LABEL_NOM_PROFESSIONNEL.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_LABEL_STAFF_ROLE_NOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_HEADER_CROP_EPI_NOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_STANDALONE_COMPOUND_PERSON_LINE.sub(_repl_whole_line_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_MODIFIED_BY_NOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_REF_INITIALS_INLINE.sub(_repl_ref_initials, masked)
# N° adhérent → catégorie ADHERENT.
if "ADHERENT" not in disabled:
masked = RE_NUM_ADHERENT.sub(_repl_adherent, masked)
masked = RE_NUM_MUTUELLE.sub(_repl_adherent, masked)
# Libellés NOM (NOM_FORCE / NOM_INITIAL) → catégorie NOM.
if "NOM" not in disabled:
masked = RE_LABEL_NOM_VARIANTES.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_LABEL_PRENOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_LABEL_NOM_PROFESSIONNEL.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_LABEL_STAFF_ROLE_NOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_HEADER_CROP_EPI_NOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_STANDALONE_COMPOUND_PERSON_LINE.sub(_repl_whole_line_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_MODIFIED_BY_NOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
masked = RE_REF_INITIALS_INLINE.sub(_repl_ref_initials, masked)
# Ville → toujours masquée (non toggleable).
masked = RE_LABEL_VILLE.sub(_repl_label_with_placeholder("VILLE", "VILLE"), masked)
return masked
def _mask_fax_unconditional(line: str, audit: List[PiiHit], page_idx: int) -> str:
"""FAX est NON toggleable (`_category_of("FAX")` → None) ⇒ toujours masqué ET
inscrit à l'audit, indépendamment du toggle TEL. ``RE_FAX`` est ancré au libellé
("Fax :"/"Télécopie :") collé au numéro : il doit donc tourner sur la LIGNE
COMPLÈTE, avant le split clé/valeur (qui sépare le libellé du numéro et
empêcherait toute détection). Le hit FAX doit atteindre ``anon.audit`` pour que
le burn PDF (vector+raster, dérivé de l'audit) masque le numéro."""
def _repl_fax(m: re.Match) -> str:
num = m.group(1)
audit.append(PiiHit(page_idx, "FAX", num, PLACEHOLDERS["FAX"]))
return m.group(0).replace(num, PLACEHOLDERS["FAX"])
return RE_FAX.sub(_repl_fax, line)
def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
disabled = cfg.get("disabled_kinds") or set()
# FAX non toggleable : masquage+audit sur la ligne complète AVANT toute autre
# passe (le split clé/valeur sépare « Fax » du numéro → détection impossible).
line = _mask_fax_unconditional(line, audit, page_idx)
line = _mask_admin_label(line, audit, page_idx, cfg)
structured_line = _mask_structured_line(line, audit, page_idx)
structured_line = _mask_structured_line(line, audit, page_idx, disabled)
if structured_line != line:
return structured_line
parts = SPLITTER.split(line, maxsplit=1)
@@ -2281,7 +2334,7 @@ def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
# probablement du narratif, pas un libellé `Label : valeur`.
if len(parts) == 2 and parts[1].strip() and len(parts[0].split()) <= 5:
key, value = parts
masked_key = _mask_critical_in_key(key, audit, page_idx)
masked_key = _mask_critical_in_key(key, audit, page_idx, disabled)
masked_val = _mask_line_by_regex(value, audit, page_idx, cfg)
return f"{masked_key.strip()} : {masked_val.strip()}"
return _mask_line_by_regex(line, audit, page_idx, cfg)
@@ -2967,8 +3020,13 @@ def _cross_validate_name_candidates(
return validated_names, validated_force_names
def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_names: set = None) -> str:
"""Remplace globalement chaque nom extrait dans le texte."""
def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_names: set = None,
disabled: Optional[Set[str]] = None) -> str:
"""Remplace globalement chaque nom extrait dans le texte.
Plan 1b (P1-2/F-2) : si la catégorie NOM est décochée, ne masque RIEN
(les noms restent en clair). No-op aussi quand ``names`` est vide."""
if disabled and "NOM" in disabled:
return text
placeholder = PLACEHOLDERS["NOM"]
_force = force_names or set()
@@ -3063,10 +3121,17 @@ def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit], cfg: Dict[str,
kind = rule.get("kind")
if kind:
_APPLY_KINDS.add(str(kind))
# Plan 1b (P1-2/F-2) : ne pas réappliquer dans le texte les hits dont la
# catégorie est décochée (ex: NIR, ou un kind admin mappé à une des 7).
# Default-deny : si _category_of renvoie None (kind non toggleable), on
# masque toujours. No-op byte-for-byte quand disabled est vide.
disabled = (cfg or {}).get("disabled_kinds") or set()
# Collecter les valeurs à remplacer, groupées par placeholder
replacements: Dict[str, str] = {} # original → placeholder
for h in audit:
if h.kind in _APPLY_KINDS and h.original and len(h.original.strip()) >= 4:
if disabled and _category_of(h.kind) in disabled:
continue
replacements[h.original.strip()] = h.placeholder
# Remplacer les plus longs d'abord (éviter les remplacements partiels)
for original in sorted(replacements, key=len, reverse=True):
@@ -3103,6 +3168,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
if _is_practitioner_council_recoding_form(full_raw):
cfg = dict(cfg)
cfg["_preserve_practitioner_council_ogc"] = True
# Plan 1b (P1-2/F-2/F-5) — catégories décochées (7 toggles). Vide ⇒ no-op
# byte-for-byte. Chaque passe de masquage TEXTE saute sa catégorie si décochée.
disabled = cfg.get("disabled_kinds") or set()
extracted_names, doc_force_names, doc_candidates = _extract_document_names(full_raw, cfg)
# Phase 0b : si document Trackare, extraction renforcée des PII structurés
@@ -3178,8 +3246,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
r"(\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4})",
re.IGNORECASE,
)
for m in _RE_DATE_NAISSANCE_MULTILINE.finditer(full_raw):
audit.append(PiiHit(-1, "DATE_NAISSANCE", m.group(1), PLACEHOLDERS["DATE_NAISSANCE"]))
if "DATE_NAISSANCE" not in disabled:
for m in _RE_DATE_NAISSANCE_MULTILINE.finditer(full_raw):
audit.append(PiiHit(-1, "DATE_NAISSANCE", m.group(1), PLACEHOLDERS["DATE_NAISSANCE"]))
# Phase 0e : IPP multiline (N°Ipp :\n20023294 ou I.P.P. :\nS1032021)
_RE_IPP_MULTILINE = re.compile(
@@ -3197,8 +3266,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
r"(\d(?:[\s.\-]?\d){12})\b",
re.IGNORECASE,
)
for m in _RE_NIR_NO_KEY_MULTILINE.finditer(full_raw):
audit.append(PiiHit(-1, "NIR", m.group(1), PLACEHOLDERS["NIR"]))
if "NIR" not in disabled:
for m in _RE_NIR_NO_KEY_MULTILINE.finditer(full_raw):
audit.append(PiiHit(-1, "NIR", m.group(1), PLACEHOLDERS["NIR"]))
# Phase 0f : numéro d'accession / d'examen en en-tête de labo ou imagerie
# Ex:
@@ -3254,13 +3324,15 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
def _repl_etab_linebreak(m: re.Match, _page=i) -> str:
audit.append(PiiHit(_page, "ETAB", m.group(0), PLACEHOLDERS["ETAB"]))
return PLACEHOLDERS["ETAB"]
page_txt = RE_ETAB_LINEBREAK.sub(_repl_etab_linebreak, page_txt)
if "ETAB" not in disabled:
page_txt = RE_ETAB_LINEBREAK.sub(_repl_etab_linebreak, page_txt)
def _repl_iao_multiline(m: re.Match, _page=i) -> str:
value = m.group(2).strip()
audit.append(PiiHit(_page, "NOM_FORCE", value, PLACEHOLDERS["NOM"]))
return m.group(1) + PLACEHOLDERS["NOM"]
page_txt = RE_TRACKARE_IAO_MULTILINE_VALUE.sub(_repl_iao_multiline, page_txt)
if "NOM" not in disabled:
page_txt = RE_TRACKARE_IAO_MULTILINE_VALUE.sub(_repl_iao_multiline, page_txt)
lines = page_txt.splitlines()
masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines]
@@ -3285,7 +3357,8 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
# Phase 2 : application globale des noms extraits (rattrapage)
# Utilise all_names (validé par NER-first si disponible, sinon extracted_names original)
if all_names:
text_out = _apply_extracted_names(text_out, all_names, audit, force_names=all_force_names)
text_out = _apply_extracted_names(text_out, all_names, audit, force_names=all_force_names,
disabled=disabled)
# Phase 2b : application globale des PiiHit (EPISODE, RPPS, FINESS)
text_out = _apply_trackare_hits_to_text(text_out, audit, cfg)
@@ -3297,6 +3370,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str:
# remplace via regex sur les 'word' détectés (approche pragmatique)
keep_org_gpe = bool((cfg.get("whitelist", {}) or {}).get("org_gpe_keep", False))
# Plan 1b (P1-2/F-5) : gating PER-HIT (jamais skip toute la fonction, sinon
# on perdrait les catégories encore actives). Default-deny via _category_of.
disabled = cfg.get("disabled_kinds") or set()
def repl_once(s: str, old: str, new: str) -> str:
return re.sub(rf"\b{re.escape(old)}\b", new, s)
out = text
@@ -3307,11 +3383,15 @@ def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], au
if len(w) <= 2: # trop court
continue
if grp in {"PER", "PERSON"}:
if disabled and _category_of("NER_PER") in disabled: # catégorie NOM décochée
continue
audit.append(PiiHit(-1, "NER_PER", w, PLACEHOLDERS["NOM"]))
out = repl_once(out, w, PLACEHOLDERS["NOM"])
elif grp in {"ORG"}:
if keep_org_gpe:
continue
if disabled and _category_of("NER_ORG") in disabled: # catégorie ETAB décochée
continue
audit.append(PiiHit(-1, "NER_ORG", w, PLACEHOLDERS["ETAB"]))
out = repl_once(out, w, PLACEHOLDERS["ETAB"])
elif grp in {"LOC"}:
@@ -3368,6 +3448,9 @@ def apply_hf_ner_on_narrative(text_out: str, cfg: Dict[str, Any], manager: Optio
def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str:
"""Masque les entités détectées par EDS-Pseudo en utilisant le mapping eds_mapped_key."""
# Plan 1b (P1-2/F-5) : gating PER-HIT via la catégorie du kind EDS_{label}.
# Jamais de skip global (sinon perte des catégories actives). Default-deny.
disabled = cfg.get("disabled_kinds") or set()
def repl_once(s: str, old: str, new: str) -> str:
return re.sub(rf"\b{re.escape(old)}\b", new, s)
out = text
@@ -3439,6 +3522,9 @@ def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str,
continue
if w.upper() in _STRUCTURAL_WORDS:
continue
# Gating per-hit (F-5) : catégorie décochée → laisser en clair.
if disabled and _category_of(f"EDS_{label}") in disabled:
continue
placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"])
audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder))
out = repl_once(out, w, placeholder)
@@ -4319,6 +4405,11 @@ def _apply_whitelist(text: str, phrases: List[str], audit: List[PiiHit]) -> str:
def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
"""Rescan de sécurité : re-détecte les PII critiques qui auraient échappé au premier passage."""
# Plan 1b (P1-2/F-2) — filet de sécurité aussi gaté par catégorie : une
# catégorie décochée ne doit pas être re-masquée ici (sinon la valeur,
# laissée en clair plus haut, serait masquée par le rescan). Vide ⇒ no-op
# byte-for-byte. Default-deny conservé pour tout kind non toggleable.
disabled = (cfg or {}).get("disabled_kinds") or set()
# enlève TABLES du scope
def strip_tables(s: str):
kept = []
@@ -4345,33 +4436,38 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
# espacé soit consommé par RE_TEL.
def _rescan_nir(m: re.Match) -> str:
return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0)
protected = RE_NIR.sub(_rescan_nir, protected)
protected = RE_NIR_NO_KEY.sub(PLACEHOLDERS["NIR"], protected) # 13 chiffres label-ancré
if "NIR" not in disabled:
protected = RE_NIR.sub(_rescan_nir, protected)
protected = RE_NIR_NO_KEY.sub(PLACEHOLDERS["NIR"], protected) # 13 chiffres label-ancré
# FAX avant TEL pour que le numéro de fax devienne [FAX] et non [TEL].
protected = RE_FAX.sub(PLACEHOLDERS["FAX"], protected)
protected = RE_TEL_SLASH.sub(PLACEHOLDERS["TEL"], protected)
protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected)
protected = RE_TEL_COMPACT.sub(PLACEHOLDERS["TEL"], protected)
if "TEL" not in disabled:
protected = RE_TEL_SLASH.sub(PLACEHOLDERS["TEL"], protected)
protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected)
protected = RE_TEL_COMPACT.sub(PLACEHOLDERS["TEL"], protected)
protected = RE_IBAN.sub(PLACEHOLDERS["IBAN"], protected)
# X-L2 — identifiants jusque-là non rescannés (fuite si vus 1 fois puis répétés) :
protected = RE_RIB.sub(PLACEHOLDERS["IBAN"], protected)
protected = RE_BIC.sub(PLACEHOLDERS["IBAN"], protected)
protected = RE_ADELI.sub(PLACEHOLDERS["ADELI"], protected)
protected = RE_OGC.sub(PLACEHOLDERS["OGC"], protected)
protected = RE_NUM_ADHERENT.sub(PLACEHOLDERS["ADHERENT"], protected)
protected = RE_NUM_MUTUELLE.sub(PLACEHOLDERS["ADHERENT"], protected)
if "ADHERENT" not in disabled:
protected = RE_NUM_ADHERENT.sub(PLACEHOLDERS["ADHERENT"], protected)
protected = RE_NUM_MUTUELLE.sub(PLACEHOLDERS["ADHERENT"], protected)
# Nouvelles regex : dates de naissance, dates, adresses, codes postaux
protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected)
if "DATE_NAISSANCE" not in disabled:
protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected)
# protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) # désactivé
protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_ADRESSE_LIEU_DIT.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_BP.sub(PLACEHOLDERS["ADRESSE"], protected)
def _rescan_code_postal(m: re.Match) -> str:
if m.group(1):
return _replace_captured_value(m.group(0), m.group(1), PLACEHOLDERS["CODE_POSTAL"])
return PLACEHOLDERS["CODE_POSTAL"]
protected = RE_CODE_POSTAL.sub(_rescan_code_postal, protected)
if "ADRESSE" not in disabled:
protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_ADRESSE_LIEU_DIT.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_BP.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_CODE_POSTAL.sub(_rescan_code_postal, protected)
# N° Episode
protected = RE_EPISODE.sub(PLACEHOLDERS["EPISODE"], protected)
# N° venue / séjour
@@ -4386,30 +4482,33 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
def _rescan_finess(m: re.Match) -> str:
return PLACEHOLDERS["FINESS"] if m.group(1).upper() in _FINESS_NUMBERS else m.group(0)
protected = RE_BARE_9DIGITS.sub(_rescan_finess, protected)
# Établissements (regex)
protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected)
protected = RE_HOPITAL_VILLE.sub(PLACEHOLDERS["ETAB"], protected)
# Établissements (gazetteer Aho-Corasick FINESS — 116K noms distinctifs)
protected = _mask_finess_establishments(protected)
# Adresses (gazetteer Aho-Corasick FINESS — 28K noms de voie)
protected = _mask_finess_addresses(protected)
# Texte espacé d'en-tête : "C E N T R E H O S P I T A L I E R" → [ETABLISSEMENT]
_re_spaced = re.compile(r'(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ]\s){4,}[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ]')
_spaced_kw = {"HOSPITALIER", "HOSPITALIERE", "HOSPITALIERES", "HOSPITALIERS",
"CLINIQUE", "HOPITAL", "HÔPITAL", "POLYCLINIQUE",
"CENTRE", "ETABLISSEMENT", "MAISON", "RESIDENCE",
"EHPAD", "SSR", "USLD", "CHU", "CHRU"}
for m_sp in _re_spaced.finditer(protected):
collapsed = m_sp.group(0).replace(" ", "").upper()
if any(kw in collapsed for kw in _spaced_kw):
protected = protected.replace(m_sp.group(0), PLACEHOLDERS["ETAB"], 1)
# Établissements (regex + gazetteer + texte espacé) → catégorie ETAB.
if "ETAB" not in disabled:
protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected)
protected = RE_HOPITAL_VILLE.sub(PLACEHOLDERS["ETAB"], protected)
# Établissements (gazetteer Aho-Corasick FINESS — 116K noms distinctifs)
protected = _mask_finess_establishments(protected)
# Texte espacé d'en-tête : "C E N T R E H O S P I T A L I E R" → [ETABLISSEMENT]
_re_spaced = re.compile(r'(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ]\s){4,}[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ]')
_spaced_kw = {"HOSPITALIER", "HOSPITALIERE", "HOSPITALIERES", "HOSPITALIERS",
"CLINIQUE", "HOPITAL", "HÔPITAL", "POLYCLINIQUE",
"CENTRE", "ETABLISSEMENT", "MAISON", "RESIDENCE",
"EHPAD", "SSR", "USLD", "CHU", "CHRU"}
for m_sp in _re_spaced.finditer(protected):
collapsed = m_sp.group(0).replace(" ", "").upper()
if any(kw in collapsed for kw in _spaced_kw):
protected = protected.replace(m_sp.group(0), PLACEHOLDERS["ETAB"], 1)
# Adresses (gazetteer Aho-Corasick FINESS — 28K noms de voie) → catégorie ADRESSE.
if "ADRESSE" not in disabled:
protected = _mask_finess_addresses(protected)
# Villes (gazetteer Aho-Corasick — INSEE + FINESS)
if _VILLE_AC is None:
_build_ville_ac()
if _VILLE_AC is not None:
protected, _ = _mask_ville_gazetteers(protected)
# Services hospitaliers
protected = RE_SERVICE.sub(PLACEHOLDERS["MASK"], protected)
# Services hospitaliers → catégorie ETAB.
if "ETAB" not in disabled:
protected = RE_SERVICE.sub(PLACEHOLDERS["MASK"], protected)
# Lieu de naissance / Ville de résidence (accepte tout : villes, codes INSEE, minuscules)
_re_lieu_rescan = re.compile(r"(Lieu\s+de\s+naissance\s*:\s*)(\S.+)")
protected = _re_lieu_rescan.sub(lambda m: m.group(1) + PLACEHOLDERS["VILLE"], protected)
@@ -4433,20 +4532,21 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
if not clean:
return raw
return raw.replace(span, PLACEHOLDERS["NOM"])
protected = RE_PERSON_CONTEXT.sub(_rescan_person, protected)
# Mr/Mme + initiale isolée : "Mme Z", "Mr R" → masquer
protected = RE_CIVILITE_INITIALE.sub(
lambda m: m.group(1) + PLACEHOLDERS["NOM"], protected
)
# Initiales identifiantes devant [NOM] : "Dr T. [NOM]" → "Dr [NOM] [NOM]"
_re_init_nom = re.compile(r'\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ])\.[\s\-]*(\[NOM\])')
protected = _re_init_nom.sub(r'[NOM] \2', protected)
# Références initiales : "Ref : JF/VA" → "Ref : [NOM]/[NOM]"
_re_ref_init = re.compile(r'(?:Ref\s*:\s*|Réf\s*:\s*)([A-Z]{1,3})\s*/\s*([A-Z]{1,3})\b')
protected = _re_ref_init.sub(
lambda m: m.group(0)[:m.group(0).index(m.group(1))] + PLACEHOLDERS["NOM"] + "/" + PLACEHOLDERS["NOM"],
protected,
)
if "NOM" not in disabled:
protected = RE_PERSON_CONTEXT.sub(_rescan_person, protected)
# Mr/Mme + initiale isolée : "Mme Z", "Mr R" → masquer
protected = RE_CIVILITE_INITIALE.sub(
lambda m: m.group(1) + PLACEHOLDERS["NOM"], protected
)
# Initiales identifiantes devant [NOM] : "Dr T. [NOM]" → "Dr [NOM] [NOM]"
_re_init_nom = re.compile(r'\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ])\.[\s\-]*(\[NOM\])')
protected = _re_init_nom.sub(r'[NOM] \2', protected)
# Références initiales : "Ref : JF/VA" → "Ref : [NOM]/[NOM]"
_re_ref_init = re.compile(r'(?:Ref\s*:\s*|Réf\s*:\s*)([A-Z]{1,3})\s*/\s*([A-Z]{1,3})\b')
protected = _re_ref_init.sub(
lambda m: m.group(0)[:m.group(0).index(m.group(1))] + PLACEHOLDERS["NOM"] + "/" + PLACEHOLDERS["NOM"],
protected,
)
res = list(protected)
for start, end, payload in kept:
res[start:end] = list(payload)
@@ -5056,10 +5156,14 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
# ----------------- VLM pour PDFs scannés -----------------
def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: OcrWordMap, vlm_manager) -> None:
def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: OcrWordMap, vlm_manager,
disabled: Optional[Set[str]] = None) -> None:
"""Utilise un VLM (Ollama) pour détecter visuellement les PII sur chaque page d'un PDF scanné.
Les entités détectées sont ajoutées à anon.audit et au texte pseudonymisé.
Auto-rotation : si une page a peu de mots OCR, essaie 4 orientations."""
Auto-rotation : si une page a peu de mots OCR, essaie 4 orientations.
Plan 1b (P1-2/F-2) : gating PER-HIT via _category_of(kind). Une catégorie
décochée n'est ni ajoutée à l'audit ni masquée dans le texte. Default-deny."""
disabled = disabled or set()
from vlm_manager import VLM_CATEGORY_MAP
doc = fitz.open(str(pdf_path))
# Collecter les PII déjà détectés pour contexte VLM
@@ -5103,6 +5207,10 @@ def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: Oc
if cat not in VLM_CATEGORY_MAP:
continue
kind, placeholder_key = VLM_CATEGORY_MAP[cat]
# Gating per-hit (F-2) : catégorie décochée → laisser en clair
# (ni audit, ni texte, ni raster). Default-deny si non toggleable.
if disabled and _category_of(kind) in disabled:
continue
placeholder = PLACEHOLDERS.get(placeholder_key, PLACEHOLDERS["MASK"])
if cat in _SPLIT_CATS:
@@ -5229,13 +5337,17 @@ def process_pdf(
if ocr_used and vlm_manager is not None and VlmManager is not None:
try:
if vlm_manager.is_loaded():
_apply_vlm_on_scanned_pdf(pdf_path, anon, ocr_word_map, vlm_manager)
_apply_vlm_on_scanned_pdf(pdf_path, anon, ocr_word_map, vlm_manager,
disabled=(cfg.get("disabled_kinds") or set()))
_perf_mark("vlm_scan")
except Exception:
pass # dégradation gracieuse
# 2) NER (optionnel) — sur le narratif
final_text = anon.text_out
# Plan 1b (P1-2/F-2) — catégories décochées pour les passes post-masquage
# (cleanups + propagation globale). Vide ⇒ no-op byte-for-byte.
_disabled_cats = cfg.get("disabled_kinds") or set()
hf_hits: List[PiiHit] = []
if use_hf and ner_manager is not None and ner_manager.is_loaded():
# Détecter le type de manager et appeler la bonne fonction
@@ -5263,7 +5375,8 @@ def process_pdf(
return m.group(0)
anon.audit.append(PiiHit(-1, "NOM_GLOBAL", tok, PLACEHOLDERS["NOM"]))
return m.group(1) + PLACEHOLDERS["NOM"]
final_text = _re_nom_orphan.sub(_clean_nom_orphan, final_text)
if "NOM" not in _disabled_cats:
final_text = _re_nom_orphan.sub(_clean_nom_orphan, final_text)
# 3b) Nettoyage post-masquage : codes postaux orphelins (5 chiffres collés à un placeholder)
# et téléphones fragmentés sur plusieurs lignes
@@ -5271,7 +5384,8 @@ def process_pdf(
def _clean_cp_orphan(m):
anon.audit.append(PiiHit(-1, "CODE_POSTAL", m.group(2), PLACEHOLDERS["CODE_POSTAL"]))
return m.group(1) + PLACEHOLDERS["CODE_POSTAL"]
final_text = _re_cp_orphan.sub(_clean_cp_orphan, final_text)
if "ADRESSE" not in _disabled_cats: # CODE_POSTAL suit le toggle ADRESSE
final_text = _re_cp_orphan.sub(_clean_cp_orphan, final_text)
# Téléphones fragmentés : "0X XX XX XX\nXX" coupé en fin de ligne (ligne suivante immédiate)
_re_tel_frag = re.compile(r"((?:\+33\s?|0)\d(?:[ .-]?\d){6,7})\s*\n\s*(\d{2}(?!\d))")
@@ -5281,8 +5395,6 @@ def process_pdf(
anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"] + "\n"
return m.group(0)
final_text = _re_tel_frag.sub(_clean_tel_frag, final_text)
# Téléphones incomplets en fin de ligne (8 ou 9 chiffres au format 0X XX XX XX) : masquer la partie visible
_re_tel_partial = re.compile(r"(?<!\d)((?:\+33\s?|0)\d(?:[ .-]?\d){5,7})(?!\d)\s*$", re.MULTILINE)
def _clean_tel_partial(m):
@@ -5291,7 +5403,9 @@ def process_pdf(
anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
return m.group(0)
final_text = _re_tel_partial.sub(_clean_tel_partial, final_text)
if "TEL" not in _disabled_cats:
final_text = _re_tel_frag.sub(_clean_tel_frag, final_text)
final_text = _re_tel_partial.sub(_clean_tel_partial, final_text)
# 3c) Initiales identifiantes devant [NOM] : "Dr T. [NOM]" → "Dr [NOM] [NOM]"
_RE_INITIAL_BEFORE_NOM = re.compile(
@@ -5300,7 +5414,6 @@ def process_pdf(
def _clean_initial_before_nom(m):
anon.audit.append(PiiHit(-1, "NOM_INITIAL", m.group(1) + ".", PLACEHOLDERS["NOM"]))
return PLACEHOLDERS["NOM"] + " " + m.group(2)
final_text = _RE_INITIAL_BEFORE_NOM.sub(_clean_initial_before_nom, final_text)
# 3d) Références initiales : "Ref : JF/VA", "Réf : AD/EP" → "Ref : [NOM]/[NOM]"
_RE_REF_INITIALS = re.compile(
@@ -5311,7 +5424,9 @@ def process_pdf(
anon.audit.append(PiiHit(-1, "NOM_INITIAL", m.group(2), PLACEHOLDERS["NOM"]))
prefix = m.group(0)[:m.group(0).index(m.group(1))]
return prefix + PLACEHOLDERS["NOM"] + "/" + PLACEHOLDERS["NOM"]
final_text = _RE_REF_INITIALS.sub(_clean_ref_initials, final_text)
if "NOM" not in _disabled_cats:
final_text = _RE_INITIAL_BEFORE_NOM.sub(_clean_initial_before_nom, final_text)
final_text = _RE_REF_INITIALS.sub(_clean_ref_initials, final_text)
# 3e) Layout BACTERIO résiduel : le numéro de venue peut survivre s'il est
# rejeté plusieurs lignes après le libellé, juste avant "IPP : [IPP]".
@@ -5463,6 +5578,11 @@ def process_pdf(
continue
if h.kind in _GLOBAL_SKIP_KINDS:
continue
# Plan 1b (P1-2/F-2/F-4) : ne pas propager une catégorie décochée dans le
# texte (sa valeur, laissée en clair plus haut, serait re-masquée ici).
# Default-deny via _category_of. No-op quand _disabled_cats est vide.
if _disabled_cats and _category_of(h.kind) in _disabled_cats:
continue
token = h.original.strip()
if not token or len(token) < 4:
continue

View File

@@ -0,0 +1,260 @@
"""Plan 1b — Task 3 (P1-2 / F-2 + F-5) : gating TEXTE par catégorie.
Vérifie que, quand une des 7 catégories toggleables est décochée
(``cfg["disabled_kinds"]``), la valeur de cette catégorie ressort EN CLAIR
dans le texte produit, SANS jamais démasquer une autre catégorie encore
activée (pas de fuite croisée) et SANS régression quand rien n'est désactivé.
Entrées RÉELLES fabriquées à partir des vraies regex du moteur (aucun mock).
NIR valide (clé modulo 97) calculé : body 1850578006084 → clé 91.
"""
import re
import anonymizer_core_refactored_onnx as core
# --- Échantillons clairs par catégorie (1 PII de la catégorie cible) ---------
# Chaque échantillon est validé : masqué quand la catégorie est activée.
_SAMPLES = {
"NOM": ("Nom de famille : DUPONT", "DUPONT", "[NOM]"),
"DATE_NAISSANCE": ("Né le 12/03/1950", "12/03/1950", "[DATE_NAISSANCE]"),
"ETAB": ("Etablissement : EHPAD Solemnis", "Solemnis", "[ETABLISSEMENT]"),
"ADRESSE": ("Domicile : 13 rue des Lilas", "rue des Lilas", "[ADRESSE]"),
"NIR": ("NIR 185057800608491", "185057800608491", "[NIR]"),
"TEL": ("Tel : 0612345678", "0612345678", "[TEL]"),
"ADHERENT": ("N° adhérent : ABC123456", "ABC123456", "[ADHERENT]"),
}
# Une catégorie « témoin » différente, toujours activée, dont le placeholder doit
# rester présent (anti-fuite croisée). On choisit NIR comme témoin sauf pour la
# catégorie cible NIR (témoin = TEL).
_WITNESS = {
"NOM": ("NIR 185057800608491", "[NIR]"),
"DATE_NAISSANCE": ("NIR 185057800608491", "[NIR]"),
"ETAB": ("NIR 185057800608491", "[NIR]"),
"ADRESSE": ("NIR 185057800608491", "[NIR]"),
"NIR": ("Tel : 0612345678", "[TEL]"),
"TEL": ("NIR 185057800608491", "[NIR]"),
"ADHERENT": ("NIR 185057800608491", "[NIR]"),
}
_SEVEN = ["NOM", "DATE_NAISSANCE", "ETAB", "ADRESSE", "NIR", "TEL", "ADHERENT"]
def _run(text, disabled):
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = set(disabled)
return core.anonymise_document_regex([text], [], cfg)
import pytest
@pytest.mark.parametrize("cat", _SEVEN)
def test_disabled_category_left_in_clear_witness_masked(cat):
"""La catégorie décochée ressort en clair ; le témoin reste masqué."""
target_line, clear_value, target_ph = _SAMPLES[cat]
witness_line, witness_ph = _WITNESS[cat]
text = target_line + "\n" + witness_line
res = _run(text, {cat})
out = res.text_out
# 1) la valeur de la catégorie décochée doit être EN CLAIR
assert clear_value in out, (
f"{cat} décochée : '{clear_value}' devrait être en clair.\nout={out!r}")
# 2) son placeholder ne doit PAS apparaître
assert target_ph not in out, (
f"{cat} décochée : '{target_ph}' ne devrait pas apparaître.\nout={out!r}")
# 3) le témoin (autre catégorie activée) doit RESTER masqué
assert witness_ph in out, (
f"{cat} décochée : témoin {witness_ph} devrait rester masqué.\nout={out!r}")
@pytest.mark.parametrize("cat", _SEVEN)
def test_enabled_category_still_masked(cat):
"""Avec rien de désactivé, chaque catégorie reste masquée (non-régression)."""
target_line, clear_value, target_ph = _SAMPLES[cat]
res = _run(target_line, set())
assert target_ph in res.text_out, (
f"{cat} activée devrait être masquée.\nout={res.text_out!r}")
def test_one_disabled_all_others_stay_masked():
"""1 catégorie décochée : TOUTES les autres restent masquées (anti-fuite)."""
text = "\n".join(s[0] for s in _SAMPLES.values())
for off in _SEVEN:
res = _run(text, {off})
out = res.text_out
# la catégorie décochée doit être en clair
clear = _SAMPLES[off][1]
assert clear in out, f"{off} décochée devrait être en clair.\nout={out!r}"
# toutes les AUTRES doivent rester masquées
for other in _SEVEN:
if other == off:
continue
ph = _SAMPLES[other][2]
assert ph in out, (
f"{off} décochée NE doit PAS démasquer {other} ({ph}).\nout={out!r}")
def test_baseline_all_enabled_byte_for_byte():
"""disabled vide ⇒ sortie identique à un run sans la clé disabled_kinds."""
text = "\n".join(s[0] for s in _SAMPLES.values())
cfg_a = core.load_dictionaries(None)
cfg_a["disabled_kinds"] = set()
cfg_b = core.load_dictionaries(None) # pas de clé du tout
out_a = core.anonymise_document_regex([text], [], cfg_a).text_out
out_b = core.anonymise_document_regex([text], [], cfg_b).text_out
assert out_a == out_b
# et tout est bien masqué
for _line, _clear, ph in _SAMPLES.values():
assert ph in out_a
# --- selective_rescan : filet de sécurité, doit aussi gater ------------------
@pytest.mark.parametrize("cat,line,clear,ph", [
("TEL", "Joindre au 0612345678", "0612345678", "[TEL]"),
("NIR", "Secu 185057800608491", "185057800608491", "[NIR]"),
("ADRESSE", "13 rue des Lilas ici", "rue des Lilas", "[ADRESSE]"),
("DATE_NAISSANCE", "Né le 12/03/1950", "12/03/1950", "[DATE_NAISSANCE]"),
("ETAB", "Etablissement EHPAD Solemnis", "Solemnis", "[ETABLISSEMENT]"),
("ADHERENT", "N° adhérent : ABC123456", "ABC123456", "[ADHERENT]"),
])
def test_selective_rescan_gates_disabled(cat, line, clear, ph):
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = {cat}
out = core.selective_rescan(line, cfg=cfg)
assert clear in out, f"rescan {cat} décochée : '{clear}' devrait rester clair.\nout={out!r}"
assert ph not in out, f"rescan {cat} décochée : {ph} ne devrait pas apparaître.\nout={out!r}"
def test_selective_rescan_empty_disabled_byte_for_byte():
"""selective_rescan : disabled vide == aucune clé (non-régression)."""
line = ("Joindre au 0612345678, Secu 185057800608491, "
"13 rue des Lilas, Né le 12/03/1950, EHPAD Solemnis")
cfg_none = core.load_dictionaries(None)
cfg_empty = core.load_dictionaries(None)
cfg_empty["disabled_kinds"] = set()
assert core.selective_rescan(line, cfg=cfg_none) == core.selective_rescan(line, cfg=cfg_empty)
def test_selective_rescan_enabled_still_masks():
"""Non-régression rescan : rien désactivé ⇒ masque tout."""
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = set()
line = "Joindre au 0612345678 et Secu 185057800608491"
out = core.selective_rescan(line, cfg=cfg)
assert "[TEL]" in out and "[NIR]" in out
assert "0612345678" not in out and "185057800608491" not in out
# --- NER per-hit (F-5) : _mask_with_hf -------------------------------------
def test_mask_with_hf_per_hit_gating():
"""NOM décoché : l'entité PER ressort en clair, l'ORG (ETAB) reste masquée."""
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = {"NOM"}
text = "Le patient Martin suivi par Hopital Saint-Louis"
ents = [
{"word": "Martin", "entity_group": "PER"},
{"word": "Hopital Saint-Louis", "entity_group": "ORG"},
]
audit = []
out = core._mask_with_hf(text, ents, cfg, audit)
assert "Martin" in out, f"NOM décoché : Martin devrait rester clair.\nout={out!r}"
assert "[NOM]" not in out
assert "[ETABLISSEMENT]" in out, f"ETAB activé devrait être masqué.\nout={out!r}"
def test_mask_with_hf_no_disabled_masks_all():
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = set()
text = "Le patient Martin"
ents = [{"word": "Martin", "entity_group": "PER"}]
out = core._mask_with_hf(text, ents, cfg, [])
assert "[NOM]" in out and "Martin" not in out
# --- NER per-hit (F-5) : _mask_with_eds_pseudo -----------------------------
def test_mask_with_eds_pseudo_per_hit_gating():
"""NOM décoché : entité EDS NOM en clair, HOPITAL (ETAB) reste masquée."""
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = {"NOM"}
text = "Compte rendu Bernardo signe a Belledonne"
ents = [
{"word": "Bernardo", "entity_group": "NOM", "eds_mapped_key": "NOM", "score": 0.99},
{"word": "Belledonne", "entity_group": "HOPITAL", "eds_mapped_key": "ETAB", "score": 0.99},
]
out = core._mask_with_eds_pseudo(text, ents, cfg, [])
assert "Bernardo" in out, f"NOM décoché : Bernardo devrait rester clair.\nout={out!r}"
assert "[NOM]" not in out
assert "[ETABLISSEMENT]" in out, f"ETAB activé devrait être masqué.\nout={out!r}"
# --- VLM per-hit (F-2) : _apply_vlm gating helper --------------------------
def test_vlm_kind_gating_is_per_hit():
"""Le gating VLM s'évalue par hit via _category_of(kind)."""
import vlm_manager
# NOM décoché : VLM_NOM doit être filtré, VLM_ETAB conservé.
nom_kind, _ = vlm_manager.VLM_CATEGORY_MAP["NOM"]
etab_kind, _ = vlm_manager.VLM_CATEGORY_MAP["ETABLISSEMENT"]
assert core._category_of(nom_kind) == "NOM"
assert core._category_of(etab_kind) == "ETAB"
# === Régression AUDIT-LEVEL (revue qualité : fuite PDF FAX avec TEL décoché) ===
# Le burn PDF (vector+raster) dérive UNIQUEMENT de anon.audit. Un type non
# toggleable dont l'unique site de détection tombait dans (ou en aval d')un bloc
# gaté ne produisait plus de hit audit → numéro VISIBLE dans le PDF livré, même
# si le .txt paraissait propre. Ces tests assertent sur anon.audit, pas le texte.
def _audit_kinds(text, disabled):
"""Lance le constructeur d'audit (anonymise_document_regex) et renvoie les hits."""
cfg = core.load_dictionaries(None)
cfg["disabled_kinds"] = set(disabled)
return core.anonymise_document_regex([text], [], cfg).audit
def _has_hit(audit, kind, placeholder=None):
for h in audit:
if h.kind == kind and (placeholder is None or h.placeholder == placeholder):
return True
return False
@pytest.mark.parametrize("line,fax_value", [
("Fax : 0512345678", "0512345678"),
("Télécopie : 05 12 34 56 78", "05 12 34 56 78"),
("Télécopieur : 0512345678", "0512345678"),
])
def test_fax_audit_hit_survives_tel_disabled(line, fax_value):
"""FAX (non toggleable) DOIT rester dans anon.audit quand TEL est décoché.
C'est le test qui échouait avant le correctif de découplage FAX (fuite PDF)."""
audit = _audit_kinds(line, {"TEL"})
# Un hit FAX doit exister (kind ET placeholder), pour que le burn PDF le masque.
assert _has_hit(audit, "FAX", core.PLACEHOLDERS["FAX"]), (
f"FAX absent de l'audit avec TEL décoché → fuite PDF.\n"
f"line={line!r}\naudit={[(h.kind, h.original) for h in audit]}")
# La valeur ne doit pas survivre déguisée en hit TEL non plus.
assert not _has_hit(audit, "TEL"), "Un fax ne doit pas devenir un hit TEL."
def test_fax_audit_hit_present_when_nothing_disabled():
"""Non-régression : FAX produit bien un hit audit sur le chemin par défaut."""
audit = _audit_kinds("Fax : 0512345678", set())
assert _has_hit(audit, "FAX", core.PLACEHOLDERS["FAX"])
def test_tel_audit_hit_dropped_when_tel_disabled():
"""Cohérence : un vrai TÉLÉPHONE (toggleable) sort bien de l'audit si TEL décoché."""
audit = _audit_kinds("Tel : 0612345678", {"TEL"})
assert not _has_hit(audit, "TEL"), "TEL décoché ⇒ pas de hit TEL (numéro laissé clair)."
@pytest.mark.parametrize("off", ["NOM", "ADRESSE", "NIR", "ADHERENT", "ETAB", "DATE_NAISSANCE"])
def test_fax_audit_survives_any_unrelated_toggle(off):
"""Général : le non toggleable FAX reste dans l'audit quel que soit le toggle décoché."""
audit = _audit_kinds("Fax : 0512345678", {off})
assert _has_hit(audit, "FAX", core.PLACEHOLDERS["FAX"]), (
f"FAX absent de l'audit avec {off} décoché.\n"
f"audit={[(h.kind, h.original) for h in audit]}")