From 23e19e17e442b9dabf1ee150a568606e30cf9fe2 Mon Sep 17 00:00:00 2001 From: Domi31tls Date: Tue, 31 Mar 2026 08:31:44 +0200 Subject: [PATCH] feat(ner-first): add NER-first architecture scaffolding (steps 1-4) Add infrastructure for NER-first name validation without changing existing behavior. New code only, quality score remains 100/100. Step 1: Load INSEE family names (219K) and prenoms (33K) as module-level gazetteers (_INSEE_NOMS_FAMILLE, _INSEE_PRENOMS_SET) normalized uppercase without accents. Step 2: Add _run_ner_on_original_text() that runs all available NER models (EDS-Pseudo, GLiNER, CamemBERT-bio) on unmasked text and returns deduplicated NerDetection list. Step 3: Add NerDetection and NameCandidate dataclasses. Modify _extract_document_names and _extract_trackare_identity to also return NameCandidate lists with context_strength (high/medium/low) metadata. Callers updated for new return values. Step 4: Add _cross_validate_name_candidates() implementing decision matrix: high context always accepted, medium/low validated against NER confirmations, INSEE membership, and stopword filtering. Co-Authored-By: Claude Opus 4.6 (1M context) --- anonymizer_core_refactored_onnx.py | 474 ++++++++++++++++++++++++----- 1 file changed, 396 insertions(+), 78 deletions(-) diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index dbd8210..240a2e1 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -151,6 +151,50 @@ def _load_insee_gazetteers(): _load_insee_gazetteers() +# ----------------- Gazetteers INSEE noms de famille + prénoms (uppercase, sans accents) --------- +_INSEE_NOMS_FAMILLE: set = set() +_INSEE_PRENOMS_SET: set = set() + +def _normalize_nfkd_upper(s: str) -> str: + """Supprime les accents et met en majuscules (pour matching INSEE).""" + import unicodedata + return "".join( + c for c in unicodedata.normalize("NFD", s) + if unicodedata.category(c) != "Mn" + ).upper() + +def _load_insee_noms_prenoms(): + """Charge noms de famille et prénoms INSEE, normalisés uppercase sans accents.""" + global _INSEE_NOMS_FAMILLE, _INSEE_PRENOMS_SET + data_dir = Path(__file__).parent / "data" / "insee" + + noms_path = data_dir / "noms_famille_france.txt" + if noms_path.exists(): + try: + _INSEE_NOMS_FAMILLE = { + _normalize_nfkd_upper(line.strip()) + for line in noms_path.read_text(encoding="utf-8").splitlines() + if line.strip() and len(line.strip()) >= 3 + } + log.info(f"Gazetteers INSEE noms de famille: {len(_INSEE_NOMS_FAMILLE)} entrées") + except Exception as e: + log.warning(f"Erreur chargement noms de famille INSEE: {e}") + + prenoms_path = data_dir / "prenoms_france.txt" + if prenoms_path.exists(): + try: + _INSEE_PRENOMS_SET = { + _normalize_nfkd_upper(line.strip()) + for line in prenoms_path.read_text(encoding="utf-8").splitlines() + if line.strip() and len(line.strip()) >= 3 + } + log.info(f"Gazetteers INSEE prénoms (set): {len(_INSEE_PRENOMS_SET)} entrées") + except Exception as e: + log.warning(f"Erreur chargement prénoms INSEE (set): {e}") + +_load_insee_noms_prenoms() + + # ----------------- Gazetteer FINESS (établissements de santé) ----------------- _FINESS_NUMBERS: set = set() # numéros FINESS 9 chiffres _FINESS_ETAB_NAMES: set = set() # noms d'établissements (lowercase) @@ -1061,6 +1105,25 @@ class AnonResult: audit: List[PiiHit] = field(default_factory=list) is_trackare: bool = False +@dataclass +class NerDetection: + """Détection NER sur le texte original (non masqué). + Utilisé par l'architecture NER-first pour la validation croisée des noms.""" + token: str + label: str # NOM, PRENOM, HOPITAL, VILLE, LOC, ORG + score: float + page_idx: int + source: str # "eds_pseudo", "gliner", "camembert_bio" + +@dataclass +class NameCandidate: + """Candidat de nom extrait par regex avec métadonnées de confiance. + Utilisé pour la validation croisée NER-first.""" + token: str + source: str # regex ou champ qui a capturé ce nom + context_strength: str # "high", "medium", "low" + bypass_stopwords: bool # ce que force_names signifie actuellement + # ----------------- Config loader ----------------- def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]: @@ -1708,21 +1771,36 @@ def _is_trackare_document(text: str) -> bool: return sum(1 for m in markers if m.lower() in t) >= 2 -def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: +def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit], set, List[NameCandidate]]: """Parse les champs structurés d'un document Trackare pour extraire les PII. - Retourne (name_tokens, pii_hits) avec les noms à masquer et les hits additionnels.""" + Retourne (name_tokens, pii_hits, force_names, candidates) avec les noms à masquer, + les hits additionnels, le sous-ensemble force_names, et les NameCandidate + pour la validation croisée NER-first.""" names: set = set() hits: List[PiiHit] = [] + candidates: List[NameCandidate] = [] force_names: set = set() # noms issus de contextes structurés (DR., Signé, etc.) → bypass stop words - def _add_name(s: str): + def _add_candidate(token: str, source: str, strength: str, bypass: bool): + """Ajoute un NameCandidate à la liste.""" + token = token.strip(" .-'(),") + if len(token) < 4: + return + candidates.append(NameCandidate( + token=token, source=source, + context_strength=strength, bypass_stopwords=bypass, + )) + + def _add_name(s: str, _cand_source: str = "", _cand_strength: str = "medium"): s = s.strip() parts = s.split() for tok in parts: tok = tok.strip(" .-'(),") if len(tok) >= 4 and tok[0].isupper(): names.add(tok) + if _cand_source: + _add_candidate(tok, _cand_source, _cand_strength, False) # Garder aussi le nom composé complet (DI LULLO, LE MOIGNE, etc.) if len(parts) >= 2: compound = " ".join(t.strip(" .-'(),") for t in parts if len(t.strip(" .-'(),")) >= 2) @@ -1738,12 +1816,14 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: "préremplie", } - def _add_name_force(tok: str): + def _add_name_force(tok: str, _cand_source: str = "", _cand_strength: str = "medium"): """Ajoute un nom depuis un contexte structuré fiable (DR., Signé direct, Note d'évolution). Bypass les stop words généraux mais filtre médicaments et termes de soins courants.""" tok = tok.strip(" .-'(),") if len(tok) < 4 or not tok[0].isupper(): return + if _cand_source: + _add_candidate(tok, _cand_source, _cand_strength, True) if tok.lower() in _FORCE_EXCLUDE: return # Filtre supplémentaire : ne pas force-add les mots médicaux connus @@ -1752,18 +1832,18 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: names.add(tok) force_names.add(tok) - # --- Identité patient --- + # --- Identité patient (high context: DPI structured fields) --- # Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire) for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE): - _add_name(m.group(1).strip()) + _add_name(m.group(1).strip(), "trackare_nom_naissance", "high") # Nom et Prénom: DIEGO PATRICIA for m in re.finditer(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE): - _add_name(m.group(1).strip()) + _add_name(m.group(1).strip(), "trackare_nom_prenom", "high") # Prénom de naissance / Prénom utilisé : REGINA for m in re.finditer(r"Pr[ée]nom\s+(?:de\s+naissance|utilis[ée])\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): - _add_name(m.group(1).strip()) + _add_name(m.group(1).strip(), "trackare_prenom", "high") # Lieu de naissance: BAYONNE, biarritz, 64102, 99999 → masquer comme VILLE for m in re.finditer(r"Lieu\s+de\s+naissance\s*:\s*(\S[^\n]*?)(?:\s*$)", full_text, re.MULTILINE): @@ -1798,19 +1878,19 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if len(val) > 3: hits.append(PiiHit(-1, "ADRESSE", val, PLACEHOLDERS["ADRESSE"])) - # --- Pied de page : "Patient : NOM PRENOM - Date de naissance..." --- + # --- Pied de page : "Patient : NOM PRENOM - Date de naissance..." (high context) --- for m in re.finditer(r"Patient\s*:\s*(.+?)\s*-\s*Date\s+de\s+naissance", full_text): - _add_name(m.group(1).strip()) + _add_name(m.group(1).strip(), "trackare_patient_footer", "high") - # --- Médecin courant (toutes occurrences) --- + # --- Médecin courant (toutes occurrences) (medium context) --- for m in re.finditer(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE): - _add_name(m.group(1).strip()) + _add_name(m.group(1).strip(), "trackare_medecin_courant", "medium") - # --- Médecin traitant (ligne après "Nom Adresse Téléphone") --- + # --- Médecin traitant (ligne après "Nom Adresse Téléphone") (medium context) --- for m in re.finditer(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE): - _add_name(m.group(1).strip()) + _add_name(m.group(1).strip(), "trackare_medecin_traitant", "medium") - # --- Contacts structurés --- + # --- Contacts structurés (medium context) --- # Pattern: Relation NOM PRENOM [ADRESSE] [TEL] # Accepte les minuscules (Trackare écrit parfois "Conjoint vandestock michele") # Capture jusqu'à 3 tokens pour les noms composés (le moigne christophe) @@ -1825,6 +1905,7 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: contact_parts = [g.strip(" .-'(),") for g in (m.group(1), m.group(2), m.group(3)) if g] # Ajouter chaque token >= 4 chars (pas les articles courts comme "le", "di", ni acronymes 3 lettres) for tok in contact_parts: + _add_candidate(tok, "trackare_contact", "medium", False) if len(tok) >= 4 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: names.add(tok) if tok[0].islower(): @@ -1856,7 +1937,7 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if tok[0].islower(): names.add(tok.capitalize()) - # --- Prescripteurs / Exécutants (trackare) --- + # --- Prescripteurs / Exécutants (trackare) (medium context) --- for m in re.finditer( r"(?:Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" r"(?:(?:Dr|Pr)\.?\s+)?" @@ -1864,24 +1945,24 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+))?", full_text, ): - _add_name(m.group(1)) + _add_name(m.group(1), "trackare_prescripteur", "medium") if m.group(2): - _add_name(m.group(2)) + _add_name(m.group(2), "trackare_prescripteur", "medium") - # --- Médecins urgences (IAO, prise en charge, décision) --- + # --- Médecins urgences (IAO, prise en charge, décision) (medium context) --- for m in re.finditer(r"IAO\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)", full_text): - _add_name(m.group(1)) + _add_name(m.group(1), "trackare_iao", "medium") for m in re.finditer( r"Médecin\s+de\s+la\s+(?:prise\s+en\s+charge|décision)\s+médicale\s+" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+))?", full_text, ): - _add_name(m.group(1)) + _add_name(m.group(1), "trackare_medecin_urgences", "medium") if m.group(2): - _add_name(m.group(2)) + _add_name(m.group(2), "trackare_medecin_urgences", "medium") - # --- Noms soignants dans les Notes d'évolution / Notes IDE / Notes médicales --- + # --- Noms soignants dans les Notes d'évolution / Notes IDE / Notes médicales (low context) --- # Pattern: "Note IDE\nPrenom NOM" ou "Note d'évolution\nPrenom NOM" for m in re.finditer( r"Note\s+(?:IDE|AS|d'[ée]volution|m[ée]dicale|kin[ée])\s*\n\s*" @@ -1890,22 +1971,26 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: full_text ): prenom, nom = m.group(1), m.group(2) + _add_candidate(prenom, "trackare_note_ide_newline", "low", False) + _add_candidate(nom, "trackare_note_ide_newline", "low", False) if prenom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(prenom) if nom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(nom) - # --- Noms soignants multi-lignes : "Prénom\nNOM" dans les tableaux de prescriptions/soins --- + # --- Noms soignants multi-lignes : "Prénom\nNOM" dans les tableaux de prescriptions/soins (low context) --- for m in re.finditer( r'\b([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{3,})\s*\n\s*([A-ZÉÈÀÙÂÊÎÔÛ]{4,})\b', full_text ): prenom, nom = m.group(1), m.group(2) + _add_candidate(prenom, "trackare_prenom_nom_multiline", "low", False) + _add_candidate(nom, "trackare_prenom_nom_multiline", "low", False) if prenom.lower() not in _MEDICAL_STOP_WORDS_SET and nom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(prenom) _add_name(nom) - # --- Noms soignants sur la même ligne que "Note d'évolution" (ex: "Note d'évolution LACLAU-") --- + # --- Noms soignants sur la même ligne que "Note d'évolution" (ex: "Note d'évolution LACLAU-") (low context) --- for m in re.finditer( r"Note[ \t]+(?:IDE|AS|d'[ée]volution|m[ée]dicale|kin[ée])[ \t]+" r"(?:DR\.?[ \t]+)?" @@ -1917,9 +2002,9 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if g: tok = g.rstrip('-') if len(tok) >= 4: - _add_name_force(tok) + _add_name_force(tok, "trackare_note_ide_inline", "low") - # --- "Signé" suivi directement d'un nom de soignant (ex: "Signé LARRIEU-") --- + # --- "Signé" suivi directement d'un nom de soignant (ex: "Signé LARRIEU-") (low context) --- # IMPORTANT: [ \t]+ (pas \s+) pour éviter de capturer les médicaments sur la ligne suivante for m in re.finditer( r"Signé[ \t]+(?!—|par\b)([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+)" @@ -1930,9 +2015,9 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if g: tok = g.rstrip('-') if len(tok) >= 4: - _add_name_force(tok) + _add_name_force(tok, "trackare_signe_direct", "low") - # --- "Signé —" + médicament + nom soignant (ex: "Signé — PARACETAMOL BBM 1000 MG INJ NARZABAL") --- + # --- "Signé —" + médicament + nom soignant (ex: "Signé — PARACETAMOL BBM 1000 MG INJ NARZABAL") (low context) --- for m in re.finditer( r"Signé[ \t]+—[ \t]+.*(?:INJ|COMP|GEL|PDR|SOL|PERF|SUSP|CAPS|CREM|SACHET|SIROP)[ \t]+[-]?[ \t]*" r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]{3,})" @@ -1942,20 +2027,22 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: for g in (m.group(1), m.group(2)): if g: tok = g.rstrip('-') + _add_candidate(tok, "trackare_signe_med", "low", False) if len(tok) >= 4 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(tok) - # --- Noms soignants après conditionnement médicament (ex: "Flacon(s) LACROUTS") --- + # --- Noms soignants après conditionnement médicament (ex: "Flacon(s) LACROUTS") (low context) --- for m in re.finditer( r"(?:Flacon|Ampoule|Seringue|Poche|Comprim[ée]|Gélule|Sachet)(?:\(s\))?[ \t]+" r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]{3,})", full_text ): tok = m.group(1).rstrip('-') + _add_candidate(tok, "trackare_flacon", "low", False) if len(tok) >= 4 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(tok) - # --- "DR." / "DR" suivi d'un prénom seul (ex: "DR. Ute", "DR. Tam") dans les prescriptions --- + # --- "DR." / "DR" suivi d'un prénom seul (ex: "DR. Ute", "DR. Tam") dans les prescriptions (medium context) --- for m in re.finditer( r"DR\.?[ \t]+([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{3,})" r"(?:[ \t]+([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+))?", @@ -1965,9 +2052,9 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if g: tok = g.strip() if len(tok) >= 4: - _add_name_force(tok) + _add_name_force(tok, "trackare_dr", "medium") - # --- Noms soignants après timestamps dans activités de soins (ex: "07:00 ETCHEBARNE") --- + # --- Noms soignants après timestamps dans activités de soins (ex: "07:00 ETCHEBARNE") (low context) --- # Format Trackare : actions de soins suivies de "HH:MM NOM" ou "HH : MM NOM" # Pattern restrictif : nom ALL-CAPS de 4+ lettres, filtre stop words (pattern bruyant) for m in re.finditer( @@ -1979,6 +2066,7 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: for g in (m.group(1), m.group(2)): if g: tok = g.rstrip('-') + _add_candidate(tok, "trackare_timestamp", "low", False) if len(tok) >= 4 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(tok) @@ -1996,18 +2084,30 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: continue filtered.add(tok) - return filtered, hits, force_names + return filtered, hits, force_names, candidates -def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, set]: +def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, set, List[NameCandidate]]: """Pré-scan du document brut pour extraire les noms de personnes depuis les champs structurés (Patient, Rédigé par, etc.). - Retourne (names, force_names) : ensemble de tokens à masquer, - et sous-ensemble qui bypass les stop words.""" + Retourne (names, force_names, candidates) : ensemble de tokens à masquer, + sous-ensemble qui bypass les stop words, et liste de NameCandidate + avec métadonnées de confiance pour la validation croisée NER-first.""" wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) names: set = set() force_names: set = set() + candidates: List[NameCandidate] = [] + + def _add_candidate(token: str, source: str, strength: str, bypass: bool): + """Ajoute un NameCandidate à la liste (dédupliqué par token+source).""" + token = token.strip(" .-'") + if len(token) < 4: + return + candidates.append(NameCandidate( + token=token, source=source, + context_strength=strength, bypass_stopwords=bypass, + )) def _add_compound(match_str: str): """Ajoute le nom composé complet en plus des tokens individuels (DI LULLO, LE MOIGNE).""" @@ -2017,7 +2117,7 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s if len(compound) >= 5: names.add(compound) - def _add_tokens(match_str: str): + def _add_tokens(match_str: str, _cand_source: str = "", _cand_strength: str = "medium"): _add_compound(match_str) for token in match_str.split(): token = token.strip(" .-'") @@ -2025,11 +2125,13 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s continue if token.upper() in wl_sections or token in wl_phrases: continue + if _cand_source: + _add_candidate(token, _cand_source, _cand_strength, False) if token.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(token) - def _add_tokens_force_all(match_str: str): + def _add_tokens_force_all(match_str: str, _cand_source: str = "", _cand_strength: str = "high"): """Bypass stop words pour TOUS les tokens (contexte Patient: très fiable).""" _add_compound(match_str) for token in match_str.split(): @@ -2038,10 +2140,12 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s continue if token.upper() in wl_sections or token in wl_phrases: continue + if _cand_source: + _add_candidate(token, _cand_source, _cand_strength, True) names.add(token) force_names.add(token) - def _add_tokens_force_first(match_str): + def _add_tokens_force_first(match_str, _cand_source: str = "", _cand_strength: str = "medium"): """Comme _add_tokens mais force TOUS les tokens (contexte Dr/Mme fort). Après Dr/Mme, tous les tokens sont des noms — même s'ils sont @@ -2055,54 +2159,58 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s continue if token.upper() in wl_sections or token in wl_phrases: continue + if _cand_source: + _add_candidate(token, _cand_source, _cand_strength, True) names.add(token) force_names.add(token) + # --- high context: DPI structured fields (near-certain patient identity) --- for m in RE_EXTRACT_PATIENT.finditer(full_text): - _add_tokens_force_all(m.group(1)) - for m in RE_EXTRACT_REDIGE.finditer(full_text): - _add_tokens(m.group(1)) - for m in RE_EXTRACT_MME_MR.finditer(full_text): - _add_tokens_force_first(m.group(1)) - for m in RE_EXTRACT_DR_DEST.finditer(full_text): - _add_tokens_force_first(m.group(1)) + _add_tokens_force_all(m.group(1), "RE_EXTRACT_PATIENT", "high") # Champs d'identité structurés (trackare / DPI) for m in RE_EXTRACT_NOM_NAISSANCE.finditer(full_text): - _add_tokens_force_all(m.group(1)) + _add_tokens_force_all(m.group(1), "RE_EXTRACT_NOM_NAISSANCE", "high") for m in RE_EXTRACT_NOM_UTILISE.finditer(full_text): - _add_tokens_force_all(m.group(1)) + _add_tokens_force_all(m.group(1), "RE_EXTRACT_NOM_UTILISE", "high") for m in RE_EXTRACT_NOM_PRENOM.finditer(full_text): - _add_tokens_force_all(m.group(1)) + _add_tokens_force_all(m.group(1), "RE_EXTRACT_NOM_PRENOM", "high") for m in RE_EXTRACT_PRENOM.finditer(full_text): - _add_tokens_force_all(m.group(1)) - for m in RE_EXTRACT_LIEU_NAISSANCE.finditer(full_text): - _add_tokens(m.group(1)) - for m in RE_EXTRACT_VILLE_RESIDENCE.finditer(full_text): - _add_tokens(m.group(1)) - # Contacts structurés (conjoint, concubin, etc.) - for m in RE_EXTRACT_CONTACT.finditer(full_text): - _add_tokens(m.group(1)) - if m.group(2): - _add_tokens(m.group(2)) - # Personnel médical avec rôle (Aide, Cadre Infirmier, Prescripteur, etc.) - for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text): - _add_tokens(m.group(1)) - # Pr / Professeur + nom(s) - for m in RE_EXTRACT_PR.finditer(full_text): - _add_tokens_force_first(m.group(1)) - # Opérateur / Anesthésiste / Chirurgien + nom(s) - for m in RE_EXTRACT_OPERATEUR.finditer(full_text): - _add_tokens_force_first(m.group(1)) - # Nom de cabinet (ex: "CABINET ETXEBARNONDOA") - for m in RE_EXTRACT_CABINET.finditer(full_text): - _add_tokens(m.group(1)) + _add_tokens_force_all(m.group(1), "RE_EXTRACT_PRENOM", "high") # En-tête "Courrier Epi - NOM, PRENOM" (lettres de sortie) for m in RE_EXTRACT_COURRIER.finditer(full_text): - # Format "NOM, PRENOM" : chaque partie est un token de nom for part in m.group(1).split(","): part = part.strip() if part: - _add_tokens_force_all(part) + _add_tokens_force_all(part, "RE_EXTRACT_COURRIER", "high") + + # --- medium context: medical titles (Dr, Mme, Pr, Opérateur, etc.) --- + for m in RE_EXTRACT_REDIGE.finditer(full_text): + _add_tokens(m.group(1), "RE_EXTRACT_REDIGE", "medium") + for m in RE_EXTRACT_MME_MR.finditer(full_text): + _add_tokens_force_first(m.group(1), "RE_EXTRACT_MME_MR", "medium") + for m in RE_EXTRACT_DR_DEST.finditer(full_text): + _add_tokens_force_first(m.group(1), "RE_EXTRACT_DR_DEST", "medium") + for m in RE_EXTRACT_LIEU_NAISSANCE.finditer(full_text): + _add_tokens(m.group(1), "RE_EXTRACT_LIEU_NAISSANCE", "medium") + for m in RE_EXTRACT_VILLE_RESIDENCE.finditer(full_text): + _add_tokens(m.group(1), "RE_EXTRACT_VILLE_RESIDENCE", "medium") + # Contacts structurés (conjoint, concubin, etc.) + for m in RE_EXTRACT_CONTACT.finditer(full_text): + _add_tokens(m.group(1), "RE_EXTRACT_CONTACT", "medium") + if m.group(2): + _add_tokens(m.group(2), "RE_EXTRACT_CONTACT", "medium") + # Personnel médical avec rôle (Aide, Cadre Infirmier, Prescripteur, etc.) + for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text): + _add_tokens(m.group(1), "RE_EXTRACT_STAFF_ROLE", "medium") + # Pr / Professeur + nom(s) + for m in RE_EXTRACT_PR.finditer(full_text): + _add_tokens_force_first(m.group(1), "RE_EXTRACT_PR", "medium") + # Opérateur / Anesthésiste / Chirurgien + nom(s) + for m in RE_EXTRACT_OPERATEUR.finditer(full_text): + _add_tokens_force_first(m.group(1), "RE_EXTRACT_OPERATEUR", "medium") + # Nom de cabinet (ex: "CABINET ETXEBARNONDOA") + for m in RE_EXTRACT_CABINET.finditer(full_text): + _add_tokens(m.group(1), "RE_EXTRACT_CABINET", "medium") # Extraction des noms dans les listes virgulées après Dr/Docteur ou Mmes/Mme # ex: "le Dr DUVAL, MACHELART, CHARLANNE, LAZARO, il a été proposé" @@ -2117,6 +2225,7 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s continue if tok.upper() in wl_sections or tok in wl_phrases: continue + _add_candidate(tok, "RE_DR_COMMA_LIST", "medium", False) if tok.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(tok) @@ -2138,6 +2247,7 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s continue if tok.upper() in wl_sections or tok in wl_phrases: continue + _add_candidate(tok, "RE_CIVILITE_COMMA_LIST", "medium", False) if tok.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(tok) @@ -2150,6 +2260,7 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s for m in _RE_DR_INITIAL_DOT_NAME.finditer(full_text): names.add(m.group(2)) # Le nom (ELLIE) force_names.add(m.group(2)) + _add_candidate(m.group(2), "DR_INITIAL_DOT_NAME", "medium", True) # Ajouter aussi "X.NOM" complet pour le raster (token collé) names.add(f"{m.group(1)}.{m.group(2)}") force_names.add(f"{m.group(1)}.{m.group(2)}") @@ -2163,7 +2274,7 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s re.MULTILINE ) for m in _RE_EMAIL_HEADER.finditer(full_text): - _add_tokens_force_all(m.group(1)) + _add_tokens_force_all(m.group(1), "EMAIL_HEADER", "medium") # Pour les noms composés avec tiret (ex: "LACLAU-LACROUTS"), # ajouter aussi les parties individuelles pour capturer les occurrences standalone. @@ -2179,7 +2290,87 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> Tuple[set, s names.add(part) force_names.add(part) - return names, force_names + return names, force_names, candidates + + +def _cross_validate_name_candidates( + candidates: List[NameCandidate], + ner_detections: List["NerDetection"], + insee_noms: set, + insee_prenoms: set, + medical_stopwords: set, +) -> Tuple[set, set]: + """Valide les candidats de noms extraits par regex en croisant avec les détections NER + et les gazetteers INSEE. + + Matrice de décision : + - high context → toujours accepter (champs DPI structurés, quasi-certains) + - medium context + NER confirmé → accepter + - medium context + non NER + INSEE + non stopword → accepter + - medium context + non NER + non INSEE + stopword → REJETER + - medium context + non NER + non INSEE + non stopword → accepter (bénéfice du doute) + - low context + NER confirmé → accepter + - low context + non NER + INSEE + non stopword → accepter + - low context + non NER + stopword → REJETER + - low context + non NER + non INSEE → REJETER + + Un nom est "NER confirmé" si une NerDetection a un token correspondant (case-insensitive). + + Returns: + (validated_names, validated_force_names) : ensembles de tokens validés. + """ + # Construire le set de tokens confirmés par NER (uppercase sans accents, pour matching) + ner_confirmed_tokens: set = set() + for det in ner_detections: + ner_confirmed_tokens.add(_normalize_nfkd_upper(det.token)) + + validated_names: set = set() + validated_force_names: set = set() + + for cand in candidates: + tok = cand.token + tok_upper = _normalize_nfkd_upper(tok) + tok_lower = tok.lower() + + is_ner_confirmed = tok_upper in ner_confirmed_tokens + is_in_insee = tok_upper in insee_noms or tok_upper in insee_prenoms + is_stopword = tok_lower in medical_stopwords + + strength = cand.context_strength + + accepted = False + + if strength == "high": + # Toujours accepter les champs DPI structurés + accepted = True + elif strength == "medium": + if is_ner_confirmed: + accepted = True + elif is_in_insee and not is_stopword: + accepted = True + elif not is_in_insee and is_stopword: + accepted = False # REJETER + else: + # non NER + non INSEE + non stopword → bénéfice du doute + accepted = True + elif strength == "low": + if is_ner_confirmed: + accepted = True + elif is_in_insee and not is_stopword: + accepted = True + elif is_stopword: + accepted = False # REJETER + elif not is_in_insee: + accepted = False # REJETER + else: + accepted = False + + if accepted: + validated_names.add(tok) + if cand.bypass_stopwords: + validated_force_names.add(tok) + + return validated_names, validated_force_names def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_names: set = None) -> str: @@ -2262,15 +2453,17 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] full_raw = "\n".join(pages_text) + "\n" + "\n".join( "\n".join(rows) for rows in tables_lines ) - extracted_names, doc_force_names = _extract_document_names(full_raw, cfg) + extracted_names, doc_force_names, doc_candidates = _extract_document_names(full_raw, cfg) # Phase 0b : si document Trackare, extraction renforcée des PII structurés is_trackare = _is_trackare_document(full_raw) trackare_force_names: set = set() + all_candidates: List[NameCandidate] = list(doc_candidates) if is_trackare: - trackare_names, trackare_hits, trackare_force_names = _extract_trackare_identity(full_raw) + trackare_names, trackare_hits, trackare_force_names, trackare_candidates = _extract_trackare_identity(full_raw) extracted_names.update(trackare_names) audit.extend(trackare_hits) + all_candidates.extend(trackare_candidates) # Fusionner les force_names des deux sources all_force_names = doc_force_names | trackare_force_names @@ -2570,6 +2763,131 @@ def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: " final = "".join(rebuilt_list) return final, hits + +# ----------------- NER-first: run NER on original (unmasked) text ----------------- + +def _run_ner_on_original_text( + pages_text: List[str], + eds_pseudo_mgr=None, + gliner_mgr=None, + camembert_mgr=None, + cfg: Dict[str, Any] = None, +) -> List[NerDetection]: + """Exécute les modèles NER disponibles sur le texte original (non masqué) + et retourne une liste dédupliquée de NerDetection. + + Cette fonction est conçue pour l'architecture NER-first : on fait tourner + les NER *avant* le masquage regex, afin de pouvoir valider les candidats + de noms extraits par regex avec les signaux NER. + + Paramètres: + pages_text: texte original par page (non masqué) + eds_pseudo_mgr: instance EdsPseudoManager (optionnelle) + gliner_mgr: instance GlinerManager (optionnelle) + camembert_mgr: instance CamembertNerManager (optionnelle) + cfg: configuration (non utilisée pour l'instant, réservée) + + Returns: + Liste de NerDetection dédupliquée (par token+label+page+source). + """ + detections: List[NerDetection] = [] + seen: set = set() # (token_lower, label, page_idx, source) pour dédoublonnage + + def _add_detection(token: str, label: str, score: float, page_idx: int, source: str): + """Ajoute une détection si non déjà vue.""" + key = (token.lower(), label, page_idx, source) + if key not in seen: + seen.add(key) + detections.append(NerDetection( + token=token, label=label, score=score, + page_idx=page_idx, source=source, + )) + + # Mapping des labels NER vers labels normalisés + _EDS_LABEL_NORM = { + "NOM": "NOM", "PRENOM": "PRENOM", + "HOPITAL": "HOPITAL", "VILLE": "VILLE", + "ADRESSE": "LOC", "ZIP": "LOC", + } + _GLINER_LABEL_NORM = { + "person_name": "NOM", "hospital": "HOPITAL", + "city": "VILLE", "postal_address": "LOC", + } + _CAMEMBERT_LABEL_NORM = { + "PER": "NOM", "HOPITAL": "HOPITAL", + "VILLE": "VILLE", "ADRESSE": "LOC", "ZIP": "LOC", + } + + for page_idx, page_text in enumerate(pages_text): + if not page_text.strip(): + continue + + # Découper en paragraphes (comme apply_eds_pseudo_on_narrative) + paras = [p for p in re.split(r"\n\s*\n", page_text) if p.strip()] + + # --- EDS-Pseudo --- + if eds_pseudo_mgr is not None and hasattr(eds_pseudo_mgr, 'infer_paragraphs') and eds_pseudo_mgr.is_loaded(): + try: + ents_per_para = eds_pseudo_mgr.infer_paragraphs(paras) + for para_ents in ents_per_para: + for ent in para_ents: + raw_label = ent.get("entity_group", "") + norm_label = _EDS_LABEL_NORM.get(raw_label) + if norm_label: + _add_detection( + token=ent.get("word", ""), + label=norm_label, + score=ent.get("score", 0.0), + page_idx=page_idx, + source="eds_pseudo", + ) + except Exception as e: + log.warning(f"_run_ner_on_original_text: EDS-Pseudo erreur page {page_idx}: {e}") + + # --- GLiNER --- + if gliner_mgr is not None and hasattr(gliner_mgr, 'predict') and gliner_mgr.is_loaded(): + try: + # GLiNER sur le texte complet de la page (pas par paragraphe) + gliner_ents = gliner_mgr.predict(page_text, threshold=0.4) + for ent in gliner_ents: + raw_label = ent.get("label", "") + norm_label = _GLINER_LABEL_NORM.get(raw_label) + if norm_label: + _add_detection( + token=ent.get("text", ""), + label=norm_label, + score=ent.get("score", 0.0), + page_idx=page_idx, + source="gliner", + ) + except Exception as e: + log.warning(f"_run_ner_on_original_text: GLiNER erreur page {page_idx}: {e}") + + # --- CamemBERT-bio --- + if camembert_mgr is not None and hasattr(camembert_mgr, 'predict_long') and camembert_mgr.is_loaded(): + try: + cam_ents = camembert_mgr.predict_long(page_text, threshold=0.4) + for ent in cam_ents: + raw_label = ent.get("label", "") + norm_label = _CAMEMBERT_LABEL_NORM.get(raw_label) + if norm_label: + _add_detection( + token=ent.get("word", ""), + label=norm_label, + score=ent.get("score", 0.0), + page_idx=page_idx, + source="camembert_bio", + ) + except Exception as e: + log.warning(f"_run_ner_on_original_text: CamemBERT-bio erreur page {page_idx}: {e}") + + log.info(f"NER-first: {len(detections)} détections sur {len(pages_text)} pages " + f"(eds={sum(1 for d in detections if d.source == 'eds_pseudo')}, " + f"gliner={sum(1 for d in detections if d.source == 'gliner')}, " + f"camembert={sum(1 for d in detections if d.source == 'camembert_bio')})") + return detections + + # ----------------- FINESS Aho-Corasick establishment matching ----------------- def _build_finess_ac():