#!/usr/bin/env python3 """Build gazetteer paranames pour anonymisation médicale FR. Source : https://github.com/bltlab/paranames (CC BY 4.0) Citation : Sälevä & Lignos, ParaNames 1.0, LREC-COLING 2024. Workflow : 1. Télécharge ``data/train.parquet`` (~1.33 GB) du repo HF ``imvladikon/paranames`` via ``huggingface_hub.hf_hub_download`` (cache persistant, pas de re-téléchargement si déjà présent). 2. Itère sur le fichier parquet **par batches** avec pyarrow (RAM constante, < 500 Mo de pointe). 3. Filtre ``type == "PER"`` (personnes uniquement). 4. Pour chaque ``name`` : - split par espace et séparateurs courants - dernier token UPPER NFKD → candidat **nom de famille** - tokens précédents UPPER NFKD → candidats **prénoms** 5. Normalisation NFKD + uppercase + suppression diacritiques + ASCII A-Z. 6. Filtrage anti-bruit : - longueur ≥ 3 - exclusion des stop-words médicaments BDPM 7. Sortie : 2 fichiers ``.txt.gz`` triés alphabétiquement, encodés UTF-8. Idempotent : relance = même résultat. Cache HF réutilisé si présent. Usage : python scripts/build_paranames_gazetteer.py python scripts/build_paranames_gazetteer.py --hf-cache /tmp/hf_paranames python scripts/build_paranames_gazetteer.py --limit 200000 # debug """ from __future__ import annotations import argparse import gzip import os import sys import time import unicodedata from pathlib import Path from typing import Iterable, Iterator REPO_ROOT = Path(__file__).resolve().parent.parent DATA_DIR = REPO_ROOT / "data" / "paranames" BDPM_STOPWORDS = REPO_ROOT / "data" / "bdpm" / "medicaments_stopwords.txt" MANUAL_STOPWORDS = REPO_ROOT / "data" / "stopwords_manuels.txt" INSEE_NOMS = REPO_ROOT / "data" / "insee" / "noms_famille_france.txt" OUT_NOMS = DATA_DIR / "noms_famille_world.txt.gz" OUT_PRENOMS = DATA_DIR / "prenoms_world.txt.gz" HF_REPO_ID = "imvladikon/paranames" HF_PARQUET_PATH = "data/train.parquet" MIN_TOKEN_LEN = 3 MAX_TOKEN_LEN = 25 # Caractères à découper en plus de l'espace (séparateurs internes). SPLIT_CHARS = " \t /,;:|()[]{}\"'`«»–—−.·" SPLIT_TABLE = str.maketrans({c: " " for c in SPLIT_CHARS}) def normalize(token: str) -> str: """NFKD → uppercase → drop diacritics → A-Z only.""" if not token: return "" nfkd = unicodedata.normalize("NFKD", token) no_acc = "".join(c for c in nfkd if not unicodedata.combining(c)) up = no_acc.upper() return "".join(c for c in up if "A" <= c <= "Z") def _load_terms(path: Path, label: str, stop: set[str]) -> None: """Charge un fichier de termes (1 par ligne, # = commentaire) dans stop.""" if not path.exists(): print(f"[WARN] {path} introuvable — pas de filtrage {label}.") return before = len(stop) with path.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue n = normalize(line) if n: stop.add(n) print(f"[INFO] {label} : +{len(stop) - before:,} termes.") def load_stopwords() -> set[str]: """Filtre cumulé : médicaments BDPM + mots-outils/stop-words français curés. stopwords_manuels.txt couvre les mots français courants qui apparaissent à tort comme patronymes dans INSEE/paranames (ex. « voir », « avec »). Ces termes sont exclus du gazetteer pour réduire les faux positifs au contexte de détection faible. """ stop: set[str] = set() _load_terms(BDPM_STOPWORDS, "BDPM stop-words", stop) _load_terms(MANUAL_STOPWORDS, "stop-words manuels FR", stop) print(f"[INFO] Filtre total : {len(stop):,} termes.") return stop def download_parquet(cache_dir: str) -> Path: """Télécharge (ou récupère du cache) le parquet paranames.""" try: from huggingface_hub import hf_hub_download # type: ignore except ImportError as e: sys.exit( "[FATAL] `huggingface_hub` requis. Install : pip install huggingface_hub\n" f"Erreur : {e}" ) try: path = hf_hub_download( repo_id=HF_REPO_ID, filename=HF_PARQUET_PATH, repo_type="dataset", cache_dir=cache_dir, ) except Exception as e: sys.exit( f"[FATAL] Impossible de télécharger {HF_REPO_ID}:{HF_PARQUET_PATH}\n" f" Vérifier réseau / cache HF / accès huggingface.co\n" f" Erreur : {e}" ) p = Path(path) print(f"[INFO] Parquet local : {p} ({p.stat().st_size/1e9:.2f} GB)") return p def iter_per_names(parquet_path: Path, limit: int | None) -> Iterator[str]: """Stream les noms PER du parquet par row-groups (RAM constante).""" try: import pyarrow.parquet as pq # type: ignore except ImportError as e: sys.exit(f"[FATAL] pyarrow requis. Install : pip install pyarrow\n{e}") pf = pq.ParquetFile(parquet_path) print( f"[INFO] Parquet : {pf.num_row_groups} row groups, " f"{pf.metadata.num_rows:,} lignes totales." ) count_in = 0 count_per = 0 # On ne lit que les colonnes utiles for batch in pf.iter_batches(batch_size=65536, columns=["name", "type"]): names = batch.column("name").to_pylist() types = batch.column("type").to_pylist() for nm, tp in zip(names, types): count_in += 1 if tp != "PER": continue if nm: count_per += 1 yield nm if limit is not None and count_in >= limit: print(f"[INFO] Limite atteinte ({limit}).") print(f"[INFO] Total lignes lues : {count_in:,}") print(f"[INFO] Total PER conservés : {count_per:,}") return if count_in % 1_000_000 < 65536: print( f"[PROGRESS] {count_in:>11,} lignes lues, " f"{count_per:>11,} PER." ) print(f"[INFO] Total lignes lues : {count_in:,}") print(f"[INFO] Total PER conservés : {count_per:,}") def split_name(name: str) -> tuple[list[str], str | None]: clean = name.translate(SPLIT_TABLE) tokens = [t for t in clean.split() if t] if not tokens: return [], None if len(tokens) == 1: return [], tokens[0] return tokens[:-1], tokens[-1] def good_token(tok: str, stop: set[str]) -> bool: if not tok: return False if len(tok) < MIN_TOKEN_LEN or len(tok) > MAX_TOKEN_LEN: return False if tok in stop: return False return True def write_sorted_gz(path: Path, items: Iterable[str]) -> int: path.parent.mkdir(parents=True, exist_ok=True) data = sorted(items) with gzip.open(path, "wt", encoding="utf-8", compresslevel=9) as f: for s in data: f.write(s) f.write("\n") return len(data) def main() -> int: parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) parser.add_argument( "--hf-cache", default=os.environ.get("HF_HOME", str(Path.home() / ".cache" / "huggingface")), help="Répertoire cache HuggingFace (par défaut : ~/.cache/huggingface).", ) parser.add_argument( "--limit", type=int, default=None, help="Limiter le nombre de lignes lues (debug).", ) args = parser.parse_args() t0 = time.time() print(f"[INFO] Cache HF : {args.hf_cache}") stopwords = load_stopwords() parquet_path = download_parquet(args.hf_cache) noms: set[str] = set() prenoms: set[str] = set() bad_kept = 0 for raw_name in iter_per_names(parquet_path, limit=args.limit): prens, fam = split_name(raw_name) if fam is not None: n = normalize(fam) if good_token(n, stopwords): noms.add(n) else: bad_kept += 1 for p in prens: n = normalize(p) if good_token(n, stopwords): prenoms.add(n) print(f"[INFO] Noms de famille uniques (post-filtre) : {len(noms):,}") print(f"[INFO] Prénoms uniques (post-filtre) : {len(prenoms):,}") print(f"[INFO] Tokens rejetés (longueur/stop/vide) : {bad_kept:,}") n_noms = write_sorted_gz(OUT_NOMS, noms) n_pren = write_sorted_gz(OUT_PRENOMS, prenoms) print( f"[OK] {OUT_NOMS} — {n_noms:,} entrées " f"({OUT_NOMS.stat().st_size/1e6:.1f} Mo)" ) print( f"[OK] {OUT_PRENOMS} — {n_pren:,} entrées " f"({OUT_PRENOMS.stat().st_size/1e6:.1f} Mo)" ) if INSEE_NOMS.exists(): insee_noms = { line.strip().upper() for line in INSEE_NOMS.read_text(encoding="utf-8").splitlines() if line.strip() } inter = noms & insee_noms cov = 100 * len(inter) / max(1, len(insee_noms)) print( f"[INFO] Intersection noms_famille_world ∩ INSEE_FR : " f"{len(inter):,} ({cov:.1f}% de couverture INSEE)" ) print(f"[DONE] Temps total : {time.time()-t0:.1f}s") return 0 if __name__ == "__main__": sys.exit(main())