"""Tests for CPAM counter-argument generation."""

from unittest.mock import patch, call

import pytest

from src.config import (
    ActeCCAM,
    Antecedent,
    BiologieCle,
    Complication,
    ControleCPAM,
    Diagnostic,
    DossierMedical,
    Imagerie,
    RAGSource,
    Sejour,
    Traitement,
)
from src.control.cpam_response import (
    _build_bio_summary,
    _build_correction_prompt,
    _build_cpam_prompt,
    _build_tagged_context,
    _check_das_bio_coherence,
    _extraction_pass,
    _format_response,
    _fuzzy_match_ref,
    _get_cim10_definitions,
    _get_code_label,
    _sanitize_unauthorized_codes,
    _search_rag_for_control,
    _validate_adversarial,
    _validate_codes_in_response,
    _validate_grounding,
    _validate_references,
    _assess_quality_tier,
    generate_cpam_response,
)


def _make_dossier() -> DossierMedical:
    """Build a minimal medical-record fixture (one DP + one DAS)."""
    return DossierMedical(
        source_file="test.pdf",
        document_type="crh",
        sejour=Sejour(sexe="M", age=65, duree_sejour=5),
        diagnostic_principal=Diagnostic(
            texte="Cholécystite aiguë",
            cim10_suggestion="K81.0",
        ),
        diagnostics_associes=[
            Diagnostic(texte="Iléus réflexe", cim10_suggestion="K56.0"),
        ],
    )


def _make_dossier_complet() -> DossierMedical:
    """Build an enriched record fixture: procedures, biology, imaging, treatments, history."""
    return DossierMedical(
        source_file="test.pdf",
        document_type="crh",
        sejour=Sejour(sexe="M", age=65, duree_sejour=5, imc=31.2),
        diagnostic_principal=Diagnostic(
            texte="Cholécystite aiguë",
            cim10_suggestion="K81.0",
        ),
        diagnostics_associes=[
            Diagnostic(texte="Iléus réflexe", cim10_suggestion="K56.0"),
        ],
        actes_ccam=[
            ActeCCAM(texte="Cholécystectomie", code_ccam_suggestion="HMFC004"),
        ],
        biologie_cle=[
            BiologieCle(test="CRP", valeur="180 mg/L", anomalie=True),
            BiologieCle(test="Créatinine", valeur="450 µmol/L", anomalie=True),
        ],
        imagerie=[
            Imagerie(type="Scanner abdominal", conclusion="Lithiase cholédocienne confirmée"),
        ],
        traitements_sortie=[
            Traitement(medicament="Augmentin IV", posologie="3g/j"),
            Traitement(medicament="Morphine SC"),
        ],
        antecedents=["HTA", "Diabète type 2"],
    )


def _make_controle() -> ControleCPAM:
    """Build a CPAM control fixture contesting DAS K56.0."""
    return ControleCPAM(
        numero_ogc=17,
        titre="Désaccord sur les DAS",
        arg_ucr="L'UCR confirme l'avis des médecins contrôleurs au motif que le DAS K56.0 n'est pas justifié.",
        decision_ucr="UCR confirme avis médecins contrôleurs",
        dp_ucr=None,
        da_ucr="K56.0",
    )


class TestBuildPrompt:
    def test_prompt_contains_dossier_info(self):
        dossier = _make_dossier()
        controle = _make_controle()
        prompt, _ = _build_cpam_prompt(dossier, controle, [])
        assert "Cholécystite aiguë" in prompt
        assert "K81.0" in prompt
        assert "Iléus réflexe" in prompt
        assert "65 ans" in prompt

    def test_prompt_contains_cpam_argument(self):
        dossier = _make_dossier()
        controle = _make_controle()
        prompt, _ = _build_cpam_prompt(dossier, controle, [])
        assert controle.arg_ucr in prompt
        assert controle.decision_ucr in prompt

    def test_prompt_contains_codes_contestes(self):
        dossier = _make_dossier()
        controle = _make_controle()
        prompt, _ = _build_cpam_prompt(dossier, controle, [])
        assert "DA proposés par UCR : K56.0" in prompt

    def test_prompt_contains_rag_sources(self):
        dossier = _make_dossier()
        controle = _make_controle()
        sources = [
            {"document": "guide_methodo", "page": 64, "extrait": "Texte du guide..."},
            {"document": "cim10", "code": "K56.0", "extrait": "Iléus paralytique..."},
        ]
        prompt, _ = _build_cpam_prompt(dossier, controle, sources)
        assert "Guide Méthodologique MCO 2026" in prompt
        assert "CIM-10 FR 2026" in prompt
        assert "page 64" in prompt

    def test_prompt_contains_three_axes(self):
        dossier = _make_dossier()
        controle = _make_controle()
        prompt, _ = _build_cpam_prompt(dossier, controle, [])
        assert "AXE MÉDICAL" in prompt
        assert "AXE ASYMÉTRIE D'INFORMATION" in prompt
        assert "AXE RÉGLEMENTAIRE" in prompt

    def test_prompt_contains_traitements_imagerie_when_present(self):
        dossier = _make_dossier_complet()
        controle = _make_controle()
        prompt, _ = _build_cpam_prompt(dossier, controle, [])
        assert "Augmentin IV 3g/j" in prompt
"Morphine SC" in prompt assert "Scanner abdominal" in prompt assert "Lithiase cholédocienne confirmée" in prompt assert "HTA" in prompt assert "Diabète type 2" in prompt assert "IMC : 31.2" in prompt def test_prompt_asymetrie_section_when_data_present(self): dossier = _make_dossier_complet() controle = _make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "ÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM" in prompt assert "CRP: 180 mg/L (anormale)" in prompt assert "Cholécystectomie (HMFC004)" in prompt def test_prompt_no_asymetrie_section_when_no_data(self): dossier = DossierMedical( source_file="test.pdf", document_type="crh", sejour=Sejour(), diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"), ) controle = _make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "ÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM" not in prompt def test_prompt_json_format_new_fields(self): dossier = _make_dossier() controle = _make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "contre_arguments_medicaux" in prompt assert "contre_arguments_asymetrie" in prompt assert "contre_arguments_reglementaires" in prompt def test_prompt_contains_cite_exacts(self): """Le prompt renforcé demande des preuves exactes.""" dossier = _make_dossier() controle = _make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "CITE" in prompt assert "EXACTS" in prompt def test_prompt_contains_interdiction(self): """Le prompt interdit les références inventées.""" dossier = _make_dossier() controle = _make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "INTERDICTION ABSOLUE" in prompt def test_prompt_contains_preuves_dossier_field(self): """Le format JSON demandé inclut preuves_dossier.""" dossier = _make_dossier() controle = _make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "preuves_dossier" in prompt @patch("src.control.cpam_context.validate_code", 
return_value=(True, "Iléus paralytique et obstruction intestinale")) @patch("src.control.cpam_context.normalize_code", return_value="K56.0") def test_prompt_codes_with_cim10_labels(self, mock_norm, mock_valid): """Les codes contestés affichent le libellé CIM-10.""" dossier = _make_dossier() controle = _make_controle() # da_ucr="K56.0" prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "Iléus paralytique" in prompt assert "DA proposés par UCR" in prompt @patch("src.control.cpam_context.validate_code", return_value=(False, "")) @patch("src.control.cpam_context.normalize_code", return_value="Z99.9") def test_prompt_codes_invalid_graceful(self, mock_norm, mock_valid): """Les codes invalides ne crashent pas, juste pas de libellé.""" dossier = _make_dossier() controle = ControleCPAM( numero_ogc=1, titre="Test", arg_ucr="Test", decision_ucr="Rejet", dp_ucr="Z99.9", da_ucr=None, ) prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "Z99.9" in prompt # Pas de crash @patch("src.control.cpam_context.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable")) @patch("src.control.cpam_context.normalize_code", return_value="Z45.8") def test_prompt_dp_fallback_from_ucr(self, mock_norm, mock_valid): """DP absent + dp_ucr → contexte injecté dans le prompt.""" dossier = DossierMedical( source_file="test.pdf", document_type="crh", sejour=Sejour(), diagnostic_principal=None, ) controle = ControleCPAM( numero_ogc=1, titre="Désaccord DP", arg_ucr="Test", decision_ucr="Rejet", dp_ucr="Z45.8", da_ucr=None, ) prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "codé par l'établissement" in prompt assert "contesté par la CPAM" in prompt assert "Z45.8" in prompt class TestFormatResponse: def test_full_response_new_format(self): parsed = { "analyse_contestation": "La CPAM conteste le DAS K56.0", "points_accord": "Aucun", "contre_arguments_medicaux": "Le guide méthodologique précise...", "contre_arguments_asymetrie": "La biologie 
montre une CRP à 180...", "contre_arguments_reglementaires": "L'UCR interprète restrictivement...", "references": "Guide métho p.64, CIM-10 K56.0", "conclusion": "Le DAS est justifié", } text = _format_response(parsed) assert "ANALYSE DE LA CONTESTATION" in text assert "CONTRE-ARGUMENTS MÉDICAUX" in text assert "ASYMÉTRIE D'INFORMATION" in text assert "CONTRE-ARGUMENTS RÉGLEMENTAIRES" in text assert "REFERENCES" in text assert "CONCLUSION" in text # "Aucun" ne doit pas générer la section points d'accord assert "POINTS D'ACCORD" not in text # L'ancien champ ne doit pas apparaître assert "CONTRE-ARGUMENTS\n" not in text def test_fallback_old_format(self): """L'ancien champ contre_arguments est toujours géré (réponses en cache).""" parsed = { "analyse_contestation": "Analyse...", "contre_arguments": "Arguments anciens...", "conclusion": "Conclusion...", } text = _format_response(parsed) assert "CONTRE-ARGUMENTS\nArguments anciens..." in text assert "CONCLUSION" in text def test_new_fields_override_fallback(self): """Si les nouveaux champs existent, l'ancien contre_arguments est ignoré.""" parsed = { "contre_arguments_medicaux": "Médicaux...", "contre_arguments": "Ancien fallback...", "conclusion": "Conclusion...", } text = _format_response(parsed) assert "CONTRE-ARGUMENTS MÉDICAUX" in text assert "Ancien fallback" not in text def test_partial_response(self): parsed = { "contre_arguments_medicaux": "Arguments médicaux...", "conclusion": "Conclusion...", } text = _format_response(parsed) assert "CONTRE-ARGUMENTS MÉDICAUX" in text assert "CONCLUSION" in text def test_empty_response(self): text = _format_response({}) assert text == "" def test_preuves_dossier_formatting(self): """Le nouveau champ preuves_dossier est formaté correctement.""" parsed = { "contre_arguments_medicaux": "Arguments...", "preuves_dossier": [ {"element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation sévère"}, {"element": "imagerie", "valeur": "lithiase cholédocienne", 
"signification": "confirme le diagnostic"}, ], "conclusion": "Conclusion...", } text = _format_response(parsed) assert "PREUVES DU DOSSIER" in text assert "CRP 180 mg/L" in text assert "[biologie]" in text assert "[imagerie]" in text def test_structured_references_formatting(self): """Les références structurées sont formatées correctement.""" parsed = { "contre_arguments_medicaux": "Arguments...", "references": [ {"document": "Guide Méthodologique MCO 2026", "page": "64", "citation": "Le DAS doit être..."}, ], "conclusion": "Conclusion...", } text = _format_response(parsed) assert "REFERENCES" in text assert "Guide Méthodologique MCO 2026" in text assert "p.64" in text assert "Le DAS doit être..." in text def test_ref_warnings_appended(self): """Les avertissements de références non vérifiées apparaissent.""" parsed = {"conclusion": "Conclusion..."} warnings = ["Référence non vérifiable : Manuel Imaginaire 2025"] text = _format_response(parsed, ref_warnings=warnings) assert "AVERTISSEMENT" in text assert "Manuel Imaginaire 2025" in text class TestValidateReferences: def test_valid_reference_no_warning(self): parsed = { "references": [ {"document": "Guide Méthodologique MCO 2026", "page": "64", "citation": "..."}, ] } sources = [{"document": "guide_methodo", "page": 64, "extrait": "..."}] warnings = _validate_references(parsed, sources) assert len(warnings) == 0 def test_invented_reference_detected(self): parsed = { "references": [ {"document": "Manuel Inventé 2025", "page": "12", "citation": "..."}, ] } sources = [{"document": "guide_methodo", "page": 64, "extrait": "..."}] warnings = _validate_references(parsed, sources) assert len(warnings) == 1 assert "Manuel Inventé" in warnings[0] def test_old_format_string_no_crash(self): """L'ancien format string pour references ne cause pas de crash.""" parsed = {"references": "Guide méthodo p.64"} sources = [{"document": "guide_methodo"}] warnings = _validate_references(parsed, sources) assert len(warnings) == 0 # pas de 
validation sur l'ancien format def test_no_sources_no_validation(self): parsed = { "references": [ {"document": "Quelque chose", "page": "1", "citation": "..."}, ] } warnings = _validate_references(parsed, []) assert len(warnings) == 0 class TestGenerateResponse: @patch("src.control.cpam_validation.call_ollama") @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama): """Ollama disponible → 3 passes (extraction + argumentation + validation).""" mock_rag.return_value = [ {"document": "guide_methodo", "page": 64, "extrait": "Texte guide"}, ] call_count = {"n": 0} def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): call_count["n"] += 1 if call_count["n"] == 1: return {"comprehension_contestation": "Extraction...", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} elif call_count["n"] == 2: return { "analyse_contestation": "Analyse...", "contre_arguments_medicaux": "Contre-arguments médicaux...", "contre_arguments_asymetrie": "Asymétrie...", "conclusion": "Conclusion...", } else: return {"coherent": True, "erreurs": [], "score_confiance": 9} mock_ollama.side_effect = ollama_side_effect mock_val_ollama.side_effect = ollama_side_effect dossier = _make_dossier() controle = _make_controle() text, response_data, sources = generate_cpam_response(dossier, controle) assert "Contre-arguments médicaux..." in text assert response_data is not None assert response_data["analyse_contestation"] == "Analyse..." 
assert len(sources) == 1 assert sources[0].document == "guide_methodo" # 3 appels Ollama : extraction + argumentation + validation assert call_count["n"] == 3 mock_anthropic.assert_not_called() @patch("src.control.cpam_validation.call_anthropic") @patch("src.control.cpam_validation.call_ollama") @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_val_anthropic): """Ollama indisponible → fallback Haiku pour les 3 passes.""" mock_rag.return_value = [ {"document": "guide_methodo", "page": 64, "extrait": "Texte guide"}, ] mock_ollama.return_value = None mock_val_ollama.return_value = None call_count = {"n": 0} def anthropic_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): call_count["n"] += 1 if call_count["n"] == 1: return {"comprehension_contestation": "Extraction Haiku...", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} elif call_count["n"] == 2: return { "analyse_contestation": "Analyse Haiku...", "contre_arguments_medicaux": "Contre-args Haiku...", "conclusion": "Conclusion Haiku...", } else: return {"coherent": True, "erreurs": [], "score_confiance": 8} mock_anthropic.side_effect = anthropic_side_effect mock_val_anthropic.side_effect = anthropic_side_effect dossier = _make_dossier() controle = _make_controle() text, response_data, sources = generate_cpam_response(dossier, controle) assert "Contre-args Haiku..." 
in text assert response_data is not None # 3 appels Ollama (retourne None) + 3 Anthropic en fallback assert call_count["n"] == 3 @patch("src.control.cpam_validation.call_anthropic", return_value=None) @patch("src.control.cpam_validation.call_ollama", return_value=None) @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama, _mock_val_ollama, _mock_val_anthropic): """Tous LLMs indisponibles → texte vide, response_data None.""" mock_rag.return_value = [] mock_anthropic.return_value = None mock_ollama.return_value = None dossier = _make_dossier() controle = _make_controle() text, response_data, sources = generate_cpam_response(dossier, controle) assert text == "" assert response_data is None assert sources == [] class TestSearchRagForControl: """Tests pour la logique de recherche RAG multi-requêtes.""" @patch("src.medical.rag_search.search_similar_cpam") def test_multiple_queries_with_da_ucr(self, mock_search): """Avec da_ucr, on doit avoir au moins 2 requêtes (codes + argument).""" mock_search.return_value = [ {"document": "guide_methodo", "page": 10, "code": None, "score": 0.6, "extrait": "Texte guide"}, ] dossier = _make_dossier() controle = _make_controle() # da_ucr="K56.0" results = _search_rag_for_control(controle, dossier) # Au moins 2 appels : codes contestés + argument CPAM assert mock_search.call_count >= 2 @patch("src.medical.rag_search.search_similar_cpam") def test_query_codes_contains_cma(self, mock_search): """La requête codes contestés doit contenir 'CMA' pour un DA.""" mock_search.return_value = [] dossier = _make_dossier() controle = _make_controle() # da_ucr="K56.0" _search_rag_for_control(controle, dossier) # Premier appel = requête codes first_call_query = mock_search.call_args_list[0][0][0] assert "K56.0" in first_call_query assert "CMA" in first_call_query 
@patch("src.medical.rag_search.search_similar_cpam") def test_query_argument_contains_titre(self, mock_search): """La requête argument doit contenir le titre du contrôle.""" mock_search.return_value = [] dossier = _make_dossier() controle = _make_controle() _search_rag_for_control(controle, dossier) # Deuxième appel = requête argument second_call_query = mock_search.call_args_list[1][0][0] assert controle.titre in second_call_query @patch("src.medical.rag_search.search_similar_cpam") def test_deduplication_by_document_code_page(self, mock_search): """Les résultats dupliqués (même document/code/page) sont fusionnés.""" # Les deux requêtes retournent le même résultat shared_result = { "document": "guide_methodo", "page": 64, "code": None, "score": 0.55, "extrait": "Texte partagé", } mock_search.return_value = [shared_result.copy()] dossier = _make_dossier() controle = _make_controle() results = _search_rag_for_control(controle, dossier) # Le résultat ne doit apparaître qu'une seule fois malgré les requêtes multiples guide_results = [r for r in results if r["document"] == "guide_methodo" and r.get("page") == 64] assert len(guide_results) == 1 @patch("src.medical.rag_search.search_similar_cpam") def test_dedup_keeps_best_score(self, mock_search): """La déduplication garde le meilleur score.""" def side_effect(query, top_k=8): if "CMA" in query: return [{"document": "cim10", "code": "K56.0", "page": None, "score": 0.5, "extrait": "Iléus"}] else: return [{"document": "cim10", "code": "K56.0", "page": None, "score": 0.7, "extrait": "Iléus"}] mock_search.side_effect = side_effect dossier = _make_dossier() controle = _make_controle() results = _search_rag_for_control(controle, dossier) k56_results = [r for r in results if r.get("code") == "K56.0"] assert len(k56_results) == 1 assert k56_results[0]["score"] == 0.7 @patch("src.medical.rag_search.search_similar_cpam") def test_no_codes_contestes_only_argument_query(self, mock_search): """Sans codes contestés, seule la requête 
argument est lancée.""" mock_search.return_value = [] dossier = _make_dossier() controle = ControleCPAM( numero_ogc=1, titre="Désaccord sur la durée", arg_ucr="Séjour trop long selon l'UCR.", decision_ucr="Rejet", dp_ucr=None, da_ucr=None, ) _search_rag_for_control(controle, dossier) # Un seul appel : requête argument (pas de codes contestés) assert mock_search.call_count == 1 @patch("src.medical.rag_search.search_similar_cpam") def test_dp_ucr_query_contains_diagnostic_principal(self, mock_search): """Avec dp_ucr, la requête codes mentionne 'diagnostic principal'.""" mock_search.return_value = [] dossier = _make_dossier() controle = ControleCPAM( numero_ogc=2, titre="Désaccord sur le DP", arg_ucr="Le DP devrait être K80.1", decision_ucr="Rejet", dp_ucr="K81.0", da_ucr=None, ) _search_rag_for_control(controle, dossier) first_call_query = mock_search.call_args_list[0][0][0] assert "K81.0" in first_call_query assert "diagnostic principal" in first_call_query @patch("src.medical.rag_search.search_similar_cpam") def test_max_12_results(self, mock_search): """Le résultat final est limité à 12 entrées.""" mock_search.return_value = [ {"document": "guide_methodo", "page": i, "code": None, "score": 0.9 - i * 0.01, "extrait": f"Texte {i}"} for i in range(8) ] dossier = _make_dossier() controle = _make_controle() results = _search_rag_for_control(controle, dossier) assert len(results) <= 12 @patch("src.medical.rag_search.search_similar_cpam") def test_arg_ucr_not_truncated_200(self, mock_search): """La requête RAG argument utilise jusqu'à 500 chars, pas 200.""" mock_search.return_value = [] dossier = _make_dossier() long_arg = "A" * 400 controle = ControleCPAM( numero_ogc=1, titre="Test", arg_ucr=long_arg, decision_ucr="Rejet", dp_ucr=None, da_ucr=None, ) _search_rag_for_control(controle, dossier) # La requête argument doit contenir les 400 chars (pas tronquée à 200) arg_call_query = mock_search.call_args_list[0][0][0] assert len(arg_call_query) > 200 
@patch("src.control.cpam_rag.validate_code", return_value=(True, "Iléus paralytique")) @patch("src.control.cpam_rag.normalize_code", return_value="K56.0") @patch("src.medical.rag_search.search_similar_cpam") def test_query_cim10_definitions(self, mock_search, mock_norm, mock_valid): """Requête 4 exécutée quand codes contestés présents.""" mock_search.return_value = [] dossier = _make_dossier() controle = _make_controle() # da_ucr="K56.0" _search_rag_for_control(controle, dossier) # Chercher la requête contenant "CIM-10" et "définition" cim10_queries = [ c[0][0] for c in mock_search.call_args_list if "CIM-10" in c[0][0] and "définition" in c[0][0] ] assert len(cim10_queries) >= 1 assert "K56.0" in cim10_queries[0] @patch("src.medical.rag_search.search_similar_cpam") def test_query_rule_extraction(self, mock_search): """Requête 5 exécutée quand arg_ucr contient une règle nommée.""" mock_search.return_value = [] dossier = _make_dossier() controle = ControleCPAM( numero_ogc=1, titre="Désaccord DAS", arg_ucr="Selon la RègleT7 et l'Annexe-4B, le DAS n'est pas justifié.", decision_ucr="Rejet", dp_ucr=None, da_ucr=None, ) _search_rag_for_control(controle, dossier) # Chercher la requête contenant les règles extraites rule_queries = [ c[0][0] for c in mock_search.call_args_list if "guide méthodologique" in c[0][0] ] assert len(rule_queries) >= 1 assert "RègleT7" in rule_queries[0] or "Annexe" in rule_queries[0] @patch("src.medical.rag_search.search_similar_cpam") def test_clinical_query_when_das_match(self, mock_search): """Requête clinique lancée quand da_ucr matche un DAS du dossier.""" mock_search.return_value = [] dossier = _make_dossier() # DAS K56.0 "Iléus réflexe" controle = _make_controle() # da_ucr="K56.0" _search_rag_for_control(controle, dossier) # Au moins 4 appels : codes + argument + clinique + CIM-10 définitions assert mock_search.call_count >= 4 # La requête clinique contient DP + DAS textes clinique_queries = [ c[0][0] for c in mock_search.call_args_list if 
"Iléus réflexe" in c[0][0] and "Cholécystite aiguë" in c[0][0] ] assert len(clinique_queries) >= 1 class TestGetCim10Definitions: """Tests pour l'injection déterministe des définitions CIM-10.""" @patch("src.control.cpam_context.validate_code") @patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper()) def test_definitions_injected_in_prompt(self, mock_norm, mock_valid): """La section DÉFINITIONS CIM-10 apparaît dans le prompt avec les libellés.""" mock_valid.side_effect = lambda c: { "K81.0": (True, "Cholécystite aiguë"), "K56.0": (True, "Iléus paralytique et obstruction intestinale"), }.get(c, (False, "")) dossier = _make_dossier() # DP=K81.0, DAS=K56.0 controle = _make_controle() # da_ucr="K56.0" prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "DÉFINITIONS CIM-10" in prompt assert "dictionnaire officiel" in prompt assert "Cholécystite aiguë" in prompt assert "Iléus paralytique" in prompt assert "DP établissement" in prompt @patch("src.control.cpam_context.validate_code") @patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper()) def test_definitions_include_dp_and_ucr_codes(self, mock_norm, mock_valid): """Les codes du dossier ET de l'UCR sont tous inclus.""" mock_valid.side_effect = lambda c: { "K81.0": (True, "Cholécystite aiguë"), "K56.0": (True, "Iléus paralytique"), "Z45.8": (True, "Ajustement d'un dispositif implantable"), }.get(c, (False, "")) dossier = _make_dossier() # DP=K81.0, DAS=K56.0 controle = ControleCPAM( numero_ogc=1, titre="Test", arg_ucr="Test", decision_ucr="Rejet", dp_ucr="Z45.8", da_ucr="K56.0", ) result = _get_cim10_definitions(dossier, controle) # Codes dossier assert "K81.0" in result assert "DP établissement" in result assert "K56.0" in result # Codes UCR assert "Z45.8" in result assert "DP proposé UCR" in result assert "DA proposé UCR" in result or "DAS établissement" in result @patch("src.control.cpam_context.validate_code", return_value=(False, "")) 
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper()) def test_definitions_graceful_when_code_unknown(self, mock_norm, mock_valid): """Un code inconnu ne crashe pas, affiche un message explicite.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=None, ) controle = ControleCPAM( numero_ogc=1, titre="Test", arg_ucr="Test", decision_ucr="Rejet", dp_ucr="Z99.9", da_ucr=None, ) result = _get_cim10_definitions(dossier, controle) assert "Z99.9" in result assert "non trouvé" in result def test_definitions_empty_when_no_codes(self): """Aucun code → chaîne vide.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=None, ) controle = ControleCPAM( numero_ogc=1, titre="Test", arg_ucr="Test", decision_ucr="Rejet", dp_ucr=None, da_ucr=None, ) result = _get_cim10_definitions(dossier, controle) assert result == "" class TestBuildTaggedContext: """Tests pour le contexte clinique tagué (grounding).""" def test_tagged_context_bio_img_trt(self): """Les tags BIO, IMG, TRT, ACTE sont correctement générés.""" dossier = _make_dossier_complet() text, tag_map = _build_tagged_context(dossier) assert "[BIO-1]" in text assert "CRP" in text assert "BIO-1" in tag_map assert "[IMG-1]" in text assert "Scanner abdominal" in text assert "IMG-1" in tag_map assert "[TRT-1]" in text assert "Augmentin IV" in text assert "TRT-1" in tag_map assert "[ACTE-1]" in text assert "Cholécystectomie" in text assert "ACTE-1" in tag_map def test_tagged_context_bio_norms_annotated(self): """Les valeurs bio sont annotées avec les normes de référence.""" dossier = DossierMedical( source_file="test.pdf", biologie_cle=[ BiologieCle(test="CRP", valeur="5", anomalie=False), BiologieCle(test="CRP", valeur="180", anomalie=True), BiologieCle(test="Hémoglobine", valeur="8.5", anomalie=True), ], ) text, tag_map = _build_tagged_context(dossier) # CRP 5 = normal (norme 0-5) assert "NORMAL" in tag_map.get("BIO-1", "") # CRP 180 = élevé assert "ÉLEVÉ" in 
tag_map.get("BIO-2", "") # Hb 8.5 = bas (norme 12-17) assert "BAS" in tag_map.get("BIO-3", "") def test_tagged_context_empty_dossier(self): """Dossier sans aucune donnée clinique → texte vide, tag_map vide.""" dossier = DossierMedical(source_file="test.pdf") text, tag_map = _build_tagged_context(dossier) assert text == "" assert tag_map == {} def test_tagged_context_dp_only_dossier(self): """Dossier avec DP mais sans bio/img/trt → tag [DP] généré.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"), ) text, tag_map = _build_tagged_context(dossier) assert "DP" in tag_map assert "[DP]" in text def test_tagged_context_in_prompt(self): """Le contexte tagué apparaît dans le prompt généré.""" dossier = _make_dossier_complet() controle = _make_controle() prompt, tag_map = _build_cpam_prompt(dossier, controle, []) assert "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS" in prompt assert "[BIO-1]" in prompt assert "[IMG-1]" in prompt assert len(tag_map) > 0 def test_poor_dossier_warning_in_prompt(self): """Dossier totalement vide → avertissement DOSSIER PAUVRE dans le prompt.""" dossier = DossierMedical( source_file="test.pdf", sejour=Sejour(sexe="M", age=70), ) controle = _make_controle() prompt, tag_map = _build_cpam_prompt(dossier, controle, []) assert "DOSSIER PAUVRE" in prompt assert "Ne spécule PAS" in prompt assert len(tag_map) == 0 def test_dp_only_dossier_not_poor(self): """Dossier avec DP mais sans bio/img → PAS de warning DOSSIER PAUVRE (DP génère un tag).""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"), sejour=Sejour(sexe="M", age=70), ) controle = _make_controle() prompt, tag_map = _build_cpam_prompt(dossier, controle, []) assert "DOSSIER PAUVRE" not in prompt assert "DP" in tag_map class TestValidateGrounding: """Tests pour la validation des preuves grounded.""" def test_grounding_valid_refs(self): """Toutes les refs existent → 0 
warnings.""" tag_map = {"BIO-1": "CRP: 180 mg/L", "IMG-1": "Scanner abdominal"} response_data = { "preuves_dossier": [ {"ref": "BIO-1", "element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation"}, {"ref": "IMG-1", "element": "imagerie", "valeur": "Scanner", "signification": "confirme"}, ] } warnings = _validate_grounding(response_data, tag_map) assert len(warnings) == 0 def test_grounding_invented_ref(self): """Ref inventée [BIO-99] → warning détecté.""" tag_map = {"BIO-1": "CRP: 180 mg/L"} response_data = { "preuves_dossier": [ {"ref": "BIO-99", "element": "biologie", "valeur": "Albumine 15 g/L", "signification": "inventé"}, ] } warnings = _validate_grounding(response_data, tag_map) assert len(warnings) == 1 assert "BIO-99" in warnings[0] def test_grounding_no_tag_map_no_validation(self): """Pas de tag_map (dossier vide) → pas de validation.""" response_data = { "preuves_dossier": [ {"ref": "BIO-1", "element": "biologie", "valeur": "test", "signification": "test"}, ] } warnings = _validate_grounding(response_data, {}) assert len(warnings) == 0 def test_grounding_no_ref_field_ok(self): """Preuves sans champ ref (ancien format) → pas de warning.""" tag_map = {"BIO-1": "CRP: 180 mg/L"} response_data = { "preuves_dossier": [ {"element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation"}, ] } warnings = _validate_grounding(response_data, tag_map) assert len(warnings) == 0 def test_format_response_with_ref(self): """Le formatage inclut le tag ref dans les preuves.""" parsed = { "contre_arguments_medicaux": "Arguments...", "preuves_dossier": [ {"ref": "BIO-1", "element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation"}, ], "conclusion": "Conclusion...", } text = _format_response(parsed) assert "[BIO-1]" in text assert "[biologie]" in text assert "CRP 180 mg/L" in text class TestCheckDasBioCoherence: """Tests pour la vérification cohérence DAS / biologie.""" def test_leucocytose_with_low_leucocytes(self): 
"""DAS 'leucocytose' mais leucocytes bas → incohérence détectée.""" dossier = DossierMedical( source_file="test.pdf", diagnostics_associes=[ Diagnostic(texte="Leucocytose", cim10_suggestion="D72.8"), ], biologie_cle=[ BiologieCle(test="Leucocytes", valeur="3", anomalie=True), ], ) warnings = _check_das_bio_coherence(dossier) assert len(warnings) == 1 assert "Leucocytose" in warnings[0] assert "NORMAL" in warnings[0] def test_anemie_with_normal_hb(self): """DAS 'anémie' mais Hb normale → incohérence détectée.""" dossier = DossierMedical( source_file="test.pdf", diagnostics_associes=[ Diagnostic(texte="Anémie ferriprive", cim10_suggestion="D50.9"), ], biologie_cle=[ BiologieCle(test="Hémoglobine", valeur="14.5", anomalie=False), ], ) warnings = _check_das_bio_coherence(dossier) assert len(warnings) == 1 assert "anémie" in warnings[0].lower() or "Anémie" in warnings[0] def test_coherent_das_bio_no_warnings(self): """DAS 'anémie' avec Hb basse → pas d'incohérence.""" dossier = DossierMedical( source_file="test.pdf", diagnostics_associes=[ Diagnostic(texte="Anémie", cim10_suggestion="D64.9"), ], biologie_cle=[ BiologieCle(test="Hémoglobine", valeur="8.5", anomalie=True), ], ) warnings = _check_das_bio_coherence(dossier) assert len(warnings) == 0 def test_no_bio_no_crash(self): """Pas de biologie → pas de crash, pas de warnings.""" dossier = DossierMedical( source_file="test.pdf", diagnostics_associes=[ Diagnostic(texte="Leucocytose", cim10_suggestion="D72.8"), ], ) warnings = _check_das_bio_coherence(dossier) assert len(warnings) == 0 def test_coherence_warnings_in_prompt(self): """Les incohérences DAS/bio apparaissent dans le prompt.""" dossier = DossierMedical( source_file="test.pdf", sejour=Sejour(sexe="M", age=65), diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"), diagnostics_associes=[ Diagnostic(texte="Thrombocytose", cim10_suggestion="D75.9"), ], biologie_cle=[ BiologieCle(test="Plaquettes", valeur="200", anomalie=False), ], ) controle = 
class TestPatientContext:
    """Patient-context flags surfaced in the prompt."""

    def test_pediatric_flag(self):
        """Patient under 18 -> paediatric mention in the prompt."""
        dm = DossierMedical(
            source_file="test.pdf",
            sejour=Sejour(sexe="F", age=9),
            diagnostic_principal=Diagnostic(texte="Appendicite", cim10_suggestion="K35.8"),
        )
        prompt, _ = _build_cpam_prompt(dm, _make_controle(), [])
        assert "PÉDIATRIE" in prompt
        assert "9 ans" in prompt

    def test_elderly_flag(self):
        """Patient aged 80 or more -> elderly-patient mention."""
        dm = DossierMedical(
            source_file="test.pdf",
            sejour=Sejour(sexe="M", age=85),
            diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
        )
        prompt, _ = _build_cpam_prompt(dm, _make_controle(), [])
        assert "patient âgé" in prompt
        assert "85 ans" in prompt

    def test_emergency_admission(self):
        """Emergency admission mode -> flag present in the prompt."""
        dm = DossierMedical(
            source_file="test.pdf",
            sejour=Sejour(sexe="M", age=50, mode_entree="Autres admissions urgentes"),
            diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
        )
        prompt, _ = _build_cpam_prompt(dm, _make_controle(), [])
        assert "ADMISSION EN URGENCE" in prompt

    def test_context_consigne_in_prompt(self):
        """The prompt carries an instruction about the clinical context."""
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [])
        assert "CONTEXTE CLINIQUE" in prompt
        assert "ÂGE" in prompt
        assert "MODE D'ENTRÉE" in prompt
class TestExtractionPass:
    """Pass 1 — structured extraction."""

    @patch("src.control.cpam_response.call_ollama")
    def test_extraction_pass_returns_structured_json(self, mock_ollama):
        """Pass 1 returns the expected structured fields."""
        mock_ollama.return_value = {
            "comprehension_contestation": "La CPAM conteste le DAS K56.0",
            "elements_cliniques_pertinents": [
                {"tag": "BIO-1", "pertinence": "CRP élevée confirme inflammation"}
            ],
            "points_accord_potentiels": ["Le CRH est succinct"],
            "codes_en_jeu": {
                "dp_etablissement": "K81.0 — Cholécystite aiguë",
                "dp_ucr": "",
                "difference_cle": "contestation porte sur le DAS, pas le DP",
            },
        }
        extraction = _extraction_pass(_make_dossier(), _make_controle())
        assert extraction is not None
        assert "comprehension_contestation" in extraction
        assert len(extraction["elements_cliniques_pertinents"]) == 1
        mock_ollama.assert_called_once()

    @patch("src.control.cpam_response.call_anthropic", return_value=None)
    @patch("src.control.cpam_response.call_ollama", return_value=None)
    def test_extraction_pass_failure_returns_none(self, mock_ollama, mock_anthropic):
        """Pass 1 failing -> None (single-pass fallback)."""
        assert _extraction_pass(_make_dossier(), _make_controle()) is None

    @patch("src.control.cpam_response.call_ollama")
    def test_extraction_injected_in_prompt(self, mock_ollama):
        """The pass-1 output is injected into the pass-2 prompt."""
        extraction = {
            "comprehension_contestation": "La CPAM conteste le DAS K56.0",
            "elements_cliniques_pertinents": [
                {"tag": "BIO-1", "pertinence": "CRP élevée"}
            ],
            "points_accord_potentiels": ["Le CRH est succinct"],
            "codes_en_jeu": {
                "difference_cle": "contestation porte sur le DAS",
            },
        }
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [], extraction)
        assert "PRÉ-ANALYSE" in prompt
        assert "La CPAM conteste le DAS K56.0" in prompt
        assert "CRP élevée" in prompt
        assert "contestation porte sur le DAS" in prompt

    def test_prompt_without_extraction(self):
        """Without an extraction, no PRÉ-ANALYSE section is emitted."""
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [], None)
        assert "PRÉ-ANALYSE" not in prompt

    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_generate_calls_three_passes(self, mock_rag, mock_ollama, mock_val_ollama):
        """The orchestrator chains extraction + argumentation + validation."""
        seen = []

        def fake_llm(prompt, temperature=0.1, max_tokens=4000, **kwargs):
            seen.append(prompt)
            if len(seen) == 1:
                # Pass 1: extraction.
                return {
                    "comprehension_contestation": "Contestation DAS",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                }
            if len(seen) == 2:
                # Pass 2: argumentation.
                return {
                    "analyse_contestation": "Analyse...",
                    "contre_arguments_medicaux": "Arguments...",
                    "conclusion": "Conclusion...",
                }
            # Pass 3: adversarial validation.
            return {"coherent": True, "erreurs": [], "score_confiance": 9}

        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        mock_rag.return_value = []
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        # Three Ollama calls: extraction + argumentation + validation.
        assert len(seen) == 3
        assert response_data is not None
        assert "Arguments..." in text
class TestValidateAdversarial:
    """Adversarial validation of the generated response."""

    @patch("src.control.cpam_validation.call_ollama")
    def test_coherent_response_no_warnings(self, mock_ollama):
        """Coherent response -> coherent=True and an empty error list."""
        mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 9}
        payload = {
            "analyse_contestation": "Analyse...",
            "preuves_dossier": [{"ref": "BIO-1", "valeur": "CRP 180 mg/L"}],
            "conclusion": "Conclusion...",
        }
        verdict = _validate_adversarial(payload, {"BIO-1": "CRP: 180 mg/L"}, _make_controle())
        assert verdict is not None
        assert verdict["coherent"] is True
        assert len(verdict["erreurs"]) == 0

    @patch("src.control.cpam_validation.call_ollama")
    def test_hallucinated_bio_detected(self, mock_ollama):
        """Hallucinated lab value -> coherent=False with one error."""
        mock_ollama.return_value = {
            "coherent": False,
            "erreurs": ["CRP citée à 250 mg/L mais le dossier indique 180 mg/L"],
            "score_confiance": 3,
        }
        payload = {
            "preuves_dossier": [{"ref": "BIO-1", "valeur": "CRP 250 mg/L"}],
            "conclusion": "Conclusion...",
        }
        verdict = _validate_adversarial(payload, {"BIO-1": "CRP: 180 mg/L"}, _make_controle())
        assert verdict is not None
        assert verdict["coherent"] is False
        assert len(verdict["erreurs"]) == 1
        assert "CRP" in verdict["erreurs"][0]

    @patch("src.control.cpam_validation.call_anthropic", return_value=None)
    @patch("src.control.cpam_validation.call_ollama", return_value=None)
    def test_adversarial_failure_graceful(self, mock_ollama, mock_anthropic):
        """LLM unavailable -> None is returned, no crash."""
        verdict = _validate_adversarial(
            {"conclusion": "Conclusion..."}, {"BIO-1": "CRP: 180 mg/L"}, _make_controle()
        )
        assert verdict is None

    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_adversarial_warnings_in_output(self, mock_rag, mock_ollama, mock_val_ollama):
        """Detected inconsistencies -> warnings embedded in the formatted text."""
        seen = []

        def fake_llm(prompt, temperature=0.1, max_tokens=4000, **kwargs):
            seen.append(prompt)
            if len(seen) == 1:
                return {
                    "comprehension_contestation": "Extraction",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                }
            if len(seen) == 2:
                return {
                    "analyse_contestation": "Analyse...",
                    "contre_arguments_medicaux": "Arguments...",
                    "conclusion": "Conclusion...",
                }
            return {
                "coherent": False,
                "erreurs": ["Antibiotiques mentionnés mais absents du dossier"],
                "score_confiance": 4,
            }

        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        mock_rag.return_value = []
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        assert "Antibiotiques mentionnés" in text
        assert "Score adversarial" in text

    def test_adversarial_empty_tag_map(self):
        """Dossier without any tags -> validation still works."""
        with patch("src.control.cpam_validation.call_ollama") as mock_ollama:
            mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 7}
            verdict = _validate_adversarial({"conclusion": "Test"}, {}, _make_controle())
        assert verdict is not None
        assert verdict["coherent"] is True
class TestValidateCodesInResponse:
    """Closed-perimeter code validation (dossier + UCR whitelist)."""

    def test_code_in_dossier_no_warning(self):
        """A code present in the dossier -> no warning."""
        parsed = {"conclusion": "Le code K81.0 est justifié par la cholécystite."}
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        assert len(alerts) == 0

    def test_code_from_ucr_no_warning(self):
        """A code proposed by the UCR -> no warning."""
        parsed = {"conclusion": "Le code K56.0 contesté par l'UCR est bien justifié."}
        # _make_controle() carries da_ucr="K56.0".
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        assert len(alerts) == 0

    def test_invented_code_detected(self):
        """A code absent from both the dossier and the UCR -> warning."""
        parsed = {"conclusion": "Le code Z45.8 confirme la nécessité du séjour."}
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        assert len(alerts) >= 1
        assert any("Z45" in a for a in alerts)

    def test_subcode_tolerated(self):
        """K81.09 tolerated while K81.0 is whitelisted (shared 3-char prefix)."""
        parsed = {"contre_arguments_medicaux": "Le sous-code K81.09 est une précision de K81.0."}
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        # K81.09 shares the K81 prefix with K81.0, hence tolerated.
        assert len(alerts) == 0

    def test_codes_in_citations_excluded(self):
        """Codes inside references[].citation are skipped by the validation."""
        parsed = {
            "conclusion": "Le codage est justifié.",
            "references": [
                {"document": "CIM-10", "citation": "Z45.8 — Ajustement d'un dispositif"},
            ],
        }
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        # Z45.8 only appears within references, not in textual fields -> not flagged.
        assert len(alerts) == 0

    def test_no_codes_in_response_no_warning(self):
        """Response without any CIM-10 code -> zero warnings."""
        parsed = {"conclusion": "Le séjour est justifié par la gravité clinique."}
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        assert len(alerts) == 0

    def test_multiple_invented_codes(self):
        """Several out-of-perimeter codes -> as many warnings."""
        parsed = {
            "contre_arguments_medicaux": "Les codes Z45.8 et E11.9 confirment le diagnostic.",
        }
        alerts = _validate_codes_in_response(parsed, _make_dossier(), _make_controle())
        assert len(alerts) >= 2

    def test_no_whitelist_no_validation(self):
        """No code in the dossier nor the UCR -> validation disabled (0 warnings)."""
        parsed = {"conclusion": "Le code Z45.8 est justifié."}
        bare_dossier = DossierMedical(source_file="test.pdf", diagnostic_principal=None)
        bare_controle = ControleCPAM(
            numero_ogc=1,
            titre="Test",
            arg_ucr="Test",
            decision_ucr="Rejet",
            dp_ucr=None,
            da_ucr=None,
        )
        alerts = _validate_codes_in_response(parsed, bare_dossier, bare_controle)
        assert len(alerts) == 0
class TestBuildBioSummary:
    """Deterministic biology summary."""

    def test_bio_summary_interpretation(self):
        """High CRP and low Hb -> summary with clinical interpretations."""
        dm = DossierMedical(
            source_file="test.pdf",
            biologie_cle=[
                BiologieCle(test="CRP", valeur="180 mg/L", anomalie=True),
                BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True),
            ],
        )
        summary = _build_bio_summary(dm)
        assert "CRP" in summary
        assert "ÉLEVÉ" in summary
        assert "infection/inflammation active" in summary
        assert "Hémoglobine" in summary
        assert "BAS" in summary
        assert "anémie" in summary

    def test_bio_summary_normal_values(self):
        """Normal values -> the 'normal' interpretation is shown."""
        dm = DossierMedical(
            source_file="test.pdf",
            biologie_cle=[
                BiologieCle(test="Plaquettes", valeur="250 G/L", anomalie=False),
            ],
        )
        summary = _build_bio_summary(dm)
        assert "NORMAL" in summary
        assert "numération normale" in summary

    def test_bio_summary_in_prompt(self):
        """The bio summary shows up in the CPAM prompt."""
        # _make_dossier_complet() carries CRP 180 and creatinine 450.
        prompt, _ = _build_cpam_prompt(_make_dossier_complet(), _make_controle(), [])
        assert "FAITS BIOLOGIQUES VÉRIFIÉS" in prompt
        assert "NE PAS MODIFIER" in prompt
        assert "RÈGLE STRICTE" in prompt

    def test_bio_summary_empty_no_bio(self):
        """No biology -> empty summary."""
        assert _build_bio_summary(DossierMedical(source_file="test.pdf")) == ""

    def test_bio_summary_unknown_test(self):
        """Unknown lab test (outside BIO_NORMALS) -> left out of the summary."""
        dm = DossierMedical(
            source_file="test.pdf",
            biologie_cle=[
                BiologieCle(test="Vitamine D", valeur="15 ng/mL", anomalie=True),
            ],
        )
        assert _build_bio_summary(dm) == ""

    def test_bio_summary_unparseable_value(self):
        """Unparseable lab value -> skipped without crashing."""
        dm = DossierMedical(
            source_file="test.pdf",
            biologie_cle=[
                BiologieCle(test="CRP", valeur="positif", anomalie=True),
                BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True),
            ],
        )
        summary = _build_bio_summary(dm)
        # CRP "positif" cannot be parsed -> omitted; Hb remains.
        assert "Hémoglobine" in summary
        assert "CRP" not in summary
class TestCorrectionLoop:
    """Adversarial correction loop."""

    @staticmethod
    def _scripted(replies, default):
        """Build a fake LLM serving *replies* by call number, then *default*.

        Returns the side-effect callable plus the list recording each prompt seen,
        so tests can assert on the total number of LLM calls.
        """
        seen = []

        def fake_llm(prompt, temperature=0.1, max_tokens=4000, **kwargs):
            seen.append(prompt)
            return replies.get(len(seen), default)

        return fake_llm, seen

    @patch("src.control.cpam_response.rule_enabled", return_value=True)
    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response.call_anthropic")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_correction_triggered_when_score_low(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
        """Adversarial score <= 5 -> correction pass runs (5 LLM calls total)."""
        mock_rag.return_value = []
        fake_llm, seen = self._scripted(
            {
                1: {  # pass 1: extraction
                    "comprehension_contestation": "Extraction",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                },
                2: {  # pass 2: argumentation
                    "analyse_contestation": "Analyse...",
                    "contre_arguments_medicaux": "Arguments erronés...",
                    "conclusion": "Conclusion avec erreurs...",
                },
                3: {  # pass 3: adversarial validation -> low score
                    "coherent": False,
                    "erreurs": ["CRP citée à 250 mais vaut 180"],
                    "score_confiance": 3,
                },
                4: {  # pass 4: correction
                    "analyse_contestation": "Analyse corrigée...",
                    "contre_arguments_medicaux": "Arguments corrigés...",
                    "conclusion": "Conclusion corrigée...",
                },
            },
            # pass 5: re-validation
            {"coherent": True, "erreurs": [], "score_confiance": 8},
        )
        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        # Five Ollama calls: extraction + argumentation + validation + correction + re-validation.
        assert len(seen) == 5
        # The correction was accepted (score 8 beats 3).
        assert "corrigé" in text.lower()

    @patch("src.control.cpam_response.rule_enabled", return_value=True)
    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response.call_anthropic")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_no_correction_when_score_high(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
        """Adversarial score > 5 -> no correction pass (3 LLM calls)."""
        mock_rag.return_value = []
        fake_llm, seen = self._scripted(
            {
                1: {
                    "comprehension_contestation": "Extraction",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                },
                2: {
                    "analyse_contestation": "Analyse...",
                    "contre_arguments_medicaux": "Arguments...",
                    "conclusion": "Conclusion...",
                },
            },
            {"coherent": True, "erreurs": [], "score_confiance": 8},
        )
        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        # Only three calls: extraction + argumentation + validation.
        assert len(seen) == 3

    @patch("src.control.cpam_response.rule_enabled", return_value=True)
    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response.call_anthropic")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_correction_accepted_when_score_improves(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
        """Score moves from 3 to 7 -> the correction is accepted."""
        mock_rag.return_value = []
        fake_llm, seen = self._scripted(
            {
                1: {
                    "comprehension_contestation": "Extraction",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                },
                2: {
                    "analyse_contestation": "Analyse originale...",
                    "contre_arguments_medicaux": "Arguments originaux...",
                    "conclusion": "Conclusion originale...",
                },
                3: {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 3},
                4: {
                    "analyse_contestation": "Analyse améliorée...",
                    "contre_arguments_medicaux": "Arguments améliorés...",
                    "conclusion": "Conclusion améliorée...",
                },
            },
            {"coherent": True, "erreurs": [], "score_confiance": 7},
        )
        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        # The final output is the corrected response.
        assert response_data["conclusion"] == "Conclusion améliorée..."

    @patch("src.control.cpam_response.rule_enabled", return_value=True)
    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response.call_anthropic")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_correction_rejected_when_score_same(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
        """Score does not improve -> the original response is kept."""
        mock_rag.return_value = []
        fake_llm, seen = self._scripted(
            {
                1: {
                    "comprehension_contestation": "Extraction",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                },
                2: {
                    "analyse_contestation": "Analyse originale...",
                    "contre_arguments_medicaux": "Arguments originaux...",
                    "conclusion": "Conclusion originale...",
                },
                3: {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 4},
                4: {
                    "analyse_contestation": "Correction pire...",
                    "contre_arguments_medicaux": "Arguments pires...",
                    "conclusion": "Conclusion pire...",
                },
            },
            {"coherent": False, "erreurs": ["Encore des erreurs"], "score_confiance": 3},
        )
        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        # Correction score (3) <= original score (4) -> original kept.
        assert response_data["conclusion"] == "Conclusion originale..."

    @patch("src.control.cpam_response.rule_enabled", return_value=False)
    @patch("src.control.cpam_validation.call_ollama")
    @patch("src.control.cpam_response.call_ollama")
    @patch("src.control.cpam_response.call_anthropic")
    @patch("src.control.cpam_response._search_rag_for_control")
    def test_correction_disabled_by_rule(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
        """RULE-CPAM-CORRECTION-LOOP disabled -> no retry."""
        mock_rag.return_value = []
        fake_llm, seen = self._scripted(
            {
                1: {
                    "comprehension_contestation": "Extraction",
                    "elements_cliniques_pertinents": [],
                    "points_accord_potentiels": [],
                    "codes_en_jeu": {},
                },
                2: {
                    "analyse_contestation": "Analyse...",
                    "contre_arguments_medicaux": "Arguments...",
                    "conclusion": "Conclusion...",
                },
            },
            {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 2},
        )
        mock_ollama.side_effect = fake_llm
        mock_val_ollama.side_effect = fake_llm
        text, response_data, sources = generate_cpam_response(_make_dossier(), _make_controle())
        # Only three calls: no correction since the rule is off.
        assert len(seen) == 3

    def test_build_correction_prompt_format(self):
        """The correction prompt embeds the errors and the original response."""
        correction = _build_correction_prompt(
            "Prompt d'argumentation original...",
            {
                "analyse_contestation": "Analyse avec erreur CRP 250",
                "conclusion": "Conclusion erronée",
            },
            {
                "coherent": False,
                "erreurs": ["CRP citée à 250 mg/L mais le dossier indique 180 mg/L"],
                "score_confiance": 3,
            },
        )
        assert "CORRECTION REQUISE" in correction
        assert "CRP citée à 250" in correction
        assert "Prompt d'argumentation original" in correction
        assert "Corrige UNIQUEMENT" in correction
class TestAssessQualityTier:
    """CPAM quality-tier classification (A/B/C)."""

    @staticmethod
    def _classify(ref=(), grounding=(), codes=(), adversarial=None):
        """Run the tier assessment on an empty parse with the given warnings."""
        return _assess_quality_tier(
            parsed={},
            ref_warnings=list(ref),
            grounding_warnings=list(grounding),
            code_warnings=list(codes),
            adversarial_result=adversarial,
        )

    def test_tier_a_no_warnings_high_score(self):
        """No warning and adversarial score >= 7 -> tier A, no review needed."""
        tier, review, warnings = self._classify(
            adversarial={"coherent": True, "erreurs": [], "score_confiance": 9},
        )
        assert tier == "A"
        assert review is False
        assert len(warnings) == 0

    def test_tier_b_ref_warnings(self):
        """Reference warnings -> tier B."""
        tier, review, warnings = self._classify(
            ref=["Référence non vérifiable : Manuel Inventé"],
            adversarial={"coherent": True, "erreurs": [], "score_confiance": 8},
        )
        assert tier == "B"
        assert review is False
        assert any("[MINEUR]" in w for w in warnings)

    def test_tier_b_medium_adversarial_score(self):
        """Adversarial score in the 4-6 band -> tier B."""
        tier, review, warnings = self._classify(
            adversarial={"coherent": True, "erreurs": [], "score_confiance": 5},
        )
        assert tier == "B"
        assert review is False

    def test_tier_b_one_grounding_warning(self):
        """One untraceable evidence item -> tier B (minor)."""
        tier, review, warnings = self._classify(
            grounding=["Preuve [BIO-99] non traçable"],
            adversarial={"coherent": True, "erreurs": [], "score_confiance": 8},
        )
        assert tier == "B"
        assert review is False
        assert any("[MINEUR]" in w for w in warnings)

    def test_tier_c_code_warnings(self):
        """Out-of-perimeter code -> tier C, review required."""
        tier, review, warnings = self._classify(
            codes=["Code Z45.8 hors périmètre dossier/UCR"],
            adversarial={"coherent": True, "erreurs": [], "score_confiance": 7},
        )
        assert tier == "C"
        assert review is True
        assert any("[CRITIQUE]" in w for w in warnings)

    def test_tier_c_low_adversarial_score(self):
        """Adversarial score below 4 -> tier C."""
        tier, review, warnings = self._classify(
            adversarial={"coherent": False, "erreurs": ["Bio inventée"], "score_confiance": 2},
        )
        assert tier == "C"
        assert review is True
        assert any("[CRITIQUE]" in w for w in warnings)

    def test_tier_c_many_grounding_warnings(self):
        """Three or more untraceable evidence items -> tier C (critical)."""
        tier, review, warnings = self._classify(
            grounding=[
                "Preuve [BIO-1] non traçable",
                "Preuve [BIO-2] non traçable",
                "Preuve [BIO-3] non traçable",
            ],
            adversarial={"coherent": True, "erreurs": [], "score_confiance": 7},
        )
        assert tier == "C"
        assert review is True

    def test_tier_a_no_adversarial(self):
        """No adversarial validation (None) and zero warnings -> tier A."""
        tier, review, warnings = self._classify(adversarial=None)
        assert tier == "A"
        assert review is False
class TestFormatResponseCategorized:
    """Formatting with categorised warnings and quality_tier."""

    def test_tier_c_banner(self):
        """Tier C -> REVUE MANUELLE REQUISE banner."""
        rendered = _format_response(
            {"conclusion": "Conclusion..."},
            quality_tier="C",
            categorized_warnings=["[CRITIQUE] Code hors périmètre"],
        )
        assert "REVUE MANUELLE REQUISE" in rendered
        assert "Qualité : C" in rendered
        assert "AVERTISSEMENTS CRITIQUES" in rendered

    def test_tier_a_no_banner(self):
        """Tier A -> no banner."""
        rendered = _format_response(
            {"conclusion": "Conclusion..."},
            quality_tier="A",
            categorized_warnings=[],
        )
        assert "REVUE MANUELLE REQUISE" not in rendered

    def test_warnings_separated(self):
        """Critical and minor warnings land in distinct sections, critical first."""
        rendered = _format_response(
            {"conclusion": "Conclusion..."},
            quality_tier="C",
            categorized_warnings=[
                "[CRITIQUE] Code Z45.8 hors périmètre",
                "[MINEUR] Référence non vérifiable",
            ],
        )
        assert "AVERTISSEMENTS CRITIQUES" in rendered
        assert "AVERTISSEMENTS MINEURS" in rendered
        assert rendered.index("CRITIQUES") < rendered.index("MINEURS")

    def test_backward_compat_old_ref_warnings(self):
        """Without categorized_warnings, fall back to the legacy ref_warnings."""
        rendered = _format_response(
            {"conclusion": "Conclusion..."},
            ref_warnings=["Référence non vérifiable : X"],
        )
        assert "AVERTISSEMENT — REFERENCES NON VÉRIFIÉES" in rendered
class TestCheckDasBioCoherenceExtended:
    """Newer DAS/bio patterns (Phase 5)."""

    @staticmethod
    def _warnings_for(das_texte, das_code, bio_test, bio_valeur, anomalie):
        """Run the coherence check on a single DAS / single lab-result pair."""
        dm = DossierMedical(
            source_file="test.pdf",
            diagnostics_associes=[
                Diagnostic(texte=das_texte, cim10_suggestion=das_code),
            ],
            biologie_cle=[
                BiologieCle(test=bio_test, valeur=bio_valeur, anomalie=anomalie),
            ],
        )
        return _check_das_bio_coherence(dm)

    def test_sepsis_with_normal_crp(self):
        """DAS 'sepsis' with a normal CRP -> inconsistency."""
        alerts = self._warnings_for("Sepsis sévère", "A41.9", "CRP", "3", False)
        assert len(alerts) >= 1
        assert any("Sepsis" in a or "sepsis" in a for a in alerts)

    def test_infarctus_with_normal_troponine(self):
        """DAS 'infarction' with a normal troponin -> inconsistency."""
        alerts = self._warnings_for("Infarctus du myocarde", "I21.9", "Troponine", "0.01", False)
        assert len(alerts) >= 1

    def test_infarctus_with_high_troponine_ok(self):
        """DAS 'infarction' with a raised troponin -> no inconsistency."""
        alerts = self._warnings_for("Infarctus du myocarde", "I21.9", "Troponine", "0.5", True)
        assert len(alerts) == 0

    def test_denutrition_with_normal_albumine(self):
        """DAS 'malnutrition' with a normal albumin -> inconsistency."""
        alerts = self._warnings_for("Dénutrition sévère", "E43", "Albumine", "42", False)
        assert len(alerts) >= 1

    def test_hypothyroidie_with_normal_tsh(self):
        """DAS 'hypothyroidism' with a normal TSH -> inconsistency."""
        alerts = self._warnings_for("Hypothyroïdie", "E03.9", "TSH", "2.5", False)
        assert len(alerts) >= 1

    def test_diabete_with_normal_glycemie(self):
        """DAS 'diabetes' with a normal glycaemia -> inconsistency."""
        alerts = self._warnings_for("Diabète de type 2", "E11.9", "Glycémie", "4.5", False)
        assert len(alerts) >= 1

    def test_embolie_pulmonaire_with_normal_d_dimeres(self):
        """DAS 'pulmonary embolism' with normal D-dimers -> inconsistency."""
        alerts = self._warnings_for("Embolie pulmonaire", "I26.9", "D-dimères", "200", False)
        assert len(alerts) >= 1

    def test_insuffisance_renale_with_normal_creatinine(self):
        """DAS 'renal failure' with a normal creatinine -> inconsistency."""
        alerts = self._warnings_for("Insuffisance rénale aiguë", "N17.9", "Créatinine", "80", False)
        assert len(alerts) >= 1
class TestCodesAutorisesWhitelist:
    """Authorised-code whitelist (anti-hallucination)."""

    def test_whitelist_in_prompt(self):
        """The prompt carries the PÉRIMÈTRE DE CODES AUTORISÉS section."""
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [])
        assert "PÉRIMÈTRE DE CODES AUTORISÉS" in prompt
        assert "INTERDICTION" in prompt

    def test_whitelist_contains_dossier_codes(self):
        """Every dossier code (DP K81.0, DAS K56.0) is whitelisted."""
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [])
        assert "K81.0" in prompt
        assert "K56.0" in prompt

    def test_whitelist_contains_ucr_codes(self):
        """Every UCR code is whitelisted."""
        controle = _make_controle()
        controle.dp_ucr = "K80.1"
        prompt, _ = _build_cpam_prompt(_make_dossier(), controle, [])
        assert "K80.1" in prompt

    def test_whitelist_dedup(self):
        """Codes present both in the dossier and the UCR are listed only once."""
        # K56.0 is both a dossier DAS and the UCR DA.
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [])
        start = prompt.index("PÉRIMÈTRE DE CODES AUTORISÉS")
        stop = prompt.index("INTERDICTION")
        assert prompt[start:stop].count("K56.0") == 1

    def test_whitelist_prohibition_message(self):
        """The prohibition wording is explicit and complete."""
        prompt, _ = _build_cpam_prompt(_make_dossier(), _make_controle(), [])
        assert "Ne mentionne AUCUN code CIM-10 qui ne figure pas" in prompt
_make_controle() prompt, _ = _build_cpam_prompt(dossier, controle, []) assert "Ne mentionne AUCUN code CIM-10 qui ne figure pas" in prompt class TestTaggedContextNewTags: """Tests pour les tags DP, DAS-N, ANT-N, COMPL-N dans _build_tagged_context().""" def test_dp_tag_generated(self): """Le tag [DP] est généré pour le diagnostic principal.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="Cholécystite aiguë", cim10_suggestion="K81.0"), biologie_cle=[BiologieCle(test="CRP", valeur="180 mg/L")], ) text, tag_map = _build_tagged_context(dossier) assert "DP" in tag_map assert "Cholécystite aiguë (K81.0)" in tag_map["DP"] assert "[DP]" in text def test_dp_without_code(self): """Le tag [DP] fonctionne même sans code CIM-10.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="Infection urinaire"), biologie_cle=[BiologieCle(test="CRP", valeur="50 mg/L")], ) text, tag_map = _build_tagged_context(dossier) assert "DP" in tag_map assert "Infection urinaire" in tag_map["DP"] assert "()" not in tag_map["DP"] def test_das_tags_generated(self): """Les tags [DAS-1], [DAS-2] sont générés pour les diagnostics associés.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="DP test"), diagnostics_associes=[ Diagnostic(texte="Iléus réflexe", cim10_suggestion="K56.0"), Diagnostic(texte="HTA", cim10_suggestion="I10"), ], ) text, tag_map = _build_tagged_context(dossier) assert "DAS-1" in tag_map assert "DAS-2" in tag_map assert "K56.0" in tag_map["DAS-1"] assert "[DAS-1]" in text assert "[DAS-2]" in text def test_ant_tags_generated(self): """Les tags [ANT-1], [ANT-2] sont générés pour les antécédents.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="DP test"), antecedents=[ Antecedent(texte="Diabète type 2"), Antecedent(texte="HTA"), ], ) text, tag_map = _build_tagged_context(dossier) assert "ANT-1" in tag_map assert "ANT-2" in 
tag_map assert "Diabète type 2" in tag_map["ANT-1"] assert "[ANT-1]" in text assert "[ANT-2]" in text def test_ant_tags_capped_at_10(self): """Les antécédents sont limités à 10 tags maximum.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="DP test"), antecedents=[Antecedent(texte=f"Antécédent {i}") for i in range(15)], ) _, tag_map = _build_tagged_context(dossier) assert "ANT-10" in tag_map assert "ANT-11" not in tag_map def test_compl_tags_generated(self): """Les tags [COMPL-1] sont générés pour les complications.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="DP test"), complications=[ Complication(texte="Infection de paroi"), Complication(texte="Hémorragie post-op"), ], ) text, tag_map = _build_tagged_context(dossier) assert "COMPL-1" in tag_map assert "COMPL-2" in tag_map assert "Infection de paroi" in tag_map["COMPL-1"] assert "[COMPL-1]" in text def test_all_new_tags_in_complet_dossier(self): """Un dossier complet génère tous les types de tags.""" dossier = DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="Cholécystite", cim10_suggestion="K81.0"), diagnostics_associes=[Diagnostic(texte="Iléus", cim10_suggestion="K56.0")], biologie_cle=[BiologieCle(test="CRP", valeur="180 mg/L")], imagerie=[Imagerie(type="Scanner")], traitements_sortie=[Traitement(medicament="Augmentin")], actes_ccam=[ActeCCAM(texte="Cholécystectomie")], antecedents=[Antecedent(texte="HTA")], complications=[Complication(texte="Hémorragie")], ) text, tag_map = _build_tagged_context(dossier) for expected_tag in ["BIO-1", "IMG-1", "TRT-1", "ACTE-1", "DP", "DAS-1", "ANT-1", "COMPL-1"]: assert expected_tag in tag_map, f"Tag {expected_tag} manquant" def test_grounding_das_ref_valid(self): """Ref DAS-1 dans preuves_dossier → pas de warning.""" tag_map = {"DAS-1": "Iléus réflexe (K56.0)", "DP": "Cholécystite (K81.0)"} response_data = { "preuves_dossier": [ {"ref": "DAS-1", "element": 
"diagnostic", "valeur": "Iléus réflexe", "signification": "DAS justifié"}, {"ref": "DP", "element": "diagnostic", "valeur": "Cholécystite", "signification": "DP confirmé"}, ] } warnings = _validate_grounding(response_data, tag_map) assert len(warnings) == 0 class TestFuzzyMatchRef: """Tests pour le fuzzy matching de refs CIM-10 dans _fuzzy_match_ref().""" def test_cim10_code_matches_das_content(self): """Un code CIM-10 nu (C83.3) est résolu vers le DAS qui le contient.""" tag_map = { "DAS-1": "Lymphome folliculaire (C83.3)", "BIO-1": "CRP: 180 mg/L", } result = _fuzzy_match_ref("C83.3", tag_map) assert result == "DAS-1" def test_cim10_code_matches_dp(self): """Un code CIM-10 résolu vers le tag DP.""" tag_map = {"DP": "Cholécystite aiguë (K81.0)"} result = _fuzzy_match_ref("K81.0", tag_map) assert result == "DP" def test_cim10_code_no_match(self): """Un code CIM-10 absent du tag_map → None.""" tag_map = {"BIO-1": "CRP: 180 mg/L", "DAS-1": "Iléus (K56.0)"} result = _fuzzy_match_ref("Z45.8", tag_map) assert result is None def test_non_cim10_ref_no_match(self): """Une ref non-CIM-10 (ex: 'Antécédents') → None.""" tag_map = {"ANT-1": "HTA", "DP": "Test (K81.0)"} result = _fuzzy_match_ref("Antécédents", tag_map) assert result is None def test_grounding_fuzzy_resolves_cim10(self): """_validate_grounding résout une ref CIM-10 via fuzzy matching → pas de warning.""" tag_map = {"DAS-1": "Lymphome (C83.3)", "BIO-1": "CRP: 180"} response_data = { "preuves_dossier": [ {"ref": "C83.3", "element": "clinique", "valeur": "Lymphome", "signification": "onco"}, ] } warnings = _validate_grounding(response_data, tag_map) assert len(warnings) == 0 def test_grounding_category_name_still_warns(self): """Une ref catégorielle ('Antécédents') n'est pas résolue → warning maintenu.""" tag_map = {"ANT-1": "HTA", "BIO-1": "CRP: 5"} response_data = { "preuves_dossier": [ {"ref": "Antécédents", "element": "clinique", "valeur": "HTA", "signification": "contexte"}, ] } warnings = 
_validate_grounding(response_data, tag_map) assert len(warnings) == 1 assert "Antécédents" in warnings[0] class TestSanitizeUnauthorizedCodes: """Tests pour la sanitisation déterministe des codes CIM-10 hors périmètre.""" def _make_dossier_with_codes(self, dp_code="K81.0", das_codes=None): das = [Diagnostic(texte=f"DAS {c}", cim10_suggestion=c) for c in (das_codes or [])] return DossierMedical( source_file="test.pdf", diagnostic_principal=Diagnostic(texte="DP test", cim10_suggestion=dp_code), diagnostics_associes=das, ) def test_authorized_codes_kept(self): """Les codes dans le périmètre ne sont pas modifiés.""" dossier = self._make_dossier_with_codes("K81.0", ["K56.0"]) controle = ControleCPAM(numero_ogc=1, da_ucr="K56.0") parsed = { "contre_arguments_medicaux": "Le code K81.0 est justifié par la clinique.", "conclusion": "Le codage K81.0 et K56.0 est correct.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert len(removed) == 0 assert "K81.0" in parsed["contre_arguments_medicaux"] assert "K56.0" in parsed["conclusion"] def test_unauthorized_code_removed_from_text(self): """Un code hors périmètre (D62) est supprimé du texte.""" dossier = self._make_dossier_with_codes("K81.0") controle = ControleCPAM(numero_ogc=1) parsed = { "contre_arguments_medicaux": "Le code D62 serait plus approprié que K81.0.", "conclusion": "Maintenir K81.0.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert "D62" in removed assert "D62" not in parsed["contre_arguments_medicaux"] # K81.0 est toujours là assert "K81.0" in parsed["contre_arguments_medicaux"] def test_code_with_dash_libelle_cleaned(self): """'D62 — Anémie posthémorragique' → 'Anémie posthémorragique'.""" dossier = self._make_dossier_with_codes("K81.0") controle = ControleCPAM(numero_ogc=1) parsed = { "contre_arguments_medicaux": "D62 — Anémie posthémorragique aiguë est plus adapté.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert "D62" in removed 
text = parsed["contre_arguments_medicaux"] assert "D62" not in text assert "Anémie posthémorragique" in text def test_code_in_parentheses_cleaned(self): """'anémie (D62)' → 'anémie'.""" dossier = self._make_dossier_with_codes("K81.0") controle = ControleCPAM(numero_ogc=1) parsed = { "conclusion": "L'anémie (D62) n'est pas justifiée.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert "D62" in removed text = parsed["conclusion"] assert "(D62)" not in text assert "()" not in text assert "anémie" in text.lower() def test_multiple_unauthorized_codes(self): """Plusieurs codes hors périmètre sont tous supprimés.""" dossier = self._make_dossier_with_codes("K81.0") controle = ControleCPAM(numero_ogc=1) parsed = { "contre_arguments_medicaux": "D62 et T81.0 et T80 sont des alternatives.", "conclusion": "K81.0 est maintenu.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert len(removed) == 3 assert "D62" not in parsed["contre_arguments_medicaux"] assert "T81.0" not in parsed["contre_arguments_medicaux"] assert "T80" not in parsed["contre_arguments_medicaux"] def test_preuves_dossier_sanitized(self): """Les codes hors périmètre dans preuves_dossier.valeur sont aussi nettoyés.""" dossier = self._make_dossier_with_codes("K81.0") controle = ControleCPAM(numero_ogc=1) parsed = { "preuves_dossier": [ {"ref": "BIO-1", "valeur": "Anémie D62 documentée", "signification": "test"}, ], } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert "D62" in removed assert "D62" not in parsed["preuves_dossier"][0]["valeur"] def test_no_whitelist_no_sanitization(self): """Sans whitelist (pas de codes dans le dossier), aucune sanitisation.""" dossier = DossierMedical(source_file="test.pdf") controle = ControleCPAM(numero_ogc=1) parsed = { "contre_arguments_medicaux": "Le code D62 est pertinent.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert len(removed) == 0 assert "D62" in 
parsed["contre_arguments_medicaux"] def test_prefix_match_allows_subcodes(self): """K81.09 est autorisé si K81.0 est dans le périmètre (même préfixe K81).""" dossier = self._make_dossier_with_codes("K81.0") controle = ControleCPAM(numero_ogc=1) parsed = { "contre_arguments_medicaux": "K81.09 est un sous-code valide.", } removed = _sanitize_unauthorized_codes(parsed, dossier, controle) assert len(removed) == 0 assert "K81.09" in parsed["contre_arguments_medicaux"] def test_validate_codes_after_sanitize_no_warnings(self): """Après sanitisation, _validate_codes_in_response ne trouve plus de violations.""" dossier = self._make_dossier_with_codes("K81.0", ["K56.0"]) controle = ControleCPAM(numero_ogc=1, da_ucr="K56.0") parsed = { "contre_arguments_medicaux": "D62 et T81.0 sont hors périmètre. K81.0 est correct.", "conclusion": "Maintenir K81.0.", } # Sanitise d'abord _sanitize_unauthorized_codes(parsed, dossier, controle) # Puis valide → 0 warning warnings = _validate_codes_in_response(parsed, dossier, controle) assert len(warnings) == 0