diff --git a/src/control/cpam_context.py b/src/control/cpam_context.py index 66a048c..6447afc 100644 --- a/src/control/cpam_context.py +++ b/src/control/cpam_context.py @@ -392,6 +392,65 @@ def _check_das_bio_coherence(dossier: DossierMedical) -> list[str]: return warnings +def _assess_dossier_strength(dossier: DossierMedical) -> dict: + """Évalue la force probante du dossier pour la contre-argumentation. + + Un dossier « faible » manque d'éléments factuels (biologie, imagerie) + pour soutenir une argumentation solide. L'adversariale scorera + naturellement bas, ce qui est attendu — pas un défaut de la réponse LLM. + + Returns: + {"score": 0-10, "is_weak": bool, "missing": [...], "detail": str} + """ + score = 0 + missing: list[str] = [] + + bio_count = len(dossier.biologie_cle) + img_count = len(dossier.imagerie) + trt_count = len(dossier.traitements_sortie) + acte_count = len(dossier.actes_ccam) + + # Biologie : 0-4 points (donnée la plus probante en médecine) + if bio_count == 0: + missing.append("biologie (aucune valeur)") + elif bio_count <= 2: + score += 1 + missing.append("biologie (< 3 valeurs)") + else: + score += min(bio_count, 4) + + # Imagerie : 0-2 points + if img_count == 0: + missing.append("imagerie") + else: + score += min(img_count, 2) + + # Traitements : 0-2 points + if trt_count > 0: + score += min(trt_count // 3, 2) + + # Actes : 0-2 points + if acte_count > 0: + score += min(acte_count, 2) + + is_weak = score < 3 + + if is_weak: + detail = ( + f"Dossier à éléments probants limités (score {score}/10) : " + + ", ".join(missing) if missing else f"score {score}/10" + ) + else: + detail = f"Dossier suffisamment étayé (score {score}/10)" + + return { + "score": score, + "is_weak": is_weak, + "missing": missing, + "detail": detail, + } + + def _build_cpam_prompt( dossier: DossierMedical, controle: ControleCPAM, @@ -603,6 +662,23 @@ def _build_cpam_prompt( + "\n Prends en compte ces incohérences dans ton analyse." ) + # Évaluation force probante du dossier + strength = _assess_dossier_strength(dossier) + if strength["is_weak"]: + missing_str = ", ".join(strength["missing"]) if strength["missing"] else "éléments insuffisants" + tagged_str += ( + f"\n\nATTENTION — DOSSIER À PREUVES LIMITÉES ({missing_str}) :\n" + "Le dossier ne contient PAS assez d'éléments factuels pour construire " + "une argumentation forte. Tu DOIS :\n" + " 1. Reconnaître EXPLICITEMENT les données manquantes dans ton analyse\n" + " 2. Ne JAMAIS présenter comme fait un élément absent du dossier\n" + " 3. Privilégier les arguments contextuels (mode d'entrée, durée séjour, " + "actes réalisés, antécédents) plutôt que biologiques\n" + " 4. Conclure avec nuance : « en l'absence de [données manquantes], " + "l'argumentation repose sur… »\n" + " 5. NE PAS forcer un argument que les données ne soutiennent pas" + ) + # Sources RAG sources_text = "" for i, src in enumerate(sources, 1): diff --git a/src/control/cpam_response.py b/src/control/cpam_response.py index e50afaa..abef91a 100644 --- a/src/control/cpam_response.py +++ b/src/control/cpam_response.py @@ -17,6 +17,7 @@ from ..prompts import CPAM_EXTRACTION # --- Imports depuis les sous-modules --- from .cpam_rag import _search_rag_for_control from .cpam_context import ( + _assess_dossier_strength, _build_cpam_prompt, _build_tagged_context, ) @@ -37,6 +38,7 @@ from .cpam_context import ( # noqa: F401 _get_code_label, _get_cim10_definitions, _BIO_INTERPRETATION, + _assess_dossier_strength, _build_bio_summary, _check_das_bio_coherence, ) @@ -229,9 +231,16 @@ def generate_cpam_response( all_warnings = ref_warnings + grounding_warnings + code_warnings + adversarial_warnings - # 8c. Classification qualité (A/B/C) + # 8c. Évaluation force probante du dossier + strength = _assess_dossier_strength(dossier) + if strength["is_weak"]: + logger.info(" Dossier à preuves limitées (score %d/10) : %s", + strength["score"], ", ".join(strength["missing"])) + + # 8d. Classification qualité (A/B/C) — seuils relaxés si dossier faible tier, needs_review, cat_warnings = _assess_quality_tier( result, ref_warnings, grounding_warnings, code_warnings, validation, + is_weak_dossier=strength["is_weak"], ) controle.quality_tier = tier controle.requires_review = needs_review diff --git a/src/control/cpam_validation.py b/src/control/cpam_validation.py index f04c26c..7de4d6a 100644 --- a/src/control/cpam_validation.py +++ b/src/control/cpam_validation.py @@ -439,17 +439,24 @@ def _assess_quality_tier( grounding_warnings: list[str], code_warnings: list[str], adversarial_result: dict | None, + is_weak_dossier: bool = False, ) -> tuple[str, bool, list[str]]: """Évalue le tier qualité (A/B/C) et le flag requires_review. Classification : - Tier C (requires_review=True) : score adversarial < 4 OU code_warnings > 0 OU grounding_warnings > 2 + (si dossier faible : seuil adversarial abaissé à < 2) - Tier B : score adversarial 4-6 OU ref_warnings > 0 OU grounding_warnings 1-2 + (si dossier faible : score 2-3 accepté en B) - Tier A : score adversarial >= 7, 0 warning critique, <= 1 warning mineur + Args: + is_weak_dossier: Si True, relaxe les seuils adversariaux car un score bas + est attendu quand le dossier manque d'éléments probants. + Returns: (tier, requires_review, categorized_warnings) """ @@ -458,14 +465,24 @@ def _assess_quality_tier( has_critical = False minor_count = 0 + # Seuil adversarial adapté à la force du dossier + score_critical_threshold = 2 if is_weak_dossier else 4 + # --- Warnings critiques --- for w in code_warnings: categorized.append(f"[CRITIQUE] {w}") has_critical = True - if score != -1 and score <= 3: + if score != -1 and score < score_critical_threshold: categorized.append(f"[CRITIQUE] Score adversarial très bas : {score}/10") has_critical = True + elif score != -1 and score <= 3 and is_weak_dossier: + # Score 2-3 sur dossier faible → warning mineur (pas critique) + categorized.append( + f"[MINEUR] Score adversarial bas ({score}/10) — " + f"attendu pour un dossier à preuves limitées" + ) + minor_count += 1 if len(grounding_warnings) > 2: for w in grounding_warnings: @@ -492,7 +509,7 @@ def _assess_quality_tier( minor_count += 1 # --- Classification --- - if has_critical or (score != -1 and score < 4): + if has_critical or (score != -1 and score < score_critical_threshold): tier = "C" requires_review = True elif minor_count > 0 or (score != -1 and 4 <= score <= 6): diff --git a/tests/test_cpam_response.py b/tests/test_cpam_response.py index f03253f..c6aa5cc 100644 --- a/tests/test_cpam_response.py +++ b/tests/test_cpam_response.py @@ -18,6 +18,7 @@ from src.config import ( Traitement, ) from src.control.cpam_response import ( + _assess_dossier_strength, _build_bio_summary, _build_correction_prompt, _build_cpam_prompt, @@ -2319,3 +2320,163 @@ class TestSanitizeUnauthorizedCodes: # Puis valide → 0 warning warnings = _validate_codes_in_response(parsed, dossier, controle) assert len(warnings) == 0 + + +class TestAssessDossierStrength: + """Tests pour l'évaluation de la force probante du dossier.""" + + def test_empty_dossier_is_weak(self): + """Dossier vide → score 0, is_weak=True.""" + dossier = DossierMedical(source_file="test.pdf") + result = _assess_dossier_strength(dossier) + assert result["score"] == 0 + assert result["is_weak"] is True + assert len(result["missing"]) > 0 + + def test_rich_dossier_not_weak(self): + """Dossier complet → is_weak=False, score >= 3.""" + dossier = _make_dossier_complet() + result = _assess_dossier_strength(dossier) + assert result["is_weak"] is False + assert result["score"] >= 3 + + def test_dp_only_dossier_is_weak(self): + """Dossier avec DP seulement (pas de bio/img/trt/actes) → faible.""" + dossier = DossierMedical( + source_file="test.pdf", + diagnostic_principal=Diagnostic(texte="DP test", cim10_suggestion="K81.0"), + ) + result = _assess_dossier_strength(dossier) + assert result["is_weak"] is True + assert result["score"] == 0 + + def test_bio_only_few_values(self): + """Dossier avec 1-2 bio → score faible mais contribue.""" + dossier = DossierMedical( + source_file="test.pdf", + biologie_cle=[ + BiologieCle(test="CRP", valeur="180 mg/L"), + ], + ) + result = _assess_dossier_strength(dossier) + assert result["score"] == 1 # 1 bio = 1 point + assert result["is_weak"] is True + + def test_bio_many_values(self): + """Dossier avec 4+ bio → max 4 points pour la bio.""" + dossier = DossierMedical( + source_file="test.pdf", + biologie_cle=[ + BiologieCle(test="CRP", valeur="180"), + BiologieCle(test="Créatinine", valeur="120"), + BiologieCle(test="Hémoglobine", valeur="12"), + BiologieCle(test="Plaquettes", valeur="200"), + BiologieCle(test="Leucocytes", valeur="10"), + ], + ) + result = _assess_dossier_strength(dossier) + assert result["score"] >= 4 # bio capped at 4 + + def test_missing_categories_reported(self): + """Les catégories manquantes sont listées.""" + dossier = DossierMedical(source_file="test.pdf") + result = _assess_dossier_strength(dossier) + assert "biologie" in " ".join(result["missing"]).lower() + assert "imagerie" in " ".join(result["missing"]).lower() + + def test_actes_contribute(self): + """Les actes CCAM contribuent au score (max 2).""" + dossier = DossierMedical( + source_file="test.pdf", + actes_ccam=[ + ActeCCAM(texte="Cholécystectomie", code_ccam_suggestion="HMFC004"), + ActeCCAM(texte="Drainage biliaire", code_ccam_suggestion="HHFA001"), + ActeCCAM(texte="Exploration"), + ], + ) + result = _assess_dossier_strength(dossier) + assert result["score"] == 2 # actes capped at 2 + + +class TestQualityTierWeakDossier: + """Tests pour les seuils de qualité relaxés sur dossier faible.""" + + def test_score_3_normal_dossier_is_c(self): + """Score adversarial 3 sur dossier normal → tier C (critique).""" + tier, review, warnings = _assess_quality_tier( + parsed={}, + ref_warnings=[], + grounding_warnings=[], + code_warnings=[], + adversarial_result={"coherent": False, "erreurs": ["Bio faible"], "score_confiance": 3}, + is_weak_dossier=False, + ) + assert tier == "C" + assert review is True + assert any("[CRITIQUE]" in w for w in warnings) + + def test_score_3_weak_dossier_is_b(self): + """Score adversarial 3 sur dossier faible → tier B (mineur attendu).""" + tier, review, warnings = _assess_quality_tier( + parsed={}, + ref_warnings=[], + grounding_warnings=[], + code_warnings=[], + adversarial_result={"coherent": False, "erreurs": ["Bio faible"], "score_confiance": 3}, + is_weak_dossier=True, + ) + assert tier == "B" + assert review is False + assert any("attendu" in w.lower() for w in warnings) + + def test_score_2_weak_dossier_is_b(self): + """Score adversarial 2 sur dossier faible → tier B.""" + tier, review, warnings = _assess_quality_tier( + parsed={}, + ref_warnings=[], + grounding_warnings=[], + code_warnings=[], + adversarial_result={"coherent": False, "erreurs": ["Données insuffisantes"], "score_confiance": 2}, + is_weak_dossier=True, + ) + assert tier == "B" + assert review is False + + def test_score_1_weak_dossier_is_c(self): + """Score adversarial 1 sur dossier faible → tier C (même relaxé).""" + tier, review, warnings = _assess_quality_tier( + parsed={}, + ref_warnings=[], + grounding_warnings=[], + code_warnings=[], + adversarial_result={"coherent": False, "erreurs": ["Incohérent"], "score_confiance": 1}, + is_weak_dossier=True, + ) + assert tier == "C" + assert review is True + + def test_code_warnings_override_weak(self): + """Code hors périmètre → tier C même si dossier faible (critique non relaxable).""" + tier, review, warnings = _assess_quality_tier( + parsed={}, + ref_warnings=[], + grounding_warnings=[], + code_warnings=["Code Z45.8 hors périmètre"], + adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 5}, + is_weak_dossier=True, + ) + assert tier == "C" + assert review is True + + def test_score_7_weak_dossier_is_a(self): + """Score adversarial 7 sur dossier faible → tier A (pas de warnings).""" + tier, review, warnings = _assess_quality_tier( + parsed={}, + ref_warnings=[], + grounding_warnings=[], + code_warnings=[], + adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 7}, + is_weak_dossier=True, + ) + assert tier == "A" + assert review is False