diff --git a/agent_v0/server_v1/pii_sanitizer.py b/agent_v0/server_v1/pii_sanitizer.py new file mode 100644 index 000000000..21752cb2c --- /dev/null +++ b/agent_v0/server_v1/pii_sanitizer.py @@ -0,0 +1,133 @@ +"""Assainissement PII des données capturées (titres de fenêtre, texte saisi, OCR). + +Côté serveur. Remplace la PII par des **tokens typés et cohérents** +(`[IPP_1]`, `[AGE_1]`, `[NOM_1]`…) : on protège la donnée **et** on garde la +structure (champ de type NOM/IPP) utile à l'apprentissage des variables. + +Couche 1 (ce module, sans modèle) : filet **regex** sur la PII structurée +(IPP, NIR, téléphone, email, âge) + règles **structurelles** des titres +cliniques (`NOM (NAISSANCE) Prénom`, `[Nom Prénom]` des fenêtres PACS). Regex +réutilisées du projet `anonymisation`. +Couche 2 (à venir) : NER CamemBERT-bio (ONNX) pour les noms libres que la +couche 1 ne capte pas — branchée plus tard, ce module marche sans. + +Branche feat/push-log-dgx — assainissement PII clinique. +""" + +from __future__ import annotations + +import re +from typing import Dict, List, Optional, Tuple + +# --- Filet regex (réutilisé de anonymisation/anonymizer_core_refactored_onnx.py) --- +RE_IPP = re.compile(r"\b(?:I\.?P\.?P\.?|IPP|N°\s*Ipp)\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE) +RE_NIR = re.compile(r"(? str: + """Clé de cohérence : même entité -> même token.""" + if etype in ("IPP", "NIR", "TEL"): + return re.sub(r"\s+", "", value) + if etype == "EMAIL": + return value.lower() + return re.sub(r"\s+", " ", value).strip().upper() + + +def _is_blacklisted_name(value: str) -> bool: + toks = [t for t in re.split(r"[^\wÀ-ÿ]+", value) if t] + return bool(toks) and all(t.upper() in _SOFTWARE_BLACKLIST for t in toks) + + +def _assign_token(mapping: Dict, etype: str, norm: str) -> str: + key = (etype, norm) + if key in mapping: + return mapping[key] + n = 1 + sum(1 for k in mapping if isinstance(k, tuple) and k[0] == etype) + token = f"[{etype}_{n}]" + mapping[key] = token + return token + + +def anonymize_text( + text: str, *, mapping: Optional[Dict] = None +) -> Tuple[str, List[Dict]]: + """Remplace la PII de `text` par des tokens typés cohérents. + + `mapping` : table de cohérence partagée (ex. à l'échelle d'une session) — + la même valeur PII reçoit le même token d'un appel à l'autre. Mutée en place ; + si None, une table locale est utilisée. + + Retourne `(texte_assaini, entités)` où chaque entité = + `{"type", "original", "token", "start", "end"}` (positions dans le texte source). + """ + if not text: + return text, [] + if mapping is None: + mapping = {} + + # 1) collecte des candidats (start, end, type, valeur) + spans: List[Tuple[int, int, str, str]] = [] + for pattern, etype, group in _DETECTORS: + for m in pattern.finditer(text): + start, end = m.span(group) + if start == end: + continue + value = m.group(group) + if etype == "NOM" and _is_blacklisted_name(value): + continue + spans.append((start, end, etype, value)) + + # 2) résolution des chevauchements (priorité = ordre des détecteurs, puis position) + spans.sort(key=lambda s: (s[0], s[1])) + accepted: List[Tuple[int, int, str, str]] = [] + last_end = -1 + for start, end, etype, value in spans: + if start >= last_end: + accepted.append((start, end, etype, value)) + last_end = end + + # 3) substitution (de droite à gauche pour préserver les indices) + entities: List[Dict] = [] + out = text + for start, end, etype, value in sorted(accepted, key=lambda s: s[0], reverse=True): + token = _assign_token(mapping, etype, _normalize(etype, value)) + out = out[:start] + token + out[end:] + entities.append( + {"type": etype, "original": value, "token": token, "start": start, "end": end} + ) + entities.reverse() + return out, entities diff --git a/tests/unit/test_pii_sanitizer.py b/tests/unit/test_pii_sanitizer.py new file mode 100644 index 000000000..bc3f4be5c --- /dev/null +++ b/tests/unit/test_pii_sanitizer.py @@ -0,0 +1,81 @@ +"""Tests de l'assainissement PII des données capturées (titres, texte, OCR). + +Couche 1 (sans modèle) : filet regex sur la PII structurée (IPP, NIR, TEL, +EMAIL, AGE) + règles structurelles cliniques (NOM (NAISSANCE) Prénom ; +[Nom Prénom] des fenêtres PACS), avec tokens TYPÉS et COHÉRENTS ([IPP_1]…). + +Réutilise l'approche du projet `anonymisation` (placeholders + regex). La +couche NER (noms libres) viendra en complément. Cas réels remontés en clinique +le 28/06 (anonymisés ici par construction). Branche feat/push-log-dgx. +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +def test_ipp_et_age_tokenises(): + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + titre = "VIOLA (VIOLA) Liliane 90 ans - IPP: 168246 - Expert Sante - Mozilla Firefox" + out, ents = anonymize_text(titre) + + assert "168246" not in out, out # IPP retiré + assert "[IPP_1]" in out + assert "90 ans" not in out # âge retiré + assert "[AGE_1]" in out + # le nom format clinique « NOM (NAISSANCE) Prénom » est tokenisé + assert "VIOLA" not in out and "Liliane" not in out, out + assert "[NOM_1]" in out + # le logiciel n'est pas pris pour de la PII + assert "Firefox" in out and "Expert Sante" in out + types = {e["type"] for e in ents} + assert {"IPP", "AGE", "NOM"} <= types + + +def test_nom_entre_crochets_pacs(): + """Le PACS met le patient entre crochets : `[DATTIN Alix]`.""" + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + titre = "GXD5 Pacs 4.0.4.307 CIM ARES - [DATTIN Alix] - Mozilla Firefox" + out, _ = anonymize_text(titre) + + assert "DATTIN" not in out and "Alix" not in out, out + assert "[NOM_1]" in out + assert "Pacs" in out and "Firefox" in out # contexte logiciel préservé + + +def test_coherence_meme_ipp_meme_token(): + """Même valeur PII -> même token (sur un mapping partagé de session).""" + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + mapping: dict = {} + o1, _ = anonymize_text("IPP: 168246 ouvert", mapping=mapping) + o2, _ = anonymize_text("dossier IPP: 168246 fermé", mapping=mapping) + o3, _ = anonymize_text("IPP: 270020 autre", mapping=mapping) + + assert "[IPP_1]" in o1 and "[IPP_1]" in o2 # même patient -> même token + assert "[IPP_2]" in o3 # patient différent -> token différent + assert "270020" not in o3 + + +def test_email_et_telephone(): + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + out, _ = anonymize_text("contact j.dupont@chu.fr / 06 12 34 56 78") + assert "@chu.fr" not in out and "[EMAIL_1]" in out + assert "06 12 34 56 78" not in out and "[TEL_1]" in out + + +def test_texte_sans_pii_inchange(): + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + t = "Expert Sante - Consultation - Mozilla Firefox" + out, ents = anonymize_text(t) + assert out == t + assert ents == []