From 6075717353410826de075a876bbc5dd980a01d6f Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 29 Jun 2026 17:44:24 +0200 Subject: [PATCH] feat(server): durcissement sanitizer PII (chevauchements + GXD5 + workflow_dict) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Résolution des chevauchements par priorité de détecteur + longueur : corrige le FN où, sur 'Dossier/Patient NOM (NAISSANCE) Prénom', le nom de naissance fuyait. (Qwen) - RE_GXD5_DIAG : tokenise le numéro de dossier ([DOSSIER_n]) ET le nom ([NOM_n]) dans 'GXD5 Diagnostics - - NOM PRENOM' — 3 patients fuyaient en prod clinique, 0 FP. (Qwen) - sanitize_workflow_dict : assainit les champs texte d'un workflow appris (by_text, noms) avant import en DB VWB (canal apprentissage). Utilisé par R1. (Claude) 14 tests verts. Co-Authored-By: Claude Opus 4.8 (1M context) --- agent_v0/server_v1/pii_sanitizer.py | 64 ++++++++++++++++++++++++++--- tests/unit/test_pii_sanitizer.py | 62 ++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/agent_v0/server_v1/pii_sanitizer.py b/agent_v0/server_v1/pii_sanitizer.py index 8515ef631..c85207efb 100644 --- a/agent_v0/server_v1/pii_sanitizer.py +++ b/agent_v0/server_v1/pii_sanitizer.py @@ -42,10 +42,19 @@ RE_NOM_BRACKET = re.compile( # 2e mot tout en MAJUSCULES → faible risque de FP (« Mozilla Firefox » ne matche pas). RE_PRENOM_NOM = re.compile(rf"\b[{_MAJ}][{_MIN}]+\s+[{_MAJ}][{_MAJ}\-']+\b") +# GXD5 Diagnostics : numéro de dossier + nom patient tout-majuscules. +# Format réel : « GXD5 Diagnostics - 128008 - BENVENISTE MARIE-LAURENCE » +# Le numéro (128008) = ID dossier patient (PII). Le nom = PII. +# 2 groupes de capture : (1)=numéro, (2)=nom complet. +RE_GXD5_DIAG = re.compile( + rf"GXD5\s+Diagnostics\s*-\s*(\d+)\s*-\s*([{_MAJ}][{_MAJ}\-' ]+)" +) + # Ordre = priorité ; group = portion à remplacer (0 = match entier). _DETECTORS: List[Tuple[re.Pattern, str, int]] = [ (RE_NOM_NAISSANCE, "NOM", 0), (RE_NOM_BRACKET, "NOM", 0), + (RE_GXD5_DIAG, "DOSSIER", 1), # numéro de dossier (RE_PRENOM_NOM, "NOM", 0), (RE_EMAIL, "EMAIL", 0), (RE_NIR, "NIR", 0), @@ -53,6 +62,8 @@ _DETECTORS: List[Tuple[re.Pattern, str, int]] = [ (RE_TEL, "TEL", 0), (RE_AGE, "AGE", 0), ] +# GXD5 nom (groupe 2) traité séparément — même regex, priorité juste après. +_DETECTORS.append((RE_GXD5_DIAG, "NOM", 2)) # Anti-faux-positifs : termes logiciels/UI à ne jamais prendre pour un nom. # (Sous-ensemble inline ; les gazetteers complets arrivent avec la couche NER.) @@ -116,14 +127,21 @@ def anonymize_text( continue spans.append((start, end, etype, value)) - # 2) résolution des chevauchements (priorité = ordre des détecteurs, puis position) - spans.sort(key=lambda s: (s[0], s[1])) + # 2) résolution des chevauchements (priorité = rang détecteur, puis -longueur) + # _DETECTORS est ordonné par priorité ; le rang dans cette liste détermine + # qui gagne quand deux patterns chevauchent. Plus prioritaire + plus long + # = accepté en premier, les plus courts/moins prioritaires sont éliminés. + # Fix FN « Dossier VIOLA (VIOLA) Liliane » : RE_PRENOM_NOM captait + # « Dossier VIOLA » (rang 2) et bloquait RE_NOM_NAISSANCE « VIOLA (VIOLA) + # Liliane » (rang 0, plus prioritaire et plus long). + det_rank = {p: i for i, (p, _, _) in enumerate(_DETECTORS)} + spans.sort(key=lambda s: (det_rank.get(s[2], 999), -(s[1] - s[0]), s[0])) + occupied: List[Tuple[int, int]] = [] accepted: List[Tuple[int, int, str, str]] = [] - last_end = -1 for start, end, etype, value in spans: - if start >= last_end: + if all(start >= oe or end <= os for os, oe in occupied): accepted.append((start, end, etype, value)) - last_end = end + occupied.append((start, end)) # 3) substitution (de droite à gauche pour préserver les indices) entities: List[Dict] = [] @@ -183,3 +201,39 @@ def sanitize_event(event: Dict, *, mapping: Optional[Dict] = None) -> Dict: _walk_titles(ev, mapping) return ev + + +# Clés d'un workflow core portant du texte potentiellement PII : cible OCR +# (`by_text`), noms d'écrans/labels dérivés des titres. Le contenu saisi est +# déjà neutralisé à la source (sanitize_event → [SAISIE]). +_WORKFLOW_TEXT_KEYS = ("by_text", "name", "label") + + +def _walk_workflow_text(obj, mapping: Dict) -> None: + """Parcourt un workflow core et tokenise la PII des champs texte (cibles, noms).""" + if isinstance(obj, dict): + for k, v in obj.items(): + if k in _WORKFLOW_TEXT_KEYS and isinstance(v, str) and v: + obj[k] = anonymize_text(v, mapping=mapping)[0] + else: + _walk_workflow_text(v, mapping) + elif isinstance(obj, list): + for item in obj: + _walk_workflow_text(item, mapping) + + +def sanitize_workflow_dict(workflow_dict: Dict, *, mapping: Optional[Dict] = None) -> Dict: + """Assainit un workflow core (JSON appris) avant import/persistance en DB VWB. + + Tokenise la PII des champs texte (cible OCR `by_text`, noms d'écrans, labels) + via `anonymize_text`, en gardant l'interface intacte (« Léa apprend + l'interface, pas la donnée »). Copie — l'original n'est pas muté. + + Limite (couche 1) : ne capte que la PII structurée (IPP, NOM clinique…) ; + les noms libres relèvent de la couche 2 NER. + """ + if mapping is None: + mapping = {} + wf = copy.deepcopy(workflow_dict) + _walk_workflow_text(wf, mapping) + return wf diff --git a/tests/unit/test_pii_sanitizer.py b/tests/unit/test_pii_sanitizer.py index 7a99eec12..dfd9101ad 100644 --- a/tests/unit/test_pii_sanitizer.py +++ b/tests/unit/test_pii_sanitizer.py @@ -172,3 +172,65 @@ def test_sanitize_event_titre_imbrique_vision_info(): assert "[IPP_1]" in wc # cohérence : même titre dans window et vision_info -> même token assert out["window"]["title"] == wc + + +def test_sanitize_workflow_dict_tokenise_by_text_garde_ui(): + """R1/PII : un workflow appris ne doit pas porter de PII brute dans ses cibles + (by_text) ni ses noms avant import en DB VWB ; l'interface est préservée.""" + import json + from agent_v0.server_v1.pii_sanitizer import sanitize_workflow_dict + + wf = { + "name": "Dossier patient", + "nodes": [{"node_id": "n1", "name": "VIOLA (VIOLA) Liliane 90 ans"}], + "edges": [{ + "edge_id": "e1", + "action": { + "type": "mouse_click", + "target": {"by_text": "Valider", "by_role": "ocr"}, + }, + }], + } + out = sanitize_workflow_dict(wf) + s = json.dumps(out, ensure_ascii=False) + assert "VIOLA" not in s # nom clinique tokenisé (dans un node name) + assert "[NOM_1]" in s + assert "90 ans" not in s # âge tokenisé + assert "Valider" in s # cible UI préservée (by_text) + assert "VIOLA" in json.dumps(wf, ensure_ascii=False) # original non muté + + +def test_chevauchement_prefix_capitalise(): + """FN bloquant (Claude R1) : mot capitalisé avant NOM (NAISSANCE) Prénom + -> RE_PRENOM_NOM captait « Dossier VIOLA » et bloquait RE_NOM_NAISSANCE + « VIOLA (VIOLA) Liliane ». Fix : résolution par priorité détecteur + longueur.""" + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + m: dict = {} + for titre, leak in [("Dossier VIOLA (VIOLA) Liliane", "VIOLA"), + ("Patient ROSSIGNOL (SOUBIE) Pierrette", "ROSSIGNOL"), + ("Fenetre LAVAL (BARTHELEMY) Nicole", "LAVAL")]: + out, _ = anonymize_text(titre, mapping=m) + assert leak not in out, f"FN: {leak} still visible in '{out}'" + + # contrôle : sans préfixe, toujours OK + out, _ = anonymize_text("VIOLA (VIOLA) Liliane", mapping=m) + assert "VIOLA" not in out + + +def test_gxd5_diagnostics_numero_et_nom(): + """GXD5 Diagnostics — numéro de dossier + nom tout-majuscules (3 patients prod).""" + from agent_v0.server_v1.pii_sanitizer import anonymize_text + + m: dict = {} + for titre, num_leak, nom_leak in [ + ("GXD5 Diagnostics - 128008 - BENVENISTE MARIE-LAURENCE", "128008", "BENVENISTE"), + ("GXD5 Diagnostics - 272223 - LEMOINE ERIC", "272223", "LEMOINE"), + ("GXD5 Diagnostics - 153442 - ROSELIER MATHEO", "153442", "ROSELIER"), + ]: + out, ents = anonymize_text(titre, mapping=m) + assert num_leak not in out, f"FN: numéro {num_leak} visible dans '{out}'" + assert nom_leak not in out, f"FN: nom {nom_leak} visible dans '{out}'" + types = {e["type"] for e in ents} + assert "DOSSIER" in types, f"Pas de token DOSSIER dans {ents}" + assert "NOM" in types, f"Pas de token NOM dans {ents}"