fix(core): renforcer detection PII et FINESS Corse
Couvre les corrections PII batch A/A-2, le NIR multi-ligne en flux reel, le gazetteer FINESS Corse derive depuis la base locale, et les tests de regression associes. Aucun build ni diffusion.
This commit is contained in:
@@ -423,7 +423,7 @@ def _load_finess_gazetteers():
|
||||
if finess_path.exists():
|
||||
try:
|
||||
_FINESS_NUMBERS = {
|
||||
line.strip() for line in finess_path.read_text(encoding="utf-8").splitlines()
|
||||
line.strip().upper() for line in finess_path.read_text(encoding="utf-8").splitlines()
|
||||
if line.strip()
|
||||
}
|
||||
log.info(f"Gazetteer FINESS numéros: {len(_FINESS_NUMBERS)} entrées")
|
||||
@@ -520,6 +520,8 @@ PLACEHOLDERS = {
|
||||
"EPISODE": "[EPISODE]",
|
||||
"RPPS": "[RPPS]",
|
||||
"ADHERENT": "[ADHERENT]",
|
||||
"ADELI": "[ADELI]",
|
||||
"FAX": "[FAX]",
|
||||
}
|
||||
|
||||
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"}
|
||||
@@ -532,7 +534,7 @@ RE_TEL_COMPACT = re.compile(r"(?<!\d)0[1-9]\d{8}(?!\d)")
|
||||
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}(?:\s?[A-Z0-9]{4}){3,7}(?:\s?[A-Z0-9]{1,4})\b")
|
||||
RE_IPP = re.compile(r"\b(?:I\.?P\.?P\.?|IPP|N°\s*Ipp)\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE)
|
||||
RE_CSULT = re.compile(r"\b(?:N°\s*Csult|N°\s*Interv)\s*[:\-]?\s*(\d{6,})\b", re.IGNORECASE)
|
||||
RE_FINESS = re.compile(r"\b(?:N°\s*)?FINESS?\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
|
||||
RE_FINESS = re.compile(r"\b(?:N°\s*)?FINESS?\s*[:\-]?\s*(\d{9}|2[AB]\d{7})\b", re.IGNORECASE)
|
||||
RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE)
|
||||
RE_RPPS = re.compile(
|
||||
r"\b(?:N°\s*)?RPPS"
|
||||
@@ -551,6 +553,51 @@ RE_NUM_ADHERENT = re.compile(
|
||||
r"\b(?:n[°o]?\s*|num[ée]ro\s+(?:d['’]\s*)?)adh[ée]rent[e]?\s*[:\-]?\s*([A-Z0-9]{6,15})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# Numéro mutuelle / AMC / CSS / CSTS / organisme complémentaire (audit PII FORT
|
||||
# M-L1/M-L6). Séparateur ([:\-] ou n°) REQUIS + valeur commençant par un chiffre
|
||||
# → évite de masquer un NOM de mutuelle (« Mutuelle : MGEN » laisse MGEN intact)
|
||||
# tout en captant « Mutuelle : 123456 », « AMC : 1234567 », « CSS n° : 1234567 ».
|
||||
RE_NUM_MUTUELLE = re.compile(
|
||||
r"\b(?:mutuelle|AMC|CSTS|CSS|organisme\s+compl[ée]mentaire)\s*(?:n[°o]\s*)?[:\-]\s*(\d[A-Z0-9]{5,14})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# --- Batch A-2 (rectificatif PII FORT 2026-06-17) ---------------------------
|
||||
# X-L1 — ADELI (identifiant professionnel de santé, PII nominative). Valeur
|
||||
# commençant par un chiffre (format dept+catégorie+séquence) → anti-FP sur un
|
||||
# simple mot après « ADELI ».
|
||||
RE_ADELI = re.compile(
|
||||
r"\b(?:n[°o]?\s*|num[ée]ro\s+)?ADELI\s*[:\-]?\s*(\d[A-Za-z0-9]{5,8})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# #9 — FAX : numéro de fax label-ancré → placeholder [FAX] (appliqué AVANT
|
||||
# RE_TEL pour ne pas le masquer en [TEL]). Anti-FP : sans label fax, rien.
|
||||
RE_FAX = re.compile(
|
||||
r"\b(?:fax|t[ée]l[ée]copie(?:ur)?)\s*[:\-]?\s*"
|
||||
r"((?:\+33\s?(?:\(0\))?\s?|0)\d(?:[\s.\-]?\d){8})",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_NIR_NO_KEY_LABEL = (
|
||||
r"(?:NIR|N°\s*SS|n°\s*s[ée]cu(?:rit[ée])?|"
|
||||
r"s[ée]curit[ée]\s+sociale|vitale|matricule)"
|
||||
)
|
||||
# #11 — NIR 13 chiffres SANS clé, STRICTEMENT après un label NIR/SS/Vitale/
|
||||
# matricule. Anti-FP fort : 13 chiffres nus (sans label) ne matchent jamais.
|
||||
RE_NIR_NO_KEY = re.compile(
|
||||
r"\b" + _NIR_NO_KEY_LABEL + r"\s*[:\-]?\s*"
|
||||
r"(\d(?:[\s.\-]?\d){12})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# X-L3 — RIB français + BIC/SWIFT → placeholder [IBAN] (même famille bancaire),
|
||||
# label-ancrés (anti-FP sur acronymes type « BNPAFRPP » sans label).
|
||||
RE_RIB = re.compile(
|
||||
r"\b(?:RIB|relev[ée]\s+d['’]identit[ée]\s+bancaire)\s*[:\-]?\s*"
|
||||
r"(\d{5}\s*\d{5}\s*\d{11}\s*\d{2})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
RE_BIC = re.compile(
|
||||
r"\b(?:BIC|SWIFT)\s*[:\-]?\s*([A-Z]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?)\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Variantes de label "Nom" qui ne correspondent pas à RE_EXTRACT_NOM_NAISSANCE
|
||||
# (Nom de jeune fille, Nom de famille, Nom marital, Nom d'usage, Nom marié).
|
||||
@@ -703,8 +750,10 @@ _refresh_medical_stopwords_pattern()
|
||||
_PERSON_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑa-zéèàùâêîôûäëïöüçñ\-\']+"
|
||||
RE_PERSON_CONTEXT = re.compile(
|
||||
r"(?:(?:\bDr\.?|\bDR\.?|\bDocteur|\bPr\.?|\bProfesseur|\bMme|\bMME|\bMadame|\bM\.|\bMr\.?|\bMonsieur"
|
||||
r"|\bMlle\.?|\bMLLE|\bMademoiselle"
|
||||
r"|\bNom[ \t]*:[ \t]*"
|
||||
r"|\bRédigé[ \t]+par|\bValidé[ \t]+par|\bSigné[ \t]+par|\bSaisi[ \t]+par|\bRéalisé[ \t]+par"
|
||||
r"|\bFait[ \t]+par[ \t]*:?"
|
||||
r")[ \t]+)"
|
||||
rf"({_PERSON_TOKEN}(?:[ \t]+{_PERSON_TOKEN}){{0,2}})" # Max 3 mots, pas de newline
|
||||
)
|
||||
@@ -884,12 +933,30 @@ RE_LIEU_DIT_SEUL = re.compile(
|
||||
)
|
||||
|
||||
# --- Nouvelles regex : dates, adresses, âges, dossiers ---
|
||||
_MOIS_FR = r"(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)"
|
||||
_MOIS_FR = (
|
||||
r"(?:janvier|janv\.?|février|févr\.?|fév\.?|mars|avril|avr\.?|mai|juin"
|
||||
r"|juillet|juil\.?|août|aout|septembre|sept\.?|sep\.?|octobre|oct\.?"
|
||||
r"|novembre|nov\.?|décembre|déc\.?)"
|
||||
)
|
||||
# Labels « date de naissance » (audit PII FORT 2026-06-17, D-L2) :
|
||||
# - « Né/Née/Né(e)/Nées le », « (date (de) )naissance », « DDN », « DN ».
|
||||
# Le masquage DDN n'a lieu que si une DATE suit (cf. RE_DATE_NAISSANCE) → un
|
||||
# label seul (« lieu de naissance : Paris ») ne déclenche pas de masque DDN.
|
||||
# X-L5 : « le/la/en » rendu optionnel après Né/Née/Né(e) → couvre « Né(e) : 19/09/1972 »
|
||||
# et « Née la 19/09/1972 ». Le mot doit être un « né » à frontière de mot (pas
|
||||
# « réalisée », « signé », « René »…) → pas de faux positif sur les dates cliniques.
|
||||
_RE_DATE_NAISSANCE_LABEL = r"(?:\bn[ée]+(?:\(?e?\)?)?s?\s*(?:le|la|en)?|(?:date\s+(?:de\s+)?)?naissance|\bDDN\b|\bDN\b)"
|
||||
RE_DATE_NAISSANCE = re.compile(
|
||||
r"(?:\bn[ée]+(?:\(?e?\)?)?\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*"
|
||||
_RE_DATE_NAISSANCE_LABEL + r"\s*[:\-]?\s*"
|
||||
r"(\d{1,2}[\s/.\-]\d{1,2}[\s/.\-]\d{2,4}|\d{1,2}\s+" + _MOIS_FR + r"\s+\d{4})",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# X-L5 : « Né en 1972 » (année seule de naissance). Strictement « né(e) en YYYY »
|
||||
# → anti-FP sur « vu en 2020 », « opéré en 2019 » (pas de « né » à la frontière).
|
||||
RE_DATE_NAISSANCE_ANNEE = re.compile(
|
||||
r"\bn[ée]+(?:\(?e?\)?)?s?\s+en\s+(\d{4})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
RE_DATE = re.compile(
|
||||
r"\b(\d{1,2})\s*[/.\-]\s*(\d{1,2})\s*[/.\-]\s*(\d{4})\b"
|
||||
r"|"
|
||||
@@ -908,14 +975,21 @@ RE_DATE = re.compile(
|
||||
_RE_VOIE_TYPE = (
|
||||
r"(?:rue|avenue|av\.?|boulevard|bd\.?|place|chemin|all[ée]e|impasse|route|cours"
|
||||
r"|passage|square|r[ée]sidence|lotissement|lot\.?|cit[ée]|hameau|quartier|voie"
|
||||
r"|parvis|esplanade|promenade|côte)"
|
||||
r"|parvis|esplanade|promenade|côte"
|
||||
# audit PII FORT 2026-06-17 (A-L2) : types de voie supplémentaires
|
||||
r"|villa|faubourg|escalier|sentier|rond[\s-]?point|traverse|carrefour|mont[ée]e)"
|
||||
)
|
||||
_RE_VOIE_TOKEN = (
|
||||
r"(?:[A-Za-zÀ-ÿ]\.|[A-Za-zÀ-ÿ0-9'’]+(?:-[A-Za-zÀ-ÿ0-9'’]+)*)"
|
||||
)
|
||||
_RE_NUMERO_VOIE = (
|
||||
r"\d{1,4}[ \t]*,?[ \t]*(?:bis|ter)?[ \t]*,?[ \t]*"
|
||||
r"(?:(?:[-–—/]|à|au|a)[ \t]*"
|
||||
r"\d{1,4}[ \t]*,?[ \t]*(?:bis|ter)?[ \t]*,?[ \t]*)?"
|
||||
)
|
||||
RE_ADRESSE = re.compile(
|
||||
r"\b\d{1,4}[ \t]*,?[ \t]*(?:bis|ter)?[ \t]*,?[ \t]*"
|
||||
+ _RE_VOIE_TYPE +
|
||||
r"\b" + _RE_NUMERO_VOIE +
|
||||
_RE_VOIE_TYPE +
|
||||
r"[ \t]+" + _RE_VOIE_TOKEN + r"(?:[ \t]+" + _RE_VOIE_TOKEN + r"){0,9}",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
@@ -1474,7 +1548,7 @@ def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[st
|
||||
return line
|
||||
|
||||
|
||||
RE_BARE_9DIGITS = re.compile(r"\b(\d{9})\b")
|
||||
RE_BARE_9DIGITS = re.compile(r"\b(\d{9}|2[AB]\d{7})\b", re.IGNORECASE)
|
||||
|
||||
def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
|
||||
m = RE_FINESS.search(line)
|
||||
@@ -1482,10 +1556,10 @@ def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[s
|
||||
val = m.group(1); audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"]))
|
||||
return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line)
|
||||
|
||||
# Détection FINESS par gazetteer : nombre 9 chiffres qui matche un vrai numéro FINESS
|
||||
# Détection FINESS par gazetteer : identifiant FINESS nu connu (9 chiffres ou Corse 2A/2B).
|
||||
if _FINESS_NUMBERS:
|
||||
for m9 in RE_BARE_9DIGITS.finditer(line):
|
||||
if m9.group(1) in _FINESS_NUMBERS:
|
||||
if m9.group(1).upper() in _FINESS_NUMBERS:
|
||||
val = m9.group(1)
|
||||
audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"]))
|
||||
line = line.replace(val, PLACEHOLDERS["FINESS"], 1)
|
||||
@@ -1540,6 +1614,20 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
|
||||
audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"]))
|
||||
return PLACEHOLDERS["NIR"]
|
||||
line = RE_NIR.sub(_repl_nir, line)
|
||||
# NIR 13 chiffres sans clé, STRICTEMENT après label (pas de validation modulo
|
||||
# possible sans la clé ; l'ancre label suffit à éviter les faux positifs).
|
||||
def _repl_nir_no_key(m: re.Match) -> str:
|
||||
val = m.group(1)
|
||||
audit.append(PiiHit(page_idx, "NIR", val, PLACEHOLDERS["NIR"]))
|
||||
return m.group(0).replace(val, PLACEHOLDERS["NIR"])
|
||||
line = RE_NIR_NO_KEY.sub(_repl_nir_no_key, line)
|
||||
|
||||
# FAX (label-ancré) AVANT TEL : un numéro de fax doit devenir [FAX], pas [TEL].
|
||||
def _repl_fax(m: re.Match) -> str:
|
||||
num = m.group(1)
|
||||
audit.append(PiiHit(page_idx, "FAX", num, PLACEHOLDERS["FAX"]))
|
||||
return m.group(0).replace(num, PLACEHOLDERS["FAX"])
|
||||
line = RE_FAX.sub(_repl_fax, line)
|
||||
|
||||
# TEL
|
||||
def _repl_tel(m: re.Match) -> str:
|
||||
@@ -1554,12 +1642,32 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
|
||||
audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"]))
|
||||
return PLACEHOLDERS["IBAN"]
|
||||
line = RE_IBAN.sub(_repl_iban, line)
|
||||
# RIB français + BIC/SWIFT (label-ancrés) → [IBAN] (même famille bancaire).
|
||||
def _repl_iban_value(m: re.Match) -> str:
|
||||
val = m.group(1)
|
||||
audit.append(PiiHit(page_idx, "IBAN", val, PLACEHOLDERS["IBAN"]))
|
||||
return m.group(0).replace(val, PLACEHOLDERS["IBAN"])
|
||||
line = RE_RIB.sub(_repl_iban_value, line)
|
||||
line = RE_BIC.sub(_repl_iban_value, line)
|
||||
|
||||
# ADELI (identifiant professionnel de santé) label-ancré → [ADELI].
|
||||
def _repl_adeli(m: re.Match) -> str:
|
||||
val = m.group(1)
|
||||
audit.append(PiiHit(page_idx, "ADELI", val, PLACEHOLDERS["ADELI"]))
|
||||
return m.group(0).replace(val, PLACEHOLDERS["ADELI"])
|
||||
line = RE_ADELI.sub(_repl_adeli, line)
|
||||
|
||||
# DATE_NAISSANCE (plus spécifique, avant DATE générique)
|
||||
def _repl_date_naissance(m: re.Match) -> str:
|
||||
audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"]))
|
||||
return PLACEHOLDERS["DATE_NAISSANCE"]
|
||||
line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line)
|
||||
# « Né en 1972 » (année seule de naissance) → [DATE_NAISSANCE]
|
||||
def _repl_date_naissance_annee(m: re.Match) -> str:
|
||||
val = m.group(1)
|
||||
audit.append(PiiHit(page_idx, "DATE_NAISSANCE", val, PLACEHOLDERS["DATE_NAISSANCE"]))
|
||||
return m.group(0).replace(val, PLACEHOLDERS["DATE_NAISSANCE"])
|
||||
line = RE_DATE_NAISSANCE_ANNEE.sub(_repl_date_naissance_annee, line)
|
||||
|
||||
# DATE générique — désactivé : seules les dates de naissance sont masquées
|
||||
# def _repl_date(m: re.Match) -> str:
|
||||
@@ -1639,6 +1747,7 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
|
||||
full = m.group(0)
|
||||
return full[:full.find(val)] + PLACEHOLDERS["ADHERENT"]
|
||||
line = RE_NUM_ADHERENT.sub(_repl_adherent, line)
|
||||
line = RE_NUM_MUTUELLE.sub(_repl_adherent, line)
|
||||
|
||||
# Établissements de santé (EHPAD Chicago, SSR Anonyme, Hôpital de Chicago, etc.)
|
||||
def _repl_etab(m: re.Match) -> str:
|
||||
@@ -1902,6 +2011,7 @@ def _mask_structured_line(line: str, audit: List[PiiHit], page_idx: int) -> str:
|
||||
masked = RE_NUMERO_DOSSIER.sub(_repl_dossier, masked)
|
||||
masked = RE_VENUE_SEJOUR.sub(_repl_venue, masked)
|
||||
masked = RE_NUM_ADHERENT.sub(_repl_adherent, masked)
|
||||
masked = RE_NUM_MUTUELLE.sub(_repl_adherent, masked)
|
||||
masked = RE_LABEL_NOM_VARIANTES.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
|
||||
masked = RE_LABEL_PRENOM.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
|
||||
masked = RE_LABEL_NOM_PROFESSIONNEL.sub(_repl_label_with_placeholder("NOM_FORCE", "NOM"), masked)
|
||||
@@ -2700,10 +2810,10 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_nam
|
||||
|
||||
|
||||
def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit], cfg: Dict[str, Any] | None = None) -> str:
|
||||
"""Applique les PiiHit non-NOM dans le texte (NDA, DOSSIER, EPISODE, RPPS, FINESS, etc.).
|
||||
"""Applique les PiiHit non-NOM dans le texte (NDA, DOSSIER, EPISODE, RPPS, FINESS, VILLE, etc.).
|
||||
Ces hits sont détectés par _extract_trackare_identity ou la phase 0c
|
||||
mais n'étaient appliqués qu'au PDF raster, pas au fichier .pseudonymise.txt."""
|
||||
_APPLY_KINDS = {"DOSSIER", "EPISODE", "FINESS", "NDA", "RPPS"}
|
||||
_APPLY_KINDS = {"DOSSIER", "EPISODE", "FINESS", "NDA", "NIR", "RPPS", "VILLE"}
|
||||
admin_rules = (cfg or {}).get("admin_rules_compiled") or {}
|
||||
for rule in admin_rules.get("detection_rules", []) or []:
|
||||
kind = rule.get("kind")
|
||||
@@ -2819,7 +2929,7 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
|
||||
# Ex: "Né(e) le :\n07/04/1943" ou "Date de naissance\n01/02/1950"
|
||||
# Variante large : tolère 0-3 lignes intermédiaires (tableaux BACTERIO)
|
||||
_RE_DATE_NAISSANCE_MULTILINE = re.compile(
|
||||
r"(?:\bn[ée]+(?:\(?e?\)?)?\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*\n"
|
||||
_RE_DATE_NAISSANCE_LABEL + r"\s*[:\-]?\s*\n"
|
||||
r"(?:[^\n]*\n){0,3}\s*"
|
||||
r"(\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4})",
|
||||
re.IGNORECASE,
|
||||
@@ -2835,6 +2945,17 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
|
||||
for m in _RE_IPP_MULTILINE.finditer(full_raw):
|
||||
audit.append(PiiHit(-1, "IPP", m.group(1), PLACEHOLDERS["IPP"]))
|
||||
|
||||
# Phase 0e-bis : NIR 13 chiffres sans clé sur la ligne suivant le label.
|
||||
# Le passage ligne par ligne ne peut pas le voir ; on capture uniquement la
|
||||
# valeur après un label fort pour éviter de masquer des références nues.
|
||||
_RE_NIR_NO_KEY_MULTILINE = re.compile(
|
||||
r"\b" + _NIR_NO_KEY_LABEL + r"\s*[:\-]?\s*\n\s*"
|
||||
r"(\d(?:[\s.\-]?\d){12})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
for m in _RE_NIR_NO_KEY_MULTILINE.finditer(full_raw):
|
||||
audit.append(PiiHit(-1, "NIR", m.group(1), PLACEHOLDERS["NIR"]))
|
||||
|
||||
# Phase 0f : numéro d'accession / d'examen en en-tête de labo ou imagerie
|
||||
# Ex:
|
||||
# N° 23L35781
|
||||
@@ -3532,21 +3653,38 @@ def _build_finess_addr_ac():
|
||||
log.warning(f"Erreur construction FINESS adresses Aho-Corasick: {e}")
|
||||
|
||||
|
||||
def _mask_finess_addresses(text: str, return_matched_names: bool = False):
|
||||
"""Masque les adresses FINESS détectées par Aho-Corasick.
|
||||
def _extend_finess_address_span(text: str, start: int, end: int) -> Tuple[int, int]:
|
||||
"""Étend un match FINESS adresse au numéro de voie et aux compléments BP/CS."""
|
||||
ext_start = start
|
||||
prefix = text[max(0, start - 15):start]
|
||||
num_match = re.search(
|
||||
r'(\d{1,4}\s*,?\s*(?:bis|ter)?\s*,?\s*'
|
||||
r'(?:(?:[-–—/]|à|au|a)\s*\d{1,4}\s*,?\s*(?:bis|ter)?\s*,?\s*)?)$',
|
||||
prefix,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if num_match:
|
||||
ext_start = start - (len(prefix) - num_match.start())
|
||||
|
||||
Utilise une normalisation avec position-map pour gérer apostrophes, points,
|
||||
et autres caractères non-alphanumériques courants dans les adresses.
|
||||
"""
|
||||
ext_end = end
|
||||
suffix = text[end:min(len(text), end + 60)]
|
||||
bp_match = re.match(
|
||||
r'(\s*(?:BP|CS)\s*\d+\s*[,.]?\s*(?:\d{5}\s*)?(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑa-zéèàùâêîôûäëïöüçñ\s\-]+(?:CEDEX)?)?)',
|
||||
suffix, re.IGNORECASE)
|
||||
if bp_match:
|
||||
ext_end = end + len(bp_match.group(1).rstrip())
|
||||
return ext_start, ext_end
|
||||
|
||||
|
||||
def _find_finess_address_spans(text: str) -> List[Tuple[int, int, str]]:
|
||||
"""Retourne les spans texte des adresses FINESS, avec extension de voie."""
|
||||
global _FINESS_ADDR_AC
|
||||
if _FINESS_ADDR_AC is None:
|
||||
_build_finess_addr_ac()
|
||||
if _FINESS_ADDR_AC is None:
|
||||
return (text, []) if return_matched_names else text
|
||||
return []
|
||||
|
||||
normalized, posmap = _normalize_addr_with_posmap(text)
|
||||
placeholder = PLACEHOLDERS.get("ADRESSE", "[ADRESSE]")
|
||||
|
||||
matches = []
|
||||
for end_idx, name in _FINESS_ADDR_AC.iter(normalized):
|
||||
start_idx = end_idx - len(name) + 1
|
||||
@@ -3568,7 +3706,7 @@ def _mask_finess_addresses(text: str, return_matched_names: bool = False):
|
||||
matches.append((orig_start, orig_end, name))
|
||||
|
||||
if not matches:
|
||||
return (text, []) if return_matched_names else text
|
||||
return []
|
||||
|
||||
# Garder les plus longs en cas de chevauchement
|
||||
matches.sort(key=lambda x: (x[0], -(x[1] - x[0])))
|
||||
@@ -3579,32 +3717,43 @@ def _mask_finess_addresses(text: str, return_matched_names: bool = False):
|
||||
deduped.append((start, end, name))
|
||||
last_end = end
|
||||
|
||||
spans = []
|
||||
for start, end, name in deduped:
|
||||
ext_start, ext_end = _extend_finess_address_span(text, start, end)
|
||||
spans.append((ext_start, ext_end, text[start:end]))
|
||||
|
||||
# Re-dédupliquer après extension.
|
||||
spans.sort(key=lambda x: (x[0], -(x[1] - x[0])))
|
||||
merged = []
|
||||
last_end = 0
|
||||
for start, end, original in spans:
|
||||
if start >= last_end:
|
||||
merged.append((start, end, original))
|
||||
last_end = end
|
||||
return merged
|
||||
|
||||
|
||||
def _mask_finess_addresses(text: str, return_matched_names: bool = False):
|
||||
"""Masque les adresses FINESS détectées par Aho-Corasick.
|
||||
|
||||
Utilise une normalisation avec position-map pour gérer apostrophes, points,
|
||||
et autres caractères non-alphanumériques courants dans les adresses.
|
||||
"""
|
||||
spans = _find_finess_address_spans(text)
|
||||
if not spans:
|
||||
return (text, []) if return_matched_names else text
|
||||
|
||||
placeholder = PLACEHOLDERS.get("ADRESSE", "[ADRESSE]")
|
||||
result = []
|
||||
matched_names = []
|
||||
last_pos = 0
|
||||
for start, end, name in deduped:
|
||||
for start, end, original_text in spans:
|
||||
if start > len(text) or end > len(text):
|
||||
continue
|
||||
original_text = text[start:end]
|
||||
matched_names.append(original_text)
|
||||
# Étendre vers la gauche pour capturer le numéro de voie (ex: "13, ")
|
||||
ext_start = start
|
||||
prefix = text[max(0, start - 15):start]
|
||||
num_match = re.search(r'(\d+\s*[,.]?\s*)$', prefix)
|
||||
if num_match:
|
||||
ext_start = start - (len(prefix) - num_match.start())
|
||||
# Étendre vers la droite pour capturer BP/CS + code postal + ville
|
||||
ext_end = end
|
||||
suffix = text[end:min(len(text), end + 60)]
|
||||
# BP/CS + numéro + éventuel code postal + ville
|
||||
bp_match = re.match(
|
||||
r'(\s*(?:BP|CS)\s*\d+\s*[,.]?\s*(?:\d{5}\s*)?(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇÑa-zéèàùâêîôûäëïöüçñ\s\-]+(?:CEDEX)?)?)',
|
||||
suffix, re.IGNORECASE)
|
||||
if bp_match:
|
||||
ext_end = end + len(bp_match.group(1).rstrip())
|
||||
result.append(text[last_pos:ext_start])
|
||||
result.append(text[last_pos:start])
|
||||
result.append(placeholder)
|
||||
last_pos = ext_end
|
||||
last_pos = end
|
||||
result.append(text[last_pos:])
|
||||
|
||||
masked = "".join(result)
|
||||
@@ -3953,10 +4102,20 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
|
||||
def _rescan_nir(m: re.Match) -> str:
|
||||
return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0)
|
||||
protected = RE_NIR.sub(_rescan_nir, protected)
|
||||
protected = RE_NIR_NO_KEY.sub(PLACEHOLDERS["NIR"], protected) # 13 chiffres label-ancré
|
||||
# FAX avant TEL pour que le numéro de fax devienne [FAX] et non [TEL].
|
||||
protected = RE_FAX.sub(PLACEHOLDERS["FAX"], protected)
|
||||
protected = RE_TEL_SLASH.sub(PLACEHOLDERS["TEL"], protected)
|
||||
protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected)
|
||||
protected = RE_TEL_COMPACT.sub(PLACEHOLDERS["TEL"], protected)
|
||||
protected = RE_IBAN.sub(PLACEHOLDERS["IBAN"], protected)
|
||||
# X-L2 — identifiants jusque-là non rescannés (fuite si vus 1 fois puis répétés) :
|
||||
protected = RE_RIB.sub(PLACEHOLDERS["IBAN"], protected)
|
||||
protected = RE_BIC.sub(PLACEHOLDERS["IBAN"], protected)
|
||||
protected = RE_ADELI.sub(PLACEHOLDERS["ADELI"], protected)
|
||||
protected = RE_OGC.sub(PLACEHOLDERS["OGC"], protected)
|
||||
protected = RE_NUM_ADHERENT.sub(PLACEHOLDERS["ADHERENT"], protected)
|
||||
protected = RE_NUM_MUTUELLE.sub(PLACEHOLDERS["ADHERENT"], protected)
|
||||
# Nouvelles regex : dates de naissance, dates, adresses, codes postaux
|
||||
protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected)
|
||||
# protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) # désactivé
|
||||
@@ -3978,10 +4137,10 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
|
||||
)
|
||||
# N° RPPS
|
||||
protected = RE_RPPS.sub(PLACEHOLDERS["RPPS"], protected)
|
||||
# FINESS par gazetteer (nombres 9 chiffres matchant un vrai numéro FINESS)
|
||||
# FINESS par gazetteer (identifiants nus connus, dont Corse 2A/2B).
|
||||
if _FINESS_NUMBERS:
|
||||
def _rescan_finess(m: re.Match) -> str:
|
||||
return PLACEHOLDERS["FINESS"] if m.group(1) in _FINESS_NUMBERS else m.group(0)
|
||||
return PLACEHOLDERS["FINESS"] if m.group(1).upper() in _FINESS_NUMBERS else m.group(0)
|
||||
protected = RE_BARE_9DIGITS.sub(_rescan_finess, protected)
|
||||
# Établissements (regex)
|
||||
protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected)
|
||||
@@ -4164,6 +4323,73 @@ def _search_whole_word(page, token: str) -> list:
|
||||
return rects
|
||||
|
||||
|
||||
def _merge_text_spans(spans: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
|
||||
if not spans:
|
||||
return []
|
||||
spans = sorted(spans)
|
||||
merged = [spans[0]]
|
||||
for start, end in spans[1:]:
|
||||
prev_start, prev_end = merged[-1]
|
||||
if start <= prev_end:
|
||||
merged[-1] = (prev_start, max(prev_end, end))
|
||||
else:
|
||||
merged.append((start, end))
|
||||
return merged
|
||||
|
||||
|
||||
def _address_spans_in_text(text: str) -> List[Tuple[int, int]]:
|
||||
"""Spans d'adresses sûres dans une ligne texte.
|
||||
|
||||
Utilisé en défense supplémentaire par le caviardage PDF : si l'audit ne
|
||||
retrouve pas la chaîne exacte dans le PDF, on masque tout de même les mots
|
||||
dont la ligne porte une adresse structurée ou une adresse FINESS.
|
||||
"""
|
||||
spans = [(m.start(), m.end()) for m in RE_ADRESSE.finditer(text)]
|
||||
spans.extend((start, end) for start, end, _ in _find_finess_address_spans(text))
|
||||
return _merge_text_spans(spans)
|
||||
|
||||
|
||||
def _page_word_lines(page) -> List[Tuple[str, List[Tuple[int, int, "fitz.Rect"]]]]:
|
||||
"""Reconstruit les lignes PDF en texte + spans de mots vers rectangles."""
|
||||
if fitz is None:
|
||||
return []
|
||||
words = page.get_text("words") or []
|
||||
grouped: Dict[Tuple[int, int], list] = {}
|
||||
for w in words:
|
||||
grouped.setdefault((w[5], w[6]), []).append(w)
|
||||
|
||||
lines = []
|
||||
ordered_groups = sorted(grouped.values(), key=lambda ws: (min(w[1] for w in ws), min(w[0] for w in ws)))
|
||||
for line_words in ordered_groups:
|
||||
ordered = sorted(line_words, key=lambda w: (w[7], w[0]))
|
||||
parts = []
|
||||
spans = []
|
||||
pos = 0
|
||||
for w in ordered:
|
||||
if parts:
|
||||
parts.append(" ")
|
||||
pos += 1
|
||||
token = str(w[4])
|
||||
start = pos
|
||||
parts.append(token)
|
||||
pos += len(token)
|
||||
spans.append((start, pos, fitz.Rect(w[0], w[1], w[2], w[3])))
|
||||
lines.append(("".join(parts), spans))
|
||||
return lines
|
||||
|
||||
|
||||
def _search_pdf_address_lines(page) -> list:
|
||||
"""Défense PDF directe pour les adresses structurées visibles sur la page."""
|
||||
rects = []
|
||||
for line_text, word_spans in _page_word_lines(page):
|
||||
for start, end in _address_spans_in_text(line_text):
|
||||
for word_start, word_end, rect in word_spans:
|
||||
if word_end <= start or word_start >= end:
|
||||
continue
|
||||
rects.append(fitz.Rect(rect.x0 - 1, rect.y0 - 1, rect.x1 + 1, rect.y1 + 1))
|
||||
return rects
|
||||
|
||||
|
||||
def _search_labeled_identifier_value(page, label: str, token: str) -> list:
|
||||
"""Cherche une valeur courte uniquement sur une ligne portant son label.
|
||||
|
||||
@@ -4260,11 +4486,11 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, oc
|
||||
for pno in range(len(doc)):
|
||||
page = doc[pno]
|
||||
hits = by_page.get(pno, []) + by_page.get(-1, [])
|
||||
if not hits:
|
||||
all_rects = _search_pdf_address_lines(page)
|
||||
if not hits and not all_rects:
|
||||
continue
|
||||
# Dédupliquer les tokens : (token, kind) → rechercher une seule fois par page
|
||||
seen_tokens: set = set()
|
||||
all_rects = []
|
||||
for h in hits:
|
||||
token = h.original.strip()
|
||||
if not token:
|
||||
@@ -4432,6 +4658,7 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
|
||||
rects.append(fitz.Rect(margin, margin, page.rect.width - margin, page.rect.height - margin))
|
||||
all_rects[pno] = rects
|
||||
continue
|
||||
rects = _search_pdf_address_lines(page)
|
||||
for h in hits:
|
||||
token = h.original.strip()
|
||||
if not token or h.kind in _RASTER_SKIP_KINDS:
|
||||
@@ -4923,13 +5150,15 @@ def process_pdf(
|
||||
# anon.audit.append(PiiHit(page=-1, kind="NOM_GLOBAL", original=token, placeholder=PLACEHOLDERS["NOM"]))
|
||||
|
||||
# 4b) Propagation globale SÉLECTIVE : uniquement pour les PII critiques
|
||||
# Les PII critiques (DATE_NAISSANCE, NIR, IPP, EMAIL) sont propagés sur toutes les pages
|
||||
# pour éviter les fuites sur les documents multi-pages (ex: CRO)
|
||||
# Les PII critiques (NIR, IPP, EMAIL, etc.) sont propagés sur toutes les pages
|
||||
# pour éviter les fuites sur les documents multi-pages (ex: CRO). Les villes
|
||||
# sont propagées uniquement après détection confirmée (label/contexte), sans
|
||||
# réactiver un masquage global de toutes les communes du texte.
|
||||
# (v11.5 P0) DATE_NAISSANCE retiré de la propagation globale : on ne masque
|
||||
# plus une date nue sur tout le document (ni texte, ni audit, ni PDF/raster).
|
||||
# La DDN reste masquée en contexte fort, page par page (RE_DATE_NAISSANCE +
|
||||
# multiligne). Cela évite de masquer une date clinique égale à la DDN.
|
||||
_CRITICAL_PII_TYPES = {"NIR", "IPP", "EMAIL", "force_term", "force_regex", "FINESS", "DOSSIER", "NDA", "EPISODE"}
|
||||
_CRITICAL_PII_TYPES = {"NIR", "IPP", "EMAIL", "force_term", "force_regex", "FINESS", "DOSSIER", "NDA", "EPISODE", "VILLE", "ADHERENT", "OGC", "ADELI", "FAX"}
|
||||
|
||||
_global_pii: Dict[str, set] = {}
|
||||
for h in anon.audit:
|
||||
|
||||
Reference in New Issue
Block a user