Files
anonymisation/scripts/build_paranames_gazetteer.py
Domi31tls c110de4a2e feat(T-I): validateur paranames + filtre mots-outils FR du gazetteer
Validateur scripts/validate_paranames.py exécuté sur le gazetteer réel,
révèle 2 défauts → corrigés :

- Mots-outils FR (avec/dans/voir/...) présents dans INSEE/paranames →
  risque FP au contexte 'low'. Ajout de 347 mots-outils spaCy fr (sûrs,
  filtrés des patronymes INSEE fréquents) à stopwords_manuels.txt.
  build_paranames_gazetteer.py filtre désormais aussi contre ce fichier ;
  gazetteer reconstruit (1 379 196 noms, mots-outils ≥3 chars retirés).
- Priorité sécurité respectée : allez/polygone sont de vrais patronymes
  INSEE rares → laissés MASQUABLES (pas de fuite), hors stopwords.
- OYARCABAL reclassé en warning (couvert par regex F3, absent de Wikidata).

Garde-fous vérifiés : Petit/Boucher/Berger conservés, noms étrangers
(EJNAINI/NGUYEN/...) conservés. Validateur 5/5. tests/unit 85 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 11:20:21 +02:00

272 lines
9.0 KiB
Python

#!/usr/bin/env python3
"""Build gazetteer paranames pour anonymisation médicale FR.
Source : https://github.com/bltlab/paranames (CC BY 4.0)
Citation : Sälevä & Lignos, ParaNames 1.0, LREC-COLING 2024.
Workflow :
1. Télécharge ``data/train.parquet`` (~1.33 GB) du repo HF
``imvladikon/paranames`` via ``huggingface_hub.hf_hub_download`` (cache
persistant, pas de re-téléchargement si déjà présent).
2. Itère sur le fichier parquet **par batches** avec pyarrow (RAM constante,
< 500 Mo de pointe).
3. Filtre ``type == "PER"`` (personnes uniquement).
4. Pour chaque ``name`` :
- split par espace et séparateurs courants
- dernier token UPPER NFKD → candidat **nom de famille**
- tokens précédents UPPER NFKD → candidats **prénoms**
5. Normalisation NFKD + uppercase + suppression diacritiques + ASCII A-Z.
6. Filtrage anti-bruit :
- longueur ≥ 3
- exclusion des stop-words médicaments BDPM
7. Sortie : 2 fichiers ``.txt.gz`` triés alphabétiquement, encodés UTF-8.
Idempotent : relance = même résultat. Cache HF réutilisé si présent.
Usage :
python scripts/build_paranames_gazetteer.py
python scripts/build_paranames_gazetteer.py --hf-cache /tmp/hf_paranames
python scripts/build_paranames_gazetteer.py --limit 200000 # debug
"""
from __future__ import annotations
import argparse
import gzip
import os
import sys
import time
import unicodedata
from pathlib import Path
from typing import Iterable, Iterator
REPO_ROOT = Path(__file__).resolve().parent.parent
DATA_DIR = REPO_ROOT / "data" / "paranames"
BDPM_STOPWORDS = REPO_ROOT / "data" / "bdpm" / "medicaments_stopwords.txt"
MANUAL_STOPWORDS = REPO_ROOT / "data" / "stopwords_manuels.txt"
INSEE_NOMS = REPO_ROOT / "data" / "insee" / "noms_famille_france.txt"
OUT_NOMS = DATA_DIR / "noms_famille_world.txt.gz"
OUT_PRENOMS = DATA_DIR / "prenoms_world.txt.gz"
HF_REPO_ID = "imvladikon/paranames"
HF_PARQUET_PATH = "data/train.parquet"
MIN_TOKEN_LEN = 3
MAX_TOKEN_LEN = 25
# Caractères à découper en plus de l'espace (séparateurs internes).
SPLIT_CHARS = " \t /,;:|()[]{}\"'`«»–—−.·"
SPLIT_TABLE = str.maketrans({c: " " for c in SPLIT_CHARS})
def normalize(token: str) -> str:
"""NFKD → uppercase → drop diacritics → A-Z only."""
if not token:
return ""
nfkd = unicodedata.normalize("NFKD", token)
no_acc = "".join(c for c in nfkd if not unicodedata.combining(c))
up = no_acc.upper()
return "".join(c for c in up if "A" <= c <= "Z")
def _load_terms(path: Path, label: str, stop: set[str]) -> None:
"""Charge un fichier de termes (1 par ligne, # = commentaire) dans stop."""
if not path.exists():
print(f"[WARN] {path} introuvable — pas de filtrage {label}.")
return
before = len(stop)
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
n = normalize(line)
if n:
stop.add(n)
print(f"[INFO] {label} : +{len(stop) - before:,} termes.")
def load_stopwords() -> set[str]:
"""Filtre cumulé : médicaments BDPM + mots-outils/stop-words français curés.
stopwords_manuels.txt couvre les mots français courants qui apparaissent à
tort comme patronymes dans INSEE/paranames (ex. « voir », « avec »). Ces
termes sont exclus du gazetteer pour réduire les faux positifs au contexte
de détection faible.
"""
stop: set[str] = set()
_load_terms(BDPM_STOPWORDS, "BDPM stop-words", stop)
_load_terms(MANUAL_STOPWORDS, "stop-words manuels FR", stop)
print(f"[INFO] Filtre total : {len(stop):,} termes.")
return stop
def download_parquet(cache_dir: str) -> Path:
"""Télécharge (ou récupère du cache) le parquet paranames."""
try:
from huggingface_hub import hf_hub_download # type: ignore
except ImportError as e:
sys.exit(
"[FATAL] `huggingface_hub` requis. Install : pip install huggingface_hub\n"
f"Erreur : {e}"
)
try:
path = hf_hub_download(
repo_id=HF_REPO_ID,
filename=HF_PARQUET_PATH,
repo_type="dataset",
cache_dir=cache_dir,
)
except Exception as e:
sys.exit(
f"[FATAL] Impossible de télécharger {HF_REPO_ID}:{HF_PARQUET_PATH}\n"
f" Vérifier réseau / cache HF / accès huggingface.co\n"
f" Erreur : {e}"
)
p = Path(path)
print(f"[INFO] Parquet local : {p} ({p.stat().st_size/1e9:.2f} GB)")
return p
def iter_per_names(parquet_path: Path, limit: int | None) -> Iterator[str]:
"""Stream les noms PER du parquet par row-groups (RAM constante)."""
try:
import pyarrow.parquet as pq # type: ignore
except ImportError as e:
sys.exit(f"[FATAL] pyarrow requis. Install : pip install pyarrow\n{e}")
pf = pq.ParquetFile(parquet_path)
print(
f"[INFO] Parquet : {pf.num_row_groups} row groups, "
f"{pf.metadata.num_rows:,} lignes totales."
)
count_in = 0
count_per = 0
# On ne lit que les colonnes utiles
for batch in pf.iter_batches(batch_size=65536, columns=["name", "type"]):
names = batch.column("name").to_pylist()
types = batch.column("type").to_pylist()
for nm, tp in zip(names, types):
count_in += 1
if tp != "PER":
continue
if nm:
count_per += 1
yield nm
if limit is not None and count_in >= limit:
print(f"[INFO] Limite atteinte ({limit}).")
print(f"[INFO] Total lignes lues : {count_in:,}")
print(f"[INFO] Total PER conservés : {count_per:,}")
return
if count_in % 1_000_000 < 65536:
print(
f"[PROGRESS] {count_in:>11,} lignes lues, "
f"{count_per:>11,} PER."
)
print(f"[INFO] Total lignes lues : {count_in:,}")
print(f"[INFO] Total PER conservés : {count_per:,}")
def split_name(name: str) -> tuple[list[str], str | None]:
clean = name.translate(SPLIT_TABLE)
tokens = [t for t in clean.split() if t]
if not tokens:
return [], None
if len(tokens) == 1:
return [], tokens[0]
return tokens[:-1], tokens[-1]
def good_token(tok: str, stop: set[str]) -> bool:
if not tok:
return False
if len(tok) < MIN_TOKEN_LEN or len(tok) > MAX_TOKEN_LEN:
return False
if tok in stop:
return False
return True
def write_sorted_gz(path: Path, items: Iterable[str]) -> int:
path.parent.mkdir(parents=True, exist_ok=True)
data = sorted(items)
with gzip.open(path, "wt", encoding="utf-8", compresslevel=9) as f:
for s in data:
f.write(s)
f.write("\n")
return len(data)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__.split("\n")[0])
parser.add_argument(
"--hf-cache",
default=os.environ.get("HF_HOME", str(Path.home() / ".cache" / "huggingface")),
help="Répertoire cache HuggingFace (par défaut : ~/.cache/huggingface).",
)
parser.add_argument(
"--limit",
type=int,
default=None,
help="Limiter le nombre de lignes lues (debug).",
)
args = parser.parse_args()
t0 = time.time()
print(f"[INFO] Cache HF : {args.hf_cache}")
stopwords = load_stopwords()
parquet_path = download_parquet(args.hf_cache)
noms: set[str] = set()
prenoms: set[str] = set()
bad_kept = 0
for raw_name in iter_per_names(parquet_path, limit=args.limit):
prens, fam = split_name(raw_name)
if fam is not None:
n = normalize(fam)
if good_token(n, stopwords):
noms.add(n)
else:
bad_kept += 1
for p in prens:
n = normalize(p)
if good_token(n, stopwords):
prenoms.add(n)
print(f"[INFO] Noms de famille uniques (post-filtre) : {len(noms):,}")
print(f"[INFO] Prénoms uniques (post-filtre) : {len(prenoms):,}")
print(f"[INFO] Tokens rejetés (longueur/stop/vide) : {bad_kept:,}")
n_noms = write_sorted_gz(OUT_NOMS, noms)
n_pren = write_sorted_gz(OUT_PRENOMS, prenoms)
print(
f"[OK] {OUT_NOMS}{n_noms:,} entrées "
f"({OUT_NOMS.stat().st_size/1e6:.1f} Mo)"
)
print(
f"[OK] {OUT_PRENOMS}{n_pren:,} entrées "
f"({OUT_PRENOMS.stat().st_size/1e6:.1f} Mo)"
)
if INSEE_NOMS.exists():
insee_noms = {
line.strip().upper()
for line in INSEE_NOMS.read_text(encoding="utf-8").splitlines()
if line.strip()
}
inter = noms & insee_noms
cov = 100 * len(inter) / max(1, len(insee_noms))
print(
f"[INFO] Intersection noms_famille_world ∩ INSEE_FR : "
f"{len(inter):,} ({cov:.1f}% de couverture INSEE)"
)
print(f"[DONE] Temps total : {time.time()-t0:.1f}s")
return 0
if __name__ == "__main__":
sys.exit(main())