Amélioration qualité anonymisation : dico médicaments auto, noms soignants, garde trackare, validation EDS, QC audit
- Track A : chargement automatique de ~4200 noms de médicaments depuis edsnlp/drugs.json dans _MEDICAL_STOP_WORDS_SET (réduit les faux positifs médicaments) - Track B : règles de validation EDS par type (NOM rejeté si contexte dosage, HOPITAL rejeté si < 5 chars ou mot structurel) - Track C : nouveau script qc_audit.py pour contrôle qualité post-anonymisation (scan FN résiduels, densité placeholders, FP/FN candidats, mode batch CSV) - Track D : garde structurelle trackare — NOM_GLOBAL <= 3 chars ignoré dans les documents trackare pour éviter de masquer des codes diagnostics - Track E : détection enrichie des noms soignants (Pr/Professeur, Prescripteur, Prescrit par, Exécuté par, Réalisé par) Testé sur 3 OGC (407, 316, 589) — 4 PDFs, 0 erreur, 0 PII résiduel, 0 faux positif détecté. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
313
qc_audit.py
Normal file
313
qc_audit.py
Normal file
@@ -0,0 +1,313 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
QC Audit — Contrôle qualité post-anonymisation
|
||||
-----------------------------------------------
|
||||
Analyse les fichiers .audit.jsonl et .pseudonymise.txt pour détecter :
|
||||
- Faux négatifs résiduels (EMAIL, TEL, NIR non masqués)
|
||||
- Sur-masquage (densité de placeholders trop élevée)
|
||||
- Faux positifs candidats (NOM_EXTRACTED qui sont des stop words ou < 3 chars)
|
||||
- Faux négatifs candidats (mots ALL-CAPS >= 5 chars non masqués, non médicaux)
|
||||
- Stats audit (comptage par kind, top 10)
|
||||
|
||||
Usage :
|
||||
python3 qc_audit.py path/to/*.audit.jsonl
|
||||
python3 qc_audit.py --batch-dir path/anonymise/
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from collections import Counter
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
# Regex PII critiques (mêmes patterns que le core)
|
||||
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
|
||||
RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[ .-]?\d){8}(?!\d)")
|
||||
RE_NIR = re.compile(
|
||||
r"\b([12])\s*(\d{2})\s*(0[1-9]|1[0-2]|2[AB])\s*(\d{2,3})\s*(\d{3})\s*(\d{3})\s*(\d{2})\b",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
RE_PLACEHOLDER = re.compile(r"\[[A-ZÉÈÀÙÂÊÎÔÛÄ_]+\]")
|
||||
|
||||
# Stop words médicaux (chargement léger pour le QC)
|
||||
try:
|
||||
from anonymizer_core_refactored_onnx import _MEDICAL_STOP_WORDS_SET
|
||||
except ImportError:
|
||||
_MEDICAL_STOP_WORDS_SET = set()
|
||||
|
||||
|
||||
def load_audit(audit_path: Path) -> List[Dict[str, Any]]:
|
||||
"""Charge un fichier .audit.jsonl."""
|
||||
entries = []
|
||||
with audit_path.open("r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
entries.append(json.loads(line))
|
||||
return entries
|
||||
|
||||
|
||||
def load_text(txt_path: Path) -> str:
|
||||
"""Charge un fichier .pseudonymise.txt."""
|
||||
return txt_path.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def scan_residual_pii(text: str) -> Dict[str, List[str]]:
|
||||
"""Détecte les PII résiduels (faux négatifs) dans le texte pseudonymisé."""
|
||||
residuals: Dict[str, List[str]] = {}
|
||||
for m in RE_EMAIL.finditer(text):
|
||||
# Ignorer les emails dans les placeholders
|
||||
if "[" not in text[max(0, m.start() - 1):m.start()]:
|
||||
residuals.setdefault("EMAIL", []).append(m.group())
|
||||
for m in RE_TEL.finditer(text):
|
||||
if "[" not in text[max(0, m.start() - 1):m.start()]:
|
||||
residuals.setdefault("TEL", []).append(m.group())
|
||||
for m in RE_NIR.finditer(text):
|
||||
if "[" not in text[max(0, m.start() - 1):m.start()]:
|
||||
residuals.setdefault("NIR", []).append(m.group())
|
||||
return residuals
|
||||
|
||||
|
||||
def placeholder_density(text: str) -> Dict[str, Any]:
|
||||
"""Calcule la densité de placeholders. Alerte si [NOM] > 5% des mots."""
|
||||
words = text.split()
|
||||
total_words = len(words)
|
||||
if total_words == 0:
|
||||
return {"total_words": 0, "placeholders": 0, "density_pct": 0.0, "alert": False}
|
||||
|
||||
placeholder_count = sum(1 for w in words if RE_PLACEHOLDER.match(w))
|
||||
nom_count = text.count("[NOM]")
|
||||
density = placeholder_count / total_words * 100
|
||||
nom_density = nom_count / total_words * 100
|
||||
|
||||
return {
|
||||
"total_words": total_words,
|
||||
"placeholders": placeholder_count,
|
||||
"density_pct": round(density, 2),
|
||||
"nom_count": nom_count,
|
||||
"nom_density_pct": round(nom_density, 2),
|
||||
"alert_overmasking": nom_density > 5.0,
|
||||
}
|
||||
|
||||
|
||||
def audit_stats(entries: List[Dict[str, Any]]) -> Dict[str, int]:
|
||||
"""Comptage par kind (top 10)."""
|
||||
counter = Counter(e.get("kind", "UNKNOWN") for e in entries)
|
||||
return dict(counter.most_common(10))
|
||||
|
||||
|
||||
def fp_candidates(entries: List[Dict[str, Any]]) -> List[Dict[str, str]]:
|
||||
"""Faux positifs candidats : NOM_EXTRACTED qui sont des stop words ou < 3 chars."""
|
||||
candidates = []
|
||||
for e in entries:
|
||||
kind = e.get("kind", "")
|
||||
original = e.get("original", "")
|
||||
if kind != "NOM_EXTRACTED":
|
||||
continue
|
||||
is_fp = False
|
||||
reason = ""
|
||||
if len(original) < 3:
|
||||
is_fp = True
|
||||
reason = "trop court (< 3 chars)"
|
||||
elif _MEDICAL_STOP_WORDS_SET and original.lower() in _MEDICAL_STOP_WORDS_SET:
|
||||
is_fp = True
|
||||
reason = "stop word médical"
|
||||
if is_fp:
|
||||
candidates.append({"kind": kind, "original": original, "reason": reason})
|
||||
return candidates
|
||||
|
||||
|
||||
def fn_candidates(text: str) -> List[str]:
|
||||
"""Faux négatifs candidats : mots ALL-CAPS >= 5 chars non masqués, non médicaux."""
|
||||
candidates = []
|
||||
seen = set()
|
||||
for m in re.finditer(r"\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{5,})\b", text):
|
||||
word = m.group(1)
|
||||
if word in seen:
|
||||
continue
|
||||
seen.add(word)
|
||||
# Ignorer les placeholders et mots dans les balises
|
||||
if "[" in text[max(0, m.start() - 1):m.start()]:
|
||||
continue
|
||||
if "]" in text[m.end():min(len(text), m.end() + 1)]:
|
||||
continue
|
||||
# Ignorer les mots médicaux connus
|
||||
if _MEDICAL_STOP_WORDS_SET and word.lower() in _MEDICAL_STOP_WORDS_SET:
|
||||
continue
|
||||
# Ignorer les abréviations médicales très courantes
|
||||
if word in {"TABLES", "FINESS", "EMAIL", "ADRESSE", "IBAN", "EPISODE",
|
||||
"ETABLISSEMENT", "DATE", "NAISSANCE", "POSTAL", "MASK",
|
||||
"DOSSIER", "RPPS", "GLOBAL", "EXTRACTED", "TRACKARE"}:
|
||||
continue
|
||||
candidates.append(word)
|
||||
return candidates
|
||||
|
||||
|
||||
def analyze_file(audit_path: Path, txt_path: Optional[Path] = None) -> Dict[str, Any]:
|
||||
"""Analyse complète d'un couple audit.jsonl + pseudonymise.txt."""
|
||||
result: Dict[str, Any] = {"file": str(audit_path)}
|
||||
|
||||
# Charger l'audit
|
||||
entries = load_audit(audit_path)
|
||||
result["total_hits"] = len(entries)
|
||||
result["stats"] = audit_stats(entries)
|
||||
result["fp_candidates"] = fp_candidates(entries)
|
||||
|
||||
# Charger le texte si disponible
|
||||
if txt_path is None:
|
||||
# Déduire le chemin du .pseudonymise.txt
|
||||
stem = audit_path.name.replace(".audit.jsonl", "")
|
||||
txt_path = audit_path.parent / f"{stem}.pseudonymise.txt"
|
||||
|
||||
if txt_path.exists():
|
||||
text = load_text(txt_path)
|
||||
result["residual_pii"] = scan_residual_pii(text)
|
||||
result["density"] = placeholder_density(text)
|
||||
result["fn_candidates"] = fn_candidates(text)
|
||||
else:
|
||||
result["residual_pii"] = {}
|
||||
result["density"] = {}
|
||||
result["fn_candidates"] = []
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def print_report(analysis: Dict[str, Any]) -> None:
|
||||
"""Affiche un rapport lisible pour un fichier."""
|
||||
print(f"\n{'='*70}")
|
||||
print(f" QC Audit : {analysis['file']}")
|
||||
print(f"{'='*70}")
|
||||
|
||||
print(f"\n Total hits audit : {analysis['total_hits']}")
|
||||
|
||||
# Stats par kind
|
||||
print("\n Top 10 kinds :")
|
||||
for kind, count in analysis.get("stats", {}).items():
|
||||
print(f" {kind:30s} : {count}")
|
||||
|
||||
# Densité
|
||||
density = analysis.get("density", {})
|
||||
if density:
|
||||
print(f"\n Densité placeholders : {density.get('density_pct', 0)}% "
|
||||
f"({density.get('placeholders', 0)}/{density.get('total_words', 0)} mots)")
|
||||
print(f" [NOM] : {density.get('nom_count', 0)} occurrences "
|
||||
f"({density.get('nom_density_pct', 0)}%)")
|
||||
if density.get("alert_overmasking"):
|
||||
print(" *** ALERTE : sur-masquage possible ([NOM] > 5% des mots) ***")
|
||||
|
||||
# PII résiduels
|
||||
residuals = analysis.get("residual_pii", {})
|
||||
if residuals:
|
||||
print("\n PII résiduels (faux négatifs) :")
|
||||
for pii_type, values in residuals.items():
|
||||
print(f" {pii_type} : {len(values)} trouvé(s)")
|
||||
for v in values[:3]:
|
||||
print(f" - {v}")
|
||||
else:
|
||||
print("\n PII résiduels : aucun détecté")
|
||||
|
||||
# FP candidats
|
||||
fps = analysis.get("fp_candidates", [])
|
||||
if fps:
|
||||
print(f"\n FP candidats ({len(fps)}) :")
|
||||
for fp in fps[:10]:
|
||||
print(f" - {fp['original']:20s} ({fp['reason']})")
|
||||
|
||||
# FN candidats
|
||||
fns = analysis.get("fn_candidates", [])
|
||||
if fns:
|
||||
print(f"\n FN candidats ({len(fns)} mots ALL-CAPS non masqués) :")
|
||||
for fn in fns[:15]:
|
||||
print(f" - {fn}")
|
||||
|
||||
print()
|
||||
|
||||
|
||||
def batch_report(results: List[Dict[str, Any]], csv_path: Optional[Path] = None) -> None:
|
||||
"""Rapport batch résumé. Optionnel : export CSV."""
|
||||
print(f"\n{'='*70}")
|
||||
print(f" RAPPORT BATCH — {len(results)} fichier(s)")
|
||||
print(f"{'='*70}")
|
||||
|
||||
total_hits = sum(r.get("total_hits", 0) for r in results)
|
||||
total_residuals = sum(
|
||||
sum(len(v) for v in r.get("residual_pii", {}).values())
|
||||
for r in results
|
||||
)
|
||||
total_fps = sum(len(r.get("fp_candidates", [])) for r in results)
|
||||
total_fns = sum(len(r.get("fn_candidates", [])) for r in results)
|
||||
alerts = [r["file"] for r in results if r.get("density", {}).get("alert_overmasking")]
|
||||
|
||||
print(f"\n Total hits audit : {total_hits}")
|
||||
print(f" PII résiduels (FN) : {total_residuals}")
|
||||
print(f" FP candidats : {total_fps}")
|
||||
print(f" FN candidats (ALL-CAPS) : {total_fns}")
|
||||
print(f" Alertes sur-masquage : {len(alerts)}")
|
||||
if alerts:
|
||||
for a in alerts:
|
||||
print(f" - {a}")
|
||||
|
||||
if csv_path:
|
||||
with csv_path.open("w", newline="", encoding="utf-8") as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow([
|
||||
"fichier", "total_hits", "residual_pii", "density_pct",
|
||||
"nom_density_pct", "alert_overmasking", "fp_count", "fn_count",
|
||||
])
|
||||
for r in results:
|
||||
d = r.get("density", {})
|
||||
writer.writerow([
|
||||
Path(r["file"]).name,
|
||||
r.get("total_hits", 0),
|
||||
sum(len(v) for v in r.get("residual_pii", {}).values()),
|
||||
d.get("density_pct", ""),
|
||||
d.get("nom_density_pct", ""),
|
||||
d.get("alert_overmasking", ""),
|
||||
len(r.get("fp_candidates", [])),
|
||||
len(r.get("fn_candidates", [])),
|
||||
])
|
||||
print(f"\n Rapport CSV : {csv_path}")
|
||||
|
||||
print()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="QC Audit post-anonymisation")
|
||||
parser.add_argument("files", nargs="*", help="Fichiers .audit.jsonl à analyser")
|
||||
parser.add_argument("--batch-dir", type=str, help="Répertoire contenant les fichiers anonymisés")
|
||||
parser.add_argument("--csv", type=str, help="Chemin du rapport CSV résumé (mode batch)")
|
||||
args = parser.parse_args()
|
||||
|
||||
audit_files: List[Path] = []
|
||||
|
||||
if args.batch_dir:
|
||||
batch_dir = Path(args.batch_dir)
|
||||
audit_files = sorted(batch_dir.glob("**/*.audit.jsonl"))
|
||||
elif args.files:
|
||||
audit_files = [Path(f) for f in args.files]
|
||||
else:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
if not audit_files:
|
||||
print("Aucun fichier .audit.jsonl trouvé.")
|
||||
sys.exit(1)
|
||||
|
||||
results = []
|
||||
for af in audit_files:
|
||||
analysis = analyze_file(af)
|
||||
results.append(analysis)
|
||||
print_report(analysis)
|
||||
|
||||
if len(results) > 1:
|
||||
csv_path = Path(args.csv) if args.csv else None
|
||||
batch_report(results, csv_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user