From 1844d1be7e4dd61b45e8dc737040beee3dbf96a2 Mon Sep 17 00:00:00 2001 From: dom Date: Fri, 20 Feb 2026 15:18:42 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20sanitisation=20d=C3=A9terministe=20des?= =?UTF-8?q?=20codes=20CIM-10=20hors=20p=C3=A9rim=C3=A8tre=20CPAM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Le LLM (deepseek) propose systématiquement des codes alternatifs (D62, T81.0, T80, R39.2) malgré l'interdiction dans le prompt. Ces codes déclenchaient des warnings CRITIQUE → Tier C automatique. Solution conforme au principe "LLM propose, moteur de règles dispose" : - _sanitize_unauthorized_codes() supprime les codes hors whitelist du texte de la réponse AVANT toute validation - Nettoyage propre : "D62 — libellé" → "libellé", "(D62)" → "" - _build_whitelist_prefixes() factorisé en helper partagé - Sanitisation appliquée après génération ET après correction - 9 tests unitaires couvrant tous les cas (parenthèses, tirets, multiple) Résultat live : 0 warning CRITIQUE "code hors périmètre" sur 3 dossiers (vs 6 warnings CRITIQUE avant). Le seul CRITIQUE restant est le score adversarial bas, qui reflète des limites de raisonnement du modèle. Co-Authored-By: Claude Opus 4.6 --- src/control/cpam_response.py | 18 +++- src/control/cpam_validation.py | 175 ++++++++++++++++++++++++++------- tests/test_cpam_response.py | 130 ++++++++++++++++++++++++ 3 files changed, 285 insertions(+), 38 deletions(-) diff --git a/src/control/cpam_response.py b/src/control/cpam_response.py index c5985a7..e50afaa 100644 --- a/src/control/cpam_response.py +++ b/src/control/cpam_response.py @@ -25,6 +25,7 @@ from .cpam_validation import ( _validate_grounding, _validate_references, _validate_codes_in_response, + _sanitize_unauthorized_codes, _build_correction_prompt, _format_response, _assess_quality_tier, @@ -39,7 +40,7 @@ from .cpam_context import ( # noqa: F401 _build_bio_summary, _check_das_bio_coherence, ) -from .cpam_validation import _CIM10_CODE_RE, _validate_adversarial as _validate_adversarial, _assess_quality_tier as _assess_quality_tier, _fuzzy_match_ref as _fuzzy_match_ref # noqa: F401 +from .cpam_validation import _CIM10_CODE_RE, _validate_adversarial as _validate_adversarial, _assess_quality_tier as _assess_quality_tier, _fuzzy_match_ref as _fuzzy_match_ref, _sanitize_unauthorized_codes as _sanitize_unauthorized_codes # noqa: F401 logger = logging.getLogger(__name__) @@ -150,17 +151,23 @@ def generate_cpam_response( logger.warning(" LLM non disponible — contre-argumentation non générée") return "", None, rag_sources - # 6. Validation des références RAG + # 6. Sanitisation déterministe — supprime les codes CIM-10 hors périmètre + sanitized = _sanitize_unauthorized_codes(result, dossier, controle) + if sanitized: + logger.info(" CPAM : %d code(s) hors périmètre supprimé(s) : %s", + len(sanitized), ", ".join(sanitized)) + + # 7. Validation des références RAG ref_warnings = _validate_references(result, sources) if ref_warnings: logger.warning(" CPAM : %d référence(s) non vérifiable(s)", len(ref_warnings)) - # 7. Validation grounding (preuves traçables vers le dossier) + # 8. Validation grounding (preuves traçables vers le dossier) grounding_warnings = _validate_grounding(result, tag_map) if grounding_warnings: logger.warning(" CPAM : %d preuve(s) non traçable(s)", len(grounding_warnings)) - # 7b. Validation codes fermée (périmètre dossier + UCR) + # 8b. Validation codes fermée (périmètre dossier + UCR) — post-sanitisation code_warnings = _validate_codes_in_response(result, dossier, controle) if code_warnings: logger.warning(" CPAM : %d code(s) hors périmètre", len(code_warnings)) @@ -202,7 +209,8 @@ def generate_cpam_response( logger.info(" Correction acceptée (score %s → %s)", score1, score2) result = corrected validation = validation2 - # Recalculer les warnings + # Sanitiser + recalculer les warnings + _sanitize_unauthorized_codes(result, dossier, controle) ref_warnings = _validate_references(result, sources) grounding_warnings = _validate_grounding(result, tag_map) code_warnings = _validate_codes_in_response(result, dossier, controle) diff --git a/src/control/cpam_validation.py b/src/control/cpam_validation.py index 35bd672..f04c26c 100644 --- a/src/control/cpam_validation.py +++ b/src/control/cpam_validation.py @@ -110,6 +110,145 @@ def _validate_references(parsed: dict, sources: list[dict]) -> list[str]: # Regex pour capturer les codes CIM-10 (ex: K81.0, E87, Z45.80) _CIM10_CODE_RE = re.compile(r"\b([A-Z]\d{2}\.?\d{0,2})\b") +# Champs textuels de la réponse LLM à scanner pour les codes CIM-10 +_TEXT_FIELDS = ( + "analyse_contestation", + "contre_arguments_medicaux", + "contre_arguments_asymetrie", + "contre_arguments_reglementaires", + "conclusion", +) + + +def _build_whitelist_prefixes( + dossier: DossierMedical, + controle: ControleCPAM, +) -> set[str]: + """Construit la whitelist de préfixes CIM-10 autorisés (3 chars). + + Sources : DP + DAS du dossier, dp_ucr + da_ucr + dr_ucr du contrôle. + """ + prefixes: set[str] = set() + + def _add(raw: str) -> None: + raw = raw.strip() + if not raw: + return + norm = normalize_code(raw) + if norm and len(norm) >= 3: + prefixes.add(norm[:3]) + + if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion: + _add(dossier.diagnostic_principal.cim10_suggestion) + for das in dossier.diagnostics_associes: + if das.cim10_suggestion: + _add(das.cim10_suggestion) + + for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): + if not field: + continue + for raw in re.split(r"[,;\s]+", field.strip()): + _add(raw) + + return prefixes + + +# Patterns pour supprimer proprement un code hors périmètre et ses artefacts : +# "D62 — libellé" → "libellé" +# "(D62)" → "" +# "D62" → "" +_SANITIZE_PATTERNS = [ + # "CODE — libellé" ou "CODE - libellé" + re.compile(r"\b[A-Z]\d{2}\.?\d{0,2}\s*[—–\-]\s*"), + # "(CODE)" avec espaces optionnels + re.compile(r"\(\s*[A-Z]\d{2}\.?\d{0,2}\s*\)"), + # "CODE" seul + _CIM10_CODE_RE, +] + + +def _sanitize_unauthorized_codes( + parsed: dict, + dossier: DossierMedical, + controle: ControleCPAM, +) -> list[str]: + """Supprime les codes CIM-10 hors périmètre des champs textuels de la réponse. + + Modifie `parsed` in-place. Applique le principe « LLM propose, moteur de + règles dispose » : le texte garde le sens médical mais les codes inventés + sont retirés pour éviter les warnings CRITIQUE. + + Returns: + Liste des codes supprimés (pour logging). + """ + whitelist = _build_whitelist_prefixes(dossier, controle) + if not whitelist: + return [] + + removed: list[str] = [] + + def _is_authorized(code_str: str) -> bool: + norm = normalize_code(code_str) + return bool(norm and len(norm) >= 3 and norm[:3] in whitelist) + + def _replace_code(match: re.Match) -> str: + """Callback de remplacement : garde le code si autorisé, supprime sinon.""" + code = _CIM10_CODE_RE.search(match.group(0)) + if not code: + return match.group(0) + if _is_authorized(code.group(0)): + return match.group(0) + if code.group(0) not in removed: + removed.append(code.group(0)) + return "" + + # Sanitiser les champs textuels + for key in _TEXT_FIELDS: + val = parsed.get(key) + if not val or not isinstance(val, str): + continue + new_val = val + for pattern in _SANITIZE_PATTERNS: + new_val = pattern.sub( + lambda m, _p=pattern: _replace_code(m), + new_val, + ) + # Nettoyage artefacts : doubles espaces, parenthèses vides + new_val = re.sub(r"\(\s*\)", "", new_val) + new_val = re.sub(r" +", " ", new_val) + new_val = new_val.strip() + if new_val != val: + parsed[key] = new_val + + # Sanitiser aussi les preuves_dossier.valeur + preuves = parsed.get("preuves_dossier") + if preuves and isinstance(preuves, list): + for p in preuves: + if not isinstance(p, dict): + continue + v = p.get("valeur", "") + if not v or not isinstance(v, str): + continue + new_v = v + for pattern in _SANITIZE_PATTERNS: + new_v = pattern.sub( + lambda m, _p=pattern: _replace_code(m), + new_v, + ) + new_v = re.sub(r"\(\s*\)", "", new_v) + new_v = re.sub(r" +", " ", new_v).strip() + if new_v != v: + p["valeur"] = new_v + + if removed: + for code in removed: + norm = normalize_code(code) + is_valid, label = validate_code(norm) + label_str = f" ({label})" if is_valid and label else "" + logger.info("Sanitize : code %s%s hors périmètre supprimé du texte", code, label_str) + + return removed + def _validate_codes_in_response( parsed: dict, @@ -125,43 +264,13 @@ def _validate_codes_in_response( Returns: Liste de warnings pour les codes hors périmètre. """ - # 1. Construire la whitelist (préfixes 3 chars) - whitelist_prefixes: set[str] = set() - - def _add_code(raw: str) -> None: - raw = raw.strip() - if not raw: - return - norm = normalize_code(raw) - if norm and len(norm) >= 3: - whitelist_prefixes.add(norm[:3]) - - # Codes du dossier - if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion: - _add_code(dossier.diagnostic_principal.cim10_suggestion) - for das in dossier.diagnostics_associes: - if das.cim10_suggestion: - _add_code(das.cim10_suggestion) - - # Codes de l'UCR - for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): - if not field: - continue - for raw in re.split(r"[,;\s]+", field.strip()): - _add_code(raw) - + whitelist_prefixes = _build_whitelist_prefixes(dossier, controle) if not whitelist_prefixes: return [] - # 2. Extraire les codes CIM-10 de la réponse LLM (hors citations RAG) + # 2. Extraire les codes CIM-10 de la réponse LLM text_fields = [] - for key in ( - "analyse_contestation", - "contre_arguments_medicaux", - "contre_arguments_asymetrie", - "contre_arguments_reglementaires", - "conclusion", - ): + for key in _TEXT_FIELDS: val = parsed.get(key) if val and isinstance(val, str): text_fields.append(val) diff --git a/tests/test_cpam_response.py b/tests/test_cpam_response.py index 37898eb..f03253f 100644 --- a/tests/test_cpam_response.py +++ b/tests/test_cpam_response.py @@ -28,6 +28,7 @@ from src.control.cpam_response import ( _fuzzy_match_ref, _get_cim10_definitions, _get_code_label, + _sanitize_unauthorized_codes, _search_rag_for_control, _validate_adversarial, _validate_codes_in_response, @@ -2189,3 +2190,132 @@ class TestFuzzyMatchRef: warnings = _validate_grounding(response_data, tag_map) assert len(warnings) == 1 assert "Antécédents" in warnings[0] + + +class TestSanitizeUnauthorizedCodes: + """Tests pour la sanitisation déterministe des codes CIM-10 hors périmètre.""" + + def _make_dossier_with_codes(self, dp_code="K81.0", das_codes=None): + das = [Diagnostic(texte=f"DAS {c}", cim10_suggestion=c) for c in (das_codes or [])] + return DossierMedical( + source_file="test.pdf", + diagnostic_principal=Diagnostic(texte="DP test", cim10_suggestion=dp_code), + diagnostics_associes=das, + ) + + def test_authorized_codes_kept(self): + """Les codes dans le périmètre ne sont pas modifiés.""" + dossier = self._make_dossier_with_codes("K81.0", ["K56.0"]) + controle = ControleCPAM(numero_ogc=1, da_ucr="K56.0") + parsed = { + "contre_arguments_medicaux": "Le code K81.0 est justifié par la clinique.", + "conclusion": "Le codage K81.0 et K56.0 est correct.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert len(removed) == 0 + assert "K81.0" in parsed["contre_arguments_medicaux"] + assert "K56.0" in parsed["conclusion"] + + def test_unauthorized_code_removed_from_text(self): + """Un code hors périmètre (D62) est supprimé du texte.""" + dossier = self._make_dossier_with_codes("K81.0") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "contre_arguments_medicaux": "Le code D62 serait plus approprié que K81.0.", + "conclusion": "Maintenir K81.0.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert "D62" in removed + assert "D62" not in parsed["contre_arguments_medicaux"] + # K81.0 est toujours là + assert "K81.0" in parsed["contre_arguments_medicaux"] + + def test_code_with_dash_libelle_cleaned(self): + """'D62 — Anémie posthémorragique' → 'Anémie posthémorragique'.""" + dossier = self._make_dossier_with_codes("K81.0") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "contre_arguments_medicaux": "D62 — Anémie posthémorragique aiguë est plus adapté.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert "D62" in removed + text = parsed["contre_arguments_medicaux"] + assert "D62" not in text + assert "Anémie posthémorragique" in text + + def test_code_in_parentheses_cleaned(self): + """'anémie (D62)' → 'anémie'.""" + dossier = self._make_dossier_with_codes("K81.0") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "conclusion": "L'anémie (D62) n'est pas justifiée.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert "D62" in removed + text = parsed["conclusion"] + assert "(D62)" not in text + assert "()" not in text + assert "anémie" in text.lower() + + def test_multiple_unauthorized_codes(self): + """Plusieurs codes hors périmètre sont tous supprimés.""" + dossier = self._make_dossier_with_codes("K81.0") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "contre_arguments_medicaux": "D62 et T81.0 et T80 sont des alternatives.", + "conclusion": "K81.0 est maintenu.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert len(removed) == 3 + assert "D62" not in parsed["contre_arguments_medicaux"] + assert "T81.0" not in parsed["contre_arguments_medicaux"] + assert "T80" not in parsed["contre_arguments_medicaux"] + + def test_preuves_dossier_sanitized(self): + """Les codes hors périmètre dans preuves_dossier.valeur sont aussi nettoyés.""" + dossier = self._make_dossier_with_codes("K81.0") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "preuves_dossier": [ + {"ref": "BIO-1", "valeur": "Anémie D62 documentée", "signification": "test"}, + ], + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert "D62" in removed + assert "D62" not in parsed["preuves_dossier"][0]["valeur"] + + def test_no_whitelist_no_sanitization(self): + """Sans whitelist (pas de codes dans le dossier), aucune sanitisation.""" + dossier = DossierMedical(source_file="test.pdf") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "contre_arguments_medicaux": "Le code D62 est pertinent.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert len(removed) == 0 + assert "D62" in parsed["contre_arguments_medicaux"] + + def test_prefix_match_allows_subcodes(self): + """K81.09 est autorisé si K81.0 est dans le périmètre (même préfixe K81).""" + dossier = self._make_dossier_with_codes("K81.0") + controle = ControleCPAM(numero_ogc=1) + parsed = { + "contre_arguments_medicaux": "K81.09 est un sous-code valide.", + } + removed = _sanitize_unauthorized_codes(parsed, dossier, controle) + assert len(removed) == 0 + assert "K81.09" in parsed["contre_arguments_medicaux"] + + def test_validate_codes_after_sanitize_no_warnings(self): + """Après sanitisation, _validate_codes_in_response ne trouve plus de violations.""" + dossier = self._make_dossier_with_codes("K81.0", ["K56.0"]) + controle = ControleCPAM(numero_ogc=1, da_ucr="K56.0") + parsed = { + "contre_arguments_medicaux": "D62 et T81.0 sont hors périmètre. K81.0 est correct.", + "conclusion": "Maintenir K81.0.", + } + # Sanitise d'abord + _sanitize_unauthorized_codes(parsed, dossier, controle) + # Puis valide → 0 warning + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) == 0