#!/usr/bin/env python3 """ Export silver annotations — BIO via alignement texte original ↔ pseudonymisé. ============================================================================= Aligne le texte extrait du PDF original avec le texte pseudonymisé (.pseudonymise.txt) pour créer des annotations BIO fiables. Les placeholders [NOM], [TEL], etc. dans le texte pseudonymisé indiquent exactement quels tokens ont été masqués. Usage: python scripts/export_silver_annotations.py [--limit N] [--out-dir DIR] Output: data/silver_annotations/ avec un fichier .bio par document Format BIO: TOKEN\tLABEL (un token par ligne, lignes vides entre phrases) """ import sys import re import difflib import argparse import unicodedata from pathlib import Path from typing import Dict, List, Set, Tuple sys.path.insert(0, str(Path(__file__).parent.parent)) # Mapping placeholder → label BIO PLACEHOLDER_TO_BIO: Dict[str, str] = { "NOM": "PER", "TEL": "TEL", "EMAIL": "EMAIL", "NIR": "NIR", "IPP": "IPP", "DOSSIER": "NDA", "NDA": "NDA", "EPISODE": "NDA", "RPPS": "RPPS", "DATE_NAISSANCE": "DATE_NAISSANCE", "ADRESSE": "ADRESSE", "CODE_POSTAL": "ZIP", "VILLE": "VILLE", "HOPITAL": "HOPITAL", "MASK": "HOPITAL", # [MASK] = hôpital masqué par force_regex "IBAN": "IBAN", "AGE": "AGE", } RE_PLACEHOLDER = re.compile(r"^\[([A-Z_]+)\]$") SRC = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)") AUDIT_DIR = SRC / "anonymise_audit_30" # --- Gazetteer paths --- GAZETTEERS_DIR = Path(__file__).parent.parent / "data" VILLES_FINESS_PATH = GAZETTEERS_DIR / "finess" / "villes_finess.txt" COMMUNES_INSEE_PATH = GAZETTEERS_DIR / "insee" / "communes_france.txt" ETABLISSEMENTS_PATH = GAZETTEERS_DIR / "finess" / "etablissements_distinctifs.txt" # Mots de contexte indiquant qu'un token VILLE est bien un lieu (pas un nom commun) VILLE_CONTEXT_WORDS = { "à", "a", "de", "né", "née", "nee", "ne", "résid", "resid", "hospitalis", "transféré", "transfere", "transferée", "transferee", "domicilié", "domicilie", "domiciliée", "domiciliee", "habite", "habitant", "demeurant", "originaire", "ville", "commune", "cedex", } def _strip_accents(s: str) -> str: """Supprime les accents d'une chaîne (é→e, à→a, etc.).""" nfkd = unicodedata.normalize("NFKD", s) return "".join(c for c in nfkd if not unicodedata.combining(c)) def _normalize_gaz(s: str) -> str: """Normalise pour comparaison gazetteer : minuscule, sans accents, stripped.""" return _strip_accents(s.lower().strip()) def load_gazetteers() -> dict: """Charge les gazetteers depuis les fichiers, avec fallback gracieux. Retourne un dict avec: - "villes": set de tuples de tokens normalisés (ex: ("saint", "palais")) - "hopitaux": set de tuples de tokens normalisés (ex: ("ch", "argentan")) """ villes: Set[Tuple[str, ...]] = set() hopitaux: Set[Tuple[str, ...]] = set() # --- Villes FINESS (UPPERCASE, une par ligne) --- if VILLES_FINESS_PATH.exists(): for line in VILLES_FINESS_PATH.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue # Filtrer les entrées avec "CEDEX" (adresses postales, pas des villes) tokens = tuple(_normalize_gaz(t) for t in line.split() if t != "CEDEX") if tokens and len(tokens[0]) >= 2: # Ignorer entrées trop courtes villes.add(tokens) # --- Communes INSEE (UPPERCASE, une par ligne) --- if COMMUNES_INSEE_PATH.exists(): for line in COMMUNES_INSEE_PATH.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue tokens = tuple(_normalize_gaz(t) for t in line.split()) if tokens and len(tokens[0]) >= 2: villes.add(tokens) # --- Établissements (format "- nom normalisé", minuscule) --- if ETABLISSEMENTS_PATH.exists(): for line in ETABLISSEMENTS_PATH.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line.startswith("- "): continue name = line[2:].strip() if not name: continue tokens = tuple(_normalize_gaz(t) for t in name.split()) # Ignorer les entrées trop courtes (1 token de 3 chars ou moins) if tokens and (len(tokens) > 1 or len(tokens[0]) > 3): hopitaux.add(tokens) # Retirer les villes d'un seul token très court (risque élevé de faux positifs) villes = {v for v in villes if len(v) > 1 or len(v[0]) >= 3} return {"villes": villes, "hopitaux": hopitaux} def _has_ville_context(tokens: List[str], labels: List[str], pos: int, window: int = 3) -> bool: """Vérifie si un token à la position `pos` a un contexte indiquant un lieu. Regarde les `window` tokens précédents pour des mots-clés de contexte. """ start = max(0, pos - window) for i in range(start, pos): tok_norm = _normalize_gaz(tokens[i].strip(".,;:!?()[]{}\"'")) # Vérifier correspondance exacte ou préfixe (ex: "résid" matche "résidence") for ctx in VILLE_CONTEXT_WORDS: if tok_norm == ctx or tok_norm.startswith(ctx): return True return False def enrich_with_gazetteers( bio_tokens: List[Tuple[str, str]], gazetteers: dict, ) -> Tuple[List[Tuple[str, str]], int, int]: """Enrichit les annotations BIO avec les gazetteers. Ne modifie JAMAIS un label existant (non-O). Ajoute uniquement des labels sur les tokens actuellement "O". Retourne: (bio_tokens_enrichis, nb_villes_ajoutées, nb_hopitaux_ajoutés) """ tokens = [t for t, _ in bio_tokens] labels = [l for _, l in bio_tokens] n = len(tokens) added_villes = 0 added_hopitaux = 0 # Pré-calculer les tokens normalisés (sans ponctuation, sans accents, lowercase) tokens_norm = [ _normalize_gaz(t.strip(".,;:!?()[]{}\"'")) for t in tokens ] # --- Enrichissement HOPITAL (multi-token, sans contrainte de contexte) --- # On traite d'abord les hôpitaux car ils sont plus spécifiques for gaz_tokens in gazetteers.get("hopitaux", set()): gaz_len = len(gaz_tokens) if gaz_len == 0: continue i = 0 while i <= n - gaz_len: # Vérifier si la séquence de tokens matche match = True for k in range(gaz_len): if tokens_norm[i + k] != gaz_tokens[k]: match = False break if match: # Vérifier que TOUS les tokens sont actuellement "O" all_o = all(labels[i + k] == "O" for k in range(gaz_len)) if all_o: labels[i] = "B-HOPITAL" for k in range(1, gaz_len): labels[i + k] = "I-HOPITAL" added_hopitaux += 1 i += gaz_len continue i += 1 # --- Enrichissement VILLE (avec contexte obligatoire) --- for gaz_tokens in gazetteers.get("villes", set()): gaz_len = len(gaz_tokens) if gaz_len == 0: continue i = 0 while i <= n - gaz_len: # Vérifier si la séquence de tokens matche match = True for k in range(gaz_len): if tokens_norm[i + k] != gaz_tokens[k]: match = False break if match: # Vérifier que TOUS les tokens sont actuellement "O" all_o = all(labels[i + k] == "O" for k in range(gaz_len)) if all_o and _has_ville_context(tokens, labels, i): labels[i] = "B-VILLE" for k in range(1, gaz_len): labels[i + k] = "I-VILLE" added_villes += 1 i += gaz_len continue i += 1 enriched = list(zip(tokens, labels)) return enriched, added_villes, added_hopitaux def extract_original_text(pdf_path: Path) -> str: """Extrait le texte brut d'un PDF (même méthode que le pipeline).""" import anonymizer_core_refactored_onnx as core pages_text, _, _, _ = core.extract_text_with_fallback_ocr(pdf_path) return "\f".join(pages_text) def tokenize_text(text: str) -> List[str]: """Split en tokens whitespace, en nettoyant les caractères de contrôle.""" # Remplacer \f et \r par \n pour l'alignement text = text.replace("\f", "\n").replace("\r", "") tokens = [] for line in text.split("\n"): line_toks = line.split() if line_toks: tokens.extend(line_toks) return tokens def align_and_annotate(original_text: str, pseudo_text: str) -> List[Tuple[str, str]]: """Aligne texte original et pseudonymisé pour créer les annotations BIO. Utilise SequenceMatcher pour trouver les différences. Quand le pseudo contient [PLACEHOLDER], les tokens originaux correspondants reçoivent le label BIO approprié. """ orig_tokens = tokenize_text(original_text) pseudo_tokens = tokenize_text(pseudo_text) # Normaliser pour l'alignement (lowercase, sans accents pour meilleur matching) def normalize(tok): return tok.lower().strip(".,;:!?()[]{}\"'") orig_norm = [normalize(t) for t in orig_tokens] pseudo_norm = [normalize(t) for t in pseudo_tokens] sm = difflib.SequenceMatcher(None, orig_norm, pseudo_norm, autojunk=False) opcodes = sm.get_opcodes() bio_tokens: List[Tuple[str, str]] = [] for tag, i1, i2, j1, j2 in opcodes: if tag == "equal": # Tokens identiques → O for t in orig_tokens[i1:i2]: bio_tokens.append((t, "O")) elif tag == "replace": # Analyser le côté pseudo : quels tokens sont des placeholders ? pseudo_chunk = pseudo_tokens[j1:j2] placeholder_labels = [] # (index_in_pseudo, bio_label) pour chaque placeholder non_placeholder_norms = set() for pi, pt in enumerate(pseudo_chunk): m = RE_PLACEHOLDER.match(pt) if m: bio_label = PLACEHOLDER_TO_BIO.get(m.group(1)) if bio_label: placeholder_labels.append((pi, bio_label)) else: non_placeholder_norms.add(normalize(pt)) if not placeholder_labels: # Pas de placeholder → O for t in orig_tokens[i1:i2]: bio_tokens.append((t, "O")) elif len(placeholder_labels) == 1: # Un seul placeholder : tous les tokens originaux (sauf ceux # qui matchent un token non-placeholder du pseudo) prennent ce label label = placeholder_labels[0][1] first = True for t in orig_tokens[i1:i2]: if normalize(t) in non_placeholder_norms: bio_tokens.append((t, "O")) first = True else: prefix = "B-" if first else "I-" bio_tokens.append((t, f"{prefix}{label}")) first = False else: # Plusieurs placeholders : distribuer les tokens originaux # Stratégie : répartir proportionnellement, chaque groupe commence par B- n_orig = i2 - i1 n_placeholders = len(placeholder_labels) # Exclure d'abord les tokens qui matchent des non-placeholders orig_assignments = [] for t in orig_tokens[i1:i2]: if normalize(t) in non_placeholder_norms: orig_assignments.append(("O", None)) else: orig_assignments.append(("PII", None)) # Distribuer les tokens PII entre les placeholders pii_indices = [k for k, (tp, _) in enumerate(orig_assignments) if tp == "PII"] n_pii = len(pii_indices) if n_pii > 0 and n_placeholders > 0: chunk_size = max(1, n_pii // n_placeholders) for pi_idx, (_, label) in enumerate(placeholder_labels): start_pii = pi_idx * chunk_size end_pii = (pi_idx + 1) * chunk_size if pi_idx < n_placeholders - 1 else n_pii for k in range(start_pii, min(end_pii, n_pii)): orig_assignments[pii_indices[k]] = ("PII", label) # Générer les BIO tokens prev_label = None for k, (t, (tp, label)) in enumerate(zip(orig_tokens[i1:i2], orig_assignments)): if tp == "O" or label is None: bio_tokens.append((t, "O")) prev_label = None else: prefix = "B-" if label != prev_label else "I-" bio_tokens.append((t, f"{prefix}{label}")) prev_label = label elif tag == "delete": # Tokens présents uniquement dans l'original → O for t in orig_tokens[i1:i2]: bio_tokens.append((t, "O")) elif tag == "insert": # Tokens ajoutés dans le pseudo (rare) → ignorer pass return bio_tokens def export_document( pdf_path: Path, pseudo_path: Path, out_dir: Path, gazetteers: dict | None = None, ) -> Tuple[int, int, int, int]: """Exporte un document en format BIO. Retourne (nb_tokens, nb_entités_diff, nb_villes_gaz, nb_hopitaux_gaz). """ # Extraire le texte original original_text = extract_original_text(pdf_path) if not original_text.strip(): return 0, 0, 0, 0 # Lire le texte pseudonymisé pseudo_text = pseudo_path.read_text(encoding="utf-8") if not pseudo_text.strip(): return 0, 0, 0, 0 # Aligner et annoter (diff-based) bio_tokens = align_and_annotate(original_text, pseudo_text) n_ents_diff = sum(1 for _, l in bio_tokens if l.startswith("B-")) # Enrichissement gazetteer (post-processing) added_villes = 0 added_hopitaux = 0 if gazetteers: bio_tokens, added_villes, added_hopitaux = enrich_with_gazetteers( bio_tokens, gazetteers ) # Écrire en format CoNLL out_name = pdf_path.stem + ".bio" out_path = out_dir / out_name lines = [] for token, label in bio_tokens: # Séparer les phrases par des lignes vides (ponctuation finale) if token in (".", "!", "?") and label == "O": lines.append(f"{token}\t{label}") lines.append("") else: lines.append(f"{token}\t{label}") out_path.write_text("\n".join(lines), encoding="utf-8") return len(bio_tokens), n_ents_diff, added_villes, added_hopitaux def main(): parser = argparse.ArgumentParser(description="Export silver annotations BIO (alignement original ↔ pseudo)") parser.add_argument("--out-dir", type=Path, default=Path(__file__).parent.parent / "data" / "silver_annotations", help="Répertoire de sortie") parser.add_argument("--limit", type=int, default=0, help="Limiter à N fichiers (0=tous)") parser.add_argument("--no-gazetteers", action="store_true", help="Désactiver l'enrichissement par gazetteers") parser.add_argument("--extra-dir", type=Path, nargs="*", default=[], help="Répertoires supplémentaires contenant des .pseudonymise.txt") args = parser.parse_args() args.out_dir.mkdir(parents=True, exist_ok=True) # Charger les gazetteers gazetteers = None if not args.no_gazetteers: gazetteers = load_gazetteers() n_villes = len(gazetteers["villes"]) n_hop = len(gazetteers["hopitaux"]) print(f"Gazetteers chargés: {n_villes} villes, {n_hop} établissements") else: print("Gazetteers désactivés") # Trouver les paires PDF + pseudo (audit_30 + extra dirs) search_dirs = [AUDIT_DIR] + list(args.extra_dir) pseudo_files = [] for sdir in search_dirs: if sdir.exists(): pseudo_files.extend(sorted(sdir.glob("*.pseudonymise.txt"))) print(f" {sdir.name}: {len(list(sdir.glob('*.pseudonymise.txt')))} fichiers pseudo") # Dédupliquer par nom de base seen_bases = set() unique_pseudo = [] for pf in pseudo_files: base = pf.name.replace(".pseudonymise.txt", "") if base not in seen_bases: seen_bases.add(base) unique_pseudo.append(pf) pseudo_files = unique_pseudo pairs = [] for pseudo_path in pseudo_files: # Retrouver le PDF source base_name = pseudo_path.name.replace(".pseudonymise.txt", ".pdf") # Chercher dans les sous-dossiers OGC found = list(SRC.glob(f"*/{base_name}")) if found: pairs.append((found[0], pseudo_path)) if args.limit > 0: pairs = pairs[:args.limit] print(f"Export silver annotations: {len(pairs)} documents → {args.out_dir}") total_tokens = 0 total_ents_diff = 0 total_villes_gaz = 0 total_hop_gaz = 0 for pdf_path, pseudo_path in pairs: try: n_tok, n_diff, n_vgaz, n_hgaz = export_document( pdf_path, pseudo_path, args.out_dir, gazetteers ) total_tokens += n_tok total_ents_diff += n_diff total_villes_gaz += n_vgaz total_hop_gaz += n_hgaz gaz_info = "" if n_vgaz or n_hgaz: gaz_info = f" (+{n_vgaz} villes, +{n_hgaz} hôpitaux gaz.)" print(f" {pdf_path.name}: {n_tok} tokens, {n_diff} entités diff{gaz_info}") except Exception as e: print(f" {pdf_path.name}: ERREUR {e}") total_gaz = total_villes_gaz + total_hop_gaz total_all = total_ents_diff + total_gaz print(f"\n{'='*60}") print(f"Total tokens: {total_tokens}") print(f"Entités diff-based: {total_ents_diff} B-") print(f"Entités gazetteers: +{total_gaz} ({total_villes_gaz} VILLE, {total_hop_gaz} HOPITAL)") print(f"Total entités: {total_all} B-") if total_ents_diff > 0: pct = 100 * total_gaz / total_ents_diff print(f"Enrichissement: +{pct:.1f}% par gazetteers") print(f"Sortie: {args.out_dir}") if __name__ == "__main__": main()