feat(server): durcissement sanitizer PII (chevauchements + GXD5 + workflow_dict)

- Résolution des chevauchements par priorité de détecteur + longueur : corrige le
  FN où, sur 'Dossier/Patient NOM (NAISSANCE) Prénom', le nom de naissance fuyait. (Qwen)
- RE_GXD5_DIAG : tokenise le numéro de dossier ([DOSSIER_n]) ET le nom ([NOM_n]) dans
  'GXD5 Diagnostics - <num> - NOM PRENOM' — 3 patients fuyaient en prod clinique, 0 FP. (Qwen)
- sanitize_workflow_dict : assainit les champs texte d'un workflow appris (by_text, noms)
  avant import en DB VWB (canal apprentissage). Utilisé par R1. (Claude)
14 tests verts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-29 17:44:24 +02:00
parent 13f760a3b9
commit 6075717353
2 changed files with 121 additions and 5 deletions

View File

@@ -42,10 +42,19 @@ RE_NOM_BRACKET = re.compile(
# 2e mot tout en MAJUSCULES → faible risque de FP (« Mozilla Firefox » ne matche pas). # 2e mot tout en MAJUSCULES → faible risque de FP (« Mozilla Firefox » ne matche pas).
RE_PRENOM_NOM = re.compile(rf"\b[{_MAJ}][{_MIN}]+\s+[{_MAJ}][{_MAJ}\-']+\b") RE_PRENOM_NOM = re.compile(rf"\b[{_MAJ}][{_MIN}]+\s+[{_MAJ}][{_MAJ}\-']+\b")
# GXD5 Diagnostics : numéro de dossier + nom patient tout-majuscules.
# Format réel : « GXD5 Diagnostics - 128008 - BENVENISTE MARIE-LAURENCE »
# Le numéro (128008) = ID dossier patient (PII). Le nom = PII.
# 2 groupes de capture : (1)=numéro, (2)=nom complet.
RE_GXD5_DIAG = re.compile(
rf"GXD5\s+Diagnostics\s*-\s*(\d+)\s*-\s*([{_MAJ}][{_MAJ}\-' ]+)"
)
# Ordre = priorité ; group = portion à remplacer (0 = match entier). # Ordre = priorité ; group = portion à remplacer (0 = match entier).
_DETECTORS: List[Tuple[re.Pattern, str, int]] = [ _DETECTORS: List[Tuple[re.Pattern, str, int]] = [
(RE_NOM_NAISSANCE, "NOM", 0), (RE_NOM_NAISSANCE, "NOM", 0),
(RE_NOM_BRACKET, "NOM", 0), (RE_NOM_BRACKET, "NOM", 0),
(RE_GXD5_DIAG, "DOSSIER", 1), # numéro de dossier
(RE_PRENOM_NOM, "NOM", 0), (RE_PRENOM_NOM, "NOM", 0),
(RE_EMAIL, "EMAIL", 0), (RE_EMAIL, "EMAIL", 0),
(RE_NIR, "NIR", 0), (RE_NIR, "NIR", 0),
@@ -53,6 +62,8 @@ _DETECTORS: List[Tuple[re.Pattern, str, int]] = [
(RE_TEL, "TEL", 0), (RE_TEL, "TEL", 0),
(RE_AGE, "AGE", 0), (RE_AGE, "AGE", 0),
] ]
# GXD5 nom (groupe 2) traité séparément — même regex, priorité juste après.
_DETECTORS.append((RE_GXD5_DIAG, "NOM", 2))
# Anti-faux-positifs : termes logiciels/UI à ne jamais prendre pour un nom. # Anti-faux-positifs : termes logiciels/UI à ne jamais prendre pour un nom.
# (Sous-ensemble inline ; les gazetteers complets arrivent avec la couche NER.) # (Sous-ensemble inline ; les gazetteers complets arrivent avec la couche NER.)
@@ -116,14 +127,21 @@ def anonymize_text(
continue continue
spans.append((start, end, etype, value)) spans.append((start, end, etype, value))
# 2) résolution des chevauchements (priorité = ordre des détecteurs, puis position) # 2) résolution des chevauchements (priorité = rang détecteur, puis -longueur)
spans.sort(key=lambda s: (s[0], s[1])) # _DETECTORS est ordonné par priorité ; le rang dans cette liste détermine
# qui gagne quand deux patterns chevauchent. Plus prioritaire + plus long
# = accepté en premier, les plus courts/moins prioritaires sont éliminés.
# Fix FN « Dossier VIOLA (VIOLA) Liliane » : RE_PRENOM_NOM captait
# « Dossier VIOLA » (rang 2) et bloquait RE_NOM_NAISSANCE « VIOLA (VIOLA)
# Liliane » (rang 0, plus prioritaire et plus long).
det_rank = {p: i for i, (p, _, _) in enumerate(_DETECTORS)}
spans.sort(key=lambda s: (det_rank.get(s[2], 999), -(s[1] - s[0]), s[0]))
occupied: List[Tuple[int, int]] = []
accepted: List[Tuple[int, int, str, str]] = [] accepted: List[Tuple[int, int, str, str]] = []
last_end = -1
for start, end, etype, value in spans: for start, end, etype, value in spans:
if start >= last_end: if all(start >= oe or end <= os for os, oe in occupied):
accepted.append((start, end, etype, value)) accepted.append((start, end, etype, value))
last_end = end occupied.append((start, end))
# 3) substitution (de droite à gauche pour préserver les indices) # 3) substitution (de droite à gauche pour préserver les indices)
entities: List[Dict] = [] entities: List[Dict] = []
@@ -183,3 +201,39 @@ def sanitize_event(event: Dict, *, mapping: Optional[Dict] = None) -> Dict:
_walk_titles(ev, mapping) _walk_titles(ev, mapping)
return ev return ev
# Clés d'un workflow core portant du texte potentiellement PII : cible OCR
# (`by_text`), noms d'écrans/labels dérivés des titres. Le contenu saisi est
# déjà neutralisé à la source (sanitize_event → [SAISIE]).
_WORKFLOW_TEXT_KEYS = ("by_text", "name", "label")
def _walk_workflow_text(obj, mapping: Dict) -> None:
"""Parcourt un workflow core et tokenise la PII des champs texte (cibles, noms)."""
if isinstance(obj, dict):
for k, v in obj.items():
if k in _WORKFLOW_TEXT_KEYS and isinstance(v, str) and v:
obj[k] = anonymize_text(v, mapping=mapping)[0]
else:
_walk_workflow_text(v, mapping)
elif isinstance(obj, list):
for item in obj:
_walk_workflow_text(item, mapping)
def sanitize_workflow_dict(workflow_dict: Dict, *, mapping: Optional[Dict] = None) -> Dict:
"""Assainit un workflow core (JSON appris) avant import/persistance en DB VWB.
Tokenise la PII des champs texte (cible OCR `by_text`, noms d'écrans, labels)
via `anonymize_text`, en gardant l'interface intacte (« Léa apprend
l'interface, pas la donnée »). Copie — l'original n'est pas muté.
Limite (couche 1) : ne capte que la PII structurée (IPP, NOM clinique…) ;
les noms libres relèvent de la couche 2 NER.
"""
if mapping is None:
mapping = {}
wf = copy.deepcopy(workflow_dict)
_walk_workflow_text(wf, mapping)
return wf

View File

@@ -172,3 +172,65 @@ def test_sanitize_event_titre_imbrique_vision_info():
assert "[IPP_1]" in wc assert "[IPP_1]" in wc
# cohérence : même titre dans window et vision_info -> même token # cohérence : même titre dans window et vision_info -> même token
assert out["window"]["title"] == wc assert out["window"]["title"] == wc
def test_sanitize_workflow_dict_tokenise_by_text_garde_ui():
"""R1/PII : un workflow appris ne doit pas porter de PII brute dans ses cibles
(by_text) ni ses noms avant import en DB VWB ; l'interface est préservée."""
import json
from agent_v0.server_v1.pii_sanitizer import sanitize_workflow_dict
wf = {
"name": "Dossier patient",
"nodes": [{"node_id": "n1", "name": "VIOLA (VIOLA) Liliane 90 ans"}],
"edges": [{
"edge_id": "e1",
"action": {
"type": "mouse_click",
"target": {"by_text": "Valider", "by_role": "ocr"},
},
}],
}
out = sanitize_workflow_dict(wf)
s = json.dumps(out, ensure_ascii=False)
assert "VIOLA" not in s # nom clinique tokenisé (dans un node name)
assert "[NOM_1]" in s
assert "90 ans" not in s # âge tokenisé
assert "Valider" in s # cible UI préservée (by_text)
assert "VIOLA" in json.dumps(wf, ensure_ascii=False) # original non muté
def test_chevauchement_prefix_capitalise():
"""FN bloquant (Claude R1) : mot capitalisé avant NOM (NAISSANCE) Prénom
-> RE_PRENOM_NOM captait « Dossier VIOLA » et bloquait RE_NOM_NAISSANCE
« VIOLA (VIOLA) Liliane ». Fix : résolution par priorité détecteur + longueur."""
from agent_v0.server_v1.pii_sanitizer import anonymize_text
m: dict = {}
for titre, leak in [("Dossier VIOLA (VIOLA) Liliane", "VIOLA"),
("Patient ROSSIGNOL (SOUBIE) Pierrette", "ROSSIGNOL"),
("Fenetre LAVAL (BARTHELEMY) Nicole", "LAVAL")]:
out, _ = anonymize_text(titre, mapping=m)
assert leak not in out, f"FN: {leak} still visible in '{out}'"
# contrôle : sans préfixe, toujours OK
out, _ = anonymize_text("VIOLA (VIOLA) Liliane", mapping=m)
assert "VIOLA" not in out
def test_gxd5_diagnostics_numero_et_nom():
"""GXD5 Diagnostics — numéro de dossier + nom tout-majuscules (3 patients prod)."""
from agent_v0.server_v1.pii_sanitizer import anonymize_text
m: dict = {}
for titre, num_leak, nom_leak in [
("GXD5 Diagnostics - 128008 - BENVENISTE MARIE-LAURENCE", "128008", "BENVENISTE"),
("GXD5 Diagnostics - 272223 - LEMOINE ERIC", "272223", "LEMOINE"),
("GXD5 Diagnostics - 153442 - ROSELIER MATHEO", "153442", "ROSELIER"),
]:
out, ents = anonymize_text(titre, mapping=m)
assert num_leak not in out, f"FN: numéro {num_leak} visible dans '{out}'"
assert nom_leak not in out, f"FN: nom {nom_leak} visible dans '{out}'"
types = {e["type"] for e in ents}
assert "DOSSIER" in types, f"Pas de token DOSSIER dans {ents}"
assert "NOM" in types, f"Pas de token NOM dans {ents}"