- parse_json_response : réparation JSON tronqué par max_tokens (fermeture auto des structures ouvertes), meilleur stripping des blocs fencés avec texte superflu après la fermeture ``` - call_ollama : retry avec backoff exponentiel (1s/2s/4s) pour les erreurs 429 rate limit, 3 tentatives au lieu de 2 - Validation adversariale : max_tokens 800 → 1500 - Prompt CPAM : whitelist PÉRIMÈTRE DE CODES AUTORISÉS (dossier DP+DAS + UCR) avec interdiction explicite des codes hors périmètre - Tests : 19 tests parse_json/_repair_truncated_json, 6 tests whitelist Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1999 lines
79 KiB
Python
1999 lines
79 KiB
Python
"""Tests pour la génération de contre-argumentation CPAM."""
|
|
|
|
from unittest.mock import patch, call
|
|
|
|
import pytest
|
|
|
|
from src.config import (
|
|
ActeCCAM,
|
|
BiologieCle,
|
|
ControleCPAM,
|
|
Diagnostic,
|
|
DossierMedical,
|
|
Imagerie,
|
|
RAGSource,
|
|
Sejour,
|
|
Traitement,
|
|
)
|
|
from src.control.cpam_response import (
|
|
_build_bio_summary,
|
|
_build_correction_prompt,
|
|
_build_cpam_prompt,
|
|
_build_tagged_context,
|
|
_check_das_bio_coherence,
|
|
_extraction_pass,
|
|
_format_response,
|
|
_get_cim10_definitions,
|
|
_get_code_label,
|
|
_search_rag_for_control,
|
|
_validate_adversarial,
|
|
_validate_codes_in_response,
|
|
_validate_grounding,
|
|
_validate_references,
|
|
_assess_quality_tier,
|
|
generate_cpam_response,
|
|
)
|
|
|
|
|
|
def _make_dossier() -> DossierMedical:
|
|
"""Crée un dossier médical de test."""
|
|
return DossierMedical(
|
|
source_file="test.pdf",
|
|
document_type="crh",
|
|
sejour=Sejour(sexe="M", age=65, duree_sejour=5),
|
|
diagnostic_principal=Diagnostic(
|
|
texte="Cholécystite aiguë",
|
|
cim10_suggestion="K81.0",
|
|
),
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Iléus réflexe", cim10_suggestion="K56.0"),
|
|
],
|
|
)
|
|
|
|
|
|
def _make_dossier_complet() -> DossierMedical:
|
|
"""Crée un dossier médical enrichi avec traitements, imagerie, antécédents."""
|
|
return DossierMedical(
|
|
source_file="test.pdf",
|
|
document_type="crh",
|
|
sejour=Sejour(sexe="M", age=65, duree_sejour=5, imc=31.2),
|
|
diagnostic_principal=Diagnostic(
|
|
texte="Cholécystite aiguë",
|
|
cim10_suggestion="K81.0",
|
|
),
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Iléus réflexe", cim10_suggestion="K56.0"),
|
|
],
|
|
actes_ccam=[
|
|
ActeCCAM(texte="Cholécystectomie", code_ccam_suggestion="HMFC004"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="CRP", valeur="180 mg/L", anomalie=True),
|
|
BiologieCle(test="Créatinine", valeur="450 µmol/L", anomalie=True),
|
|
],
|
|
imagerie=[
|
|
Imagerie(type="Scanner abdominal", conclusion="Lithiase cholédocienne confirmée"),
|
|
],
|
|
traitements_sortie=[
|
|
Traitement(medicament="Augmentin IV", posologie="3g/j"),
|
|
Traitement(medicament="Morphine SC"),
|
|
],
|
|
antecedents=["HTA", "Diabète type 2"],
|
|
)
|
|
|
|
|
|
def _make_controle() -> ControleCPAM:
|
|
"""Crée un contrôle CPAM de test."""
|
|
return ControleCPAM(
|
|
numero_ogc=17,
|
|
titre="Désaccord sur les DAS",
|
|
arg_ucr="L'UCR confirme l'avis des médecins contrôleurs au motif que le DAS K56.0 n'est pas justifié.",
|
|
decision_ucr="UCR confirme avis médecins contrôleurs",
|
|
dp_ucr=None,
|
|
da_ucr="K56.0",
|
|
)
|
|
|
|
|
|
class TestBuildPrompt:
|
|
def test_prompt_contains_dossier_info(self):
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "Cholécystite aiguë" in prompt
|
|
assert "K81.0" in prompt
|
|
assert "Iléus réflexe" in prompt
|
|
assert "65 ans" in prompt
|
|
|
|
def test_prompt_contains_cpam_argument(self):
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert controle.arg_ucr in prompt
|
|
assert controle.decision_ucr in prompt
|
|
|
|
def test_prompt_contains_codes_contestes(self):
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "DA proposés par UCR : K56.0" in prompt
|
|
|
|
def test_prompt_contains_rag_sources(self):
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
sources = [
|
|
{"document": "guide_methodo", "page": 64, "extrait": "Texte du guide..."},
|
|
{"document": "cim10", "code": "K56.0", "extrait": "Iléus paralytique..."},
|
|
]
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, sources)
|
|
|
|
assert "Guide Méthodologique MCO 2026" in prompt
|
|
assert "CIM-10 FR 2026" in prompt
|
|
assert "page 64" in prompt
|
|
|
|
def test_prompt_contains_three_axes(self):
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "AXE MÉDICAL" in prompt
|
|
assert "AXE ASYMÉTRIE D'INFORMATION" in prompt
|
|
assert "AXE RÉGLEMENTAIRE" in prompt
|
|
|
|
def test_prompt_contains_traitements_imagerie_when_present(self):
|
|
dossier = _make_dossier_complet()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "Augmentin IV 3g/j" in prompt
|
|
assert "Morphine SC" in prompt
|
|
assert "Scanner abdominal" in prompt
|
|
assert "Lithiase cholédocienne confirmée" in prompt
|
|
assert "HTA" in prompt
|
|
assert "Diabète type 2" in prompt
|
|
assert "IMC : 31.2" in prompt
|
|
|
|
def test_prompt_asymetrie_section_when_data_present(self):
|
|
dossier = _make_dossier_complet()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "ÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM" in prompt
|
|
assert "CRP: 180 mg/L (anormale)" in prompt
|
|
assert "Cholécystectomie (HMFC004)" in prompt
|
|
|
|
def test_prompt_no_asymetrie_section_when_no_data(self):
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
document_type="crh",
|
|
sejour=Sejour(),
|
|
diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
|
|
)
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "ÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM" not in prompt
|
|
|
|
def test_prompt_json_format_new_fields(self):
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "contre_arguments_medicaux" in prompt
|
|
assert "contre_arguments_asymetrie" in prompt
|
|
assert "contre_arguments_reglementaires" in prompt
|
|
|
|
def test_prompt_contains_cite_exacts(self):
|
|
"""Le prompt renforcé demande des preuves exactes."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "CITE" in prompt
|
|
assert "EXACTS" in prompt
|
|
|
|
def test_prompt_contains_interdiction(self):
|
|
"""Le prompt interdit les références inventées."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "INTERDICTION ABSOLUE" in prompt
|
|
|
|
def test_prompt_contains_preuves_dossier_field(self):
|
|
"""Le format JSON demandé inclut preuves_dossier."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "preuves_dossier" in prompt
|
|
|
|
@patch("src.control.cpam_context.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale"))
|
|
@patch("src.control.cpam_context.normalize_code", return_value="K56.0")
|
|
def test_prompt_codes_with_cim10_labels(self, mock_norm, mock_valid):
|
|
"""Les codes contestés affichent le libellé CIM-10."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "Iléus paralytique" in prompt
|
|
assert "DA proposés par UCR" in prompt
|
|
|
|
@patch("src.control.cpam_context.validate_code", return_value=(False, ""))
|
|
@patch("src.control.cpam_context.normalize_code", return_value="Z99.9")
|
|
def test_prompt_codes_invalid_graceful(self, mock_norm, mock_valid):
|
|
"""Les codes invalides ne crashent pas, juste pas de libellé."""
|
|
dossier = _make_dossier()
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Test", arg_ucr="Test",
|
|
decision_ucr="Rejet", dp_ucr="Z99.9", da_ucr=None,
|
|
)
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "Z99.9" in prompt
|
|
# Pas de crash
|
|
|
|
@patch("src.control.cpam_context.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable"))
|
|
@patch("src.control.cpam_context.normalize_code", return_value="Z45.8")
|
|
def test_prompt_dp_fallback_from_ucr(self, mock_norm, mock_valid):
|
|
"""DP absent + dp_ucr → contexte injecté dans le prompt."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
document_type="crh",
|
|
sejour=Sejour(),
|
|
diagnostic_principal=None,
|
|
)
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Désaccord DP", arg_ucr="Test",
|
|
decision_ucr="Rejet", dp_ucr="Z45.8", da_ucr=None,
|
|
)
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "codé par l'établissement" in prompt
|
|
assert "contesté par la CPAM" in prompt
|
|
assert "Z45.8" in prompt
|
|
|
|
|
|
class TestFormatResponse:
|
|
def test_full_response_new_format(self):
|
|
parsed = {
|
|
"analyse_contestation": "La CPAM conteste le DAS K56.0",
|
|
"points_accord": "Aucun",
|
|
"contre_arguments_medicaux": "Le guide méthodologique précise...",
|
|
"contre_arguments_asymetrie": "La biologie montre une CRP à 180...",
|
|
"contre_arguments_reglementaires": "L'UCR interprète restrictivement...",
|
|
"references": "Guide métho p.64, CIM-10 K56.0",
|
|
"conclusion": "Le DAS est justifié",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "ANALYSE DE LA CONTESTATION" in text
|
|
assert "CONTRE-ARGUMENTS MÉDICAUX" in text
|
|
assert "ASYMÉTRIE D'INFORMATION" in text
|
|
assert "CONTRE-ARGUMENTS RÉGLEMENTAIRES" in text
|
|
assert "REFERENCES" in text
|
|
assert "CONCLUSION" in text
|
|
# "Aucun" ne doit pas générer la section points d'accord
|
|
assert "POINTS D'ACCORD" not in text
|
|
# L'ancien champ ne doit pas apparaître
|
|
assert "CONTRE-ARGUMENTS\n" not in text
|
|
|
|
def test_fallback_old_format(self):
|
|
"""L'ancien champ contre_arguments est toujours géré (réponses en cache)."""
|
|
parsed = {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments": "Arguments anciens...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "CONTRE-ARGUMENTS\nArguments anciens..." in text
|
|
assert "CONCLUSION" in text
|
|
|
|
def test_new_fields_override_fallback(self):
|
|
"""Si les nouveaux champs existent, l'ancien contre_arguments est ignoré."""
|
|
parsed = {
|
|
"contre_arguments_medicaux": "Médicaux...",
|
|
"contre_arguments": "Ancien fallback...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "CONTRE-ARGUMENTS MÉDICAUX" in text
|
|
assert "Ancien fallback" not in text
|
|
|
|
def test_partial_response(self):
|
|
parsed = {
|
|
"contre_arguments_medicaux": "Arguments médicaux...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "CONTRE-ARGUMENTS MÉDICAUX" in text
|
|
assert "CONCLUSION" in text
|
|
|
|
def test_empty_response(self):
|
|
text = _format_response({})
|
|
assert text == ""
|
|
|
|
def test_preuves_dossier_formatting(self):
|
|
"""Le nouveau champ preuves_dossier est formaté correctement."""
|
|
parsed = {
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"preuves_dossier": [
|
|
{"element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation sévère"},
|
|
{"element": "imagerie", "valeur": "lithiase cholédocienne", "signification": "confirme le diagnostic"},
|
|
],
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "PREUVES DU DOSSIER" in text
|
|
assert "CRP 180 mg/L" in text
|
|
assert "[biologie]" in text
|
|
assert "[imagerie]" in text
|
|
|
|
def test_structured_references_formatting(self):
|
|
"""Les références structurées sont formatées correctement."""
|
|
parsed = {
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"references": [
|
|
{"document": "Guide Méthodologique MCO 2026", "page": "64", "citation": "Le DAS doit être..."},
|
|
],
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "REFERENCES" in text
|
|
assert "Guide Méthodologique MCO 2026" in text
|
|
assert "p.64" in text
|
|
assert "Le DAS doit être..." in text
|
|
|
|
def test_ref_warnings_appended(self):
|
|
"""Les avertissements de références non vérifiées apparaissent."""
|
|
parsed = {"conclusion": "Conclusion..."}
|
|
warnings = ["Référence non vérifiable : Manuel Imaginaire 2025"]
|
|
text = _format_response(parsed, ref_warnings=warnings)
|
|
|
|
assert "AVERTISSEMENT" in text
|
|
assert "Manuel Imaginaire 2025" in text
|
|
|
|
|
|
class TestValidateReferences:
|
|
def test_valid_reference_no_warning(self):
|
|
parsed = {
|
|
"references": [
|
|
{"document": "Guide Méthodologique MCO 2026", "page": "64", "citation": "..."},
|
|
]
|
|
}
|
|
sources = [{"document": "guide_methodo", "page": 64, "extrait": "..."}]
|
|
warnings = _validate_references(parsed, sources)
|
|
assert len(warnings) == 0
|
|
|
|
def test_invented_reference_detected(self):
|
|
parsed = {
|
|
"references": [
|
|
{"document": "Manuel Inventé 2025", "page": "12", "citation": "..."},
|
|
]
|
|
}
|
|
sources = [{"document": "guide_methodo", "page": 64, "extrait": "..."}]
|
|
warnings = _validate_references(parsed, sources)
|
|
assert len(warnings) == 1
|
|
assert "Manuel Inventé" in warnings[0]
|
|
|
|
def test_old_format_string_no_crash(self):
|
|
"""L'ancien format string pour references ne cause pas de crash."""
|
|
parsed = {"references": "Guide méthodo p.64"}
|
|
sources = [{"document": "guide_methodo"}]
|
|
warnings = _validate_references(parsed, sources)
|
|
assert len(warnings) == 0 # pas de validation sur l'ancien format
|
|
|
|
def test_no_sources_no_validation(self):
|
|
parsed = {
|
|
"references": [
|
|
{"document": "Quelque chose", "page": "1", "citation": "..."},
|
|
]
|
|
}
|
|
warnings = _validate_references(parsed, [])
|
|
assert len(warnings) == 0
|
|
|
|
|
|
class TestGenerateResponse:
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama):
|
|
"""Ollama disponible → 3 passes (extraction + argumentation + validation)."""
|
|
mock_rag.return_value = [
|
|
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
|
|
]
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction...", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments_medicaux": "Contre-arguments médicaux...",
|
|
"contre_arguments_asymetrie": "Asymétrie...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
else:
|
|
return {"coherent": True, "erreurs": [], "score_confiance": 9}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
assert "Contre-arguments médicaux..." in text
|
|
assert response_data is not None
|
|
assert response_data["analyse_contestation"] == "Analyse..."
|
|
assert len(sources) == 1
|
|
assert sources[0].document == "guide_methodo"
|
|
# 3 appels Ollama : extraction + argumentation + validation
|
|
assert call_count["n"] == 3
|
|
mock_anthropic.assert_not_called()
|
|
|
|
@patch("src.control.cpam_validation.call_anthropic")
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_val_anthropic):
|
|
"""Ollama indisponible → fallback Haiku pour les 3 passes."""
|
|
mock_rag.return_value = [
|
|
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
|
|
]
|
|
mock_ollama.return_value = None
|
|
mock_val_ollama.return_value = None
|
|
call_count = {"n": 0}
|
|
|
|
def anthropic_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction Haiku...", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse Haiku...",
|
|
"contre_arguments_medicaux": "Contre-args Haiku...",
|
|
"conclusion": "Conclusion Haiku...",
|
|
}
|
|
else:
|
|
return {"coherent": True, "erreurs": [], "score_confiance": 8}
|
|
|
|
mock_anthropic.side_effect = anthropic_side_effect
|
|
mock_val_anthropic.side_effect = anthropic_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
assert "Contre-args Haiku..." in text
|
|
assert response_data is not None
|
|
# 3 appels Ollama (retourne None) + 3 Anthropic en fallback
|
|
assert call_count["n"] == 3
|
|
|
|
@patch("src.control.cpam_validation.call_anthropic", return_value=None)
|
|
@patch("src.control.cpam_validation.call_ollama", return_value=None)
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama, _mock_val_ollama, _mock_val_anthropic):
|
|
"""Tous LLMs indisponibles → texte vide, response_data None."""
|
|
mock_rag.return_value = []
|
|
mock_anthropic.return_value = None
|
|
mock_ollama.return_value = None
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
assert text == ""
|
|
assert response_data is None
|
|
assert sources == []
|
|
|
|
|
|
class TestSearchRagForControl:
|
|
"""Tests pour la logique de recherche RAG multi-requêtes."""
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_multiple_queries_with_da_ucr(self, mock_search):
|
|
"""Avec da_ucr, on doit avoir au moins 2 requêtes (codes + argument)."""
|
|
mock_search.return_value = [
|
|
{"document": "guide_methodo", "page": 10, "code": None, "score": 0.6,
|
|
"extrait": "Texte guide"},
|
|
]
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
|
|
results = _search_rag_for_control(controle, dossier)
|
|
|
|
# Au moins 2 appels : codes contestés + argument CPAM
|
|
assert mock_search.call_count >= 2
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_query_codes_contains_cma(self, mock_search):
|
|
"""La requête codes contestés doit contenir 'CMA' pour un DA."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# Premier appel = requête codes
|
|
first_call_query = mock_search.call_args_list[0][0][0]
|
|
assert "K56.0" in first_call_query
|
|
assert "CMA" in first_call_query
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_query_argument_contains_titre(self, mock_search):
|
|
"""La requête argument doit contenir le titre du contrôle."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# Deuxième appel = requête argument
|
|
second_call_query = mock_search.call_args_list[1][0][0]
|
|
assert controle.titre in second_call_query
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_deduplication_by_document_code_page(self, mock_search):
|
|
"""Les résultats dupliqués (même document/code/page) sont fusionnés."""
|
|
# Les deux requêtes retournent le même résultat
|
|
shared_result = {
|
|
"document": "guide_methodo", "page": 64, "code": None,
|
|
"score": 0.55, "extrait": "Texte partagé",
|
|
}
|
|
mock_search.return_value = [shared_result.copy()]
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
results = _search_rag_for_control(controle, dossier)
|
|
|
|
# Le résultat ne doit apparaître qu'une seule fois malgré les requêtes multiples
|
|
guide_results = [r for r in results if r["document"] == "guide_methodo" and r.get("page") == 64]
|
|
assert len(guide_results) == 1
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_dedup_keeps_best_score(self, mock_search):
|
|
"""La déduplication garde le meilleur score."""
|
|
def side_effect(query, top_k=8):
|
|
if "CMA" in query:
|
|
return [{"document": "cim10", "code": "K56.0", "page": None,
|
|
"score": 0.5, "extrait": "Iléus"}]
|
|
else:
|
|
return [{"document": "cim10", "code": "K56.0", "page": None,
|
|
"score": 0.7, "extrait": "Iléus"}]
|
|
mock_search.side_effect = side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
results = _search_rag_for_control(controle, dossier)
|
|
|
|
k56_results = [r for r in results if r.get("code") == "K56.0"]
|
|
assert len(k56_results) == 1
|
|
assert k56_results[0]["score"] == 0.7
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_no_codes_contestes_only_argument_query(self, mock_search):
|
|
"""Sans codes contestés, seule la requête argument est lancée."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = ControleCPAM(
|
|
numero_ogc=1,
|
|
titre="Désaccord sur la durée",
|
|
arg_ucr="Séjour trop long selon l'UCR.",
|
|
decision_ucr="Rejet",
|
|
dp_ucr=None,
|
|
da_ucr=None,
|
|
)
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# Un seul appel : requête argument (pas de codes contestés)
|
|
assert mock_search.call_count == 1
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_dp_ucr_query_contains_diagnostic_principal(self, mock_search):
|
|
"""Avec dp_ucr, la requête codes mentionne 'diagnostic principal'."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = ControleCPAM(
|
|
numero_ogc=2,
|
|
titre="Désaccord sur le DP",
|
|
arg_ucr="Le DP devrait être K80.1",
|
|
decision_ucr="Rejet",
|
|
dp_ucr="K81.0",
|
|
da_ucr=None,
|
|
)
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
first_call_query = mock_search.call_args_list[0][0][0]
|
|
assert "K81.0" in first_call_query
|
|
assert "diagnostic principal" in first_call_query
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_max_12_results(self, mock_search):
|
|
"""Le résultat final est limité à 12 entrées."""
|
|
mock_search.return_value = [
|
|
{"document": "guide_methodo", "page": i, "code": None,
|
|
"score": 0.9 - i * 0.01, "extrait": f"Texte {i}"}
|
|
for i in range(8)
|
|
]
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
|
|
results = _search_rag_for_control(controle, dossier)
|
|
|
|
assert len(results) <= 12
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_arg_ucr_not_truncated_200(self, mock_search):
|
|
"""La requête RAG argument utilise jusqu'à 500 chars, pas 200."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
long_arg = "A" * 400
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Test", arg_ucr=long_arg,
|
|
decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
|
|
)
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# La requête argument doit contenir les 400 chars (pas tronquée à 200)
|
|
arg_call_query = mock_search.call_args_list[0][0][0]
|
|
assert len(arg_call_query) > 200
|
|
|
|
@patch("src.control.cpam_rag.validate_code", return_value=(True, "Iléus paralytique"))
|
|
@patch("src.control.cpam_rag.normalize_code", return_value="K56.0")
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_query_cim10_definitions(self, mock_search, mock_norm, mock_valid):
|
|
"""Requête 4 exécutée quand codes contestés présents."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# Chercher la requête contenant "CIM-10" et "définition"
|
|
cim10_queries = [
|
|
c[0][0] for c in mock_search.call_args_list
|
|
if "CIM-10" in c[0][0] and "définition" in c[0][0]
|
|
]
|
|
assert len(cim10_queries) >= 1
|
|
assert "K56.0" in cim10_queries[0]
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_query_rule_extraction(self, mock_search):
|
|
"""Requête 5 exécutée quand arg_ucr contient une règle nommée."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Désaccord DAS",
|
|
arg_ucr="Selon la RègleT7 et l'Annexe-4B, le DAS n'est pas justifié.",
|
|
decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
|
|
)
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# Chercher la requête contenant les règles extraites
|
|
rule_queries = [
|
|
c[0][0] for c in mock_search.call_args_list
|
|
if "guide méthodologique" in c[0][0]
|
|
]
|
|
assert len(rule_queries) >= 1
|
|
assert "RègleT7" in rule_queries[0] or "Annexe" in rule_queries[0]
|
|
|
|
@patch("src.medical.rag_search.search_similar_cpam")
|
|
def test_clinical_query_when_das_match(self, mock_search):
|
|
"""Requête clinique lancée quand da_ucr matche un DAS du dossier."""
|
|
mock_search.return_value = []
|
|
|
|
dossier = _make_dossier() # DAS K56.0 "Iléus réflexe"
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
|
|
_search_rag_for_control(controle, dossier)
|
|
|
|
# Au moins 4 appels : codes + argument + clinique + CIM-10 définitions
|
|
assert mock_search.call_count >= 4
|
|
# La requête clinique contient DP + DAS textes
|
|
clinique_queries = [
|
|
c[0][0] for c in mock_search.call_args_list
|
|
if "Iléus réflexe" in c[0][0] and "Cholécystite aiguë" in c[0][0]
|
|
]
|
|
assert len(clinique_queries) >= 1
|
|
|
|
|
|
class TestGetCim10Definitions:
|
|
"""Tests pour l'injection déterministe des définitions CIM-10."""
|
|
|
|
@patch("src.control.cpam_context.validate_code")
|
|
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper())
|
|
def test_definitions_injected_in_prompt(self, mock_norm, mock_valid):
|
|
"""La section DÉFINITIONS CIM-10 apparaît dans le prompt avec les libellés."""
|
|
mock_valid.side_effect = lambda c: {
|
|
"K81.0": (True, "Cholécystite aiguë"),
|
|
"K56.0": (True, "Iléus paralytique et obstruction intestinale"),
|
|
}.get(c, (False, ""))
|
|
|
|
dossier = _make_dossier() # DP=K81.0, DAS=K56.0
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "DÉFINITIONS CIM-10" in prompt
|
|
assert "dictionnaire officiel" in prompt
|
|
assert "Cholécystite aiguë" in prompt
|
|
assert "Iléus paralytique" in prompt
|
|
assert "DP établissement" in prompt
|
|
|
|
@patch("src.control.cpam_context.validate_code")
|
|
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper())
|
|
def test_definitions_include_dp_and_ucr_codes(self, mock_norm, mock_valid):
|
|
"""Les codes du dossier ET de l'UCR sont tous inclus."""
|
|
mock_valid.side_effect = lambda c: {
|
|
"K81.0": (True, "Cholécystite aiguë"),
|
|
"K56.0": (True, "Iléus paralytique"),
|
|
"Z45.8": (True, "Ajustement d'un dispositif implantable"),
|
|
}.get(c, (False, ""))
|
|
|
|
dossier = _make_dossier() # DP=K81.0, DAS=K56.0
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Test", arg_ucr="Test",
|
|
decision_ucr="Rejet", dp_ucr="Z45.8", da_ucr="K56.0",
|
|
)
|
|
|
|
result = _get_cim10_definitions(dossier, controle)
|
|
|
|
# Codes dossier
|
|
assert "K81.0" in result
|
|
assert "DP établissement" in result
|
|
assert "K56.0" in result
|
|
# Codes UCR
|
|
assert "Z45.8" in result
|
|
assert "DP proposé UCR" in result
|
|
assert "DA proposé UCR" in result or "DAS établissement" in result
|
|
|
|
@patch("src.control.cpam_context.validate_code", return_value=(False, ""))
|
|
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper())
|
|
def test_definitions_graceful_when_code_unknown(self, mock_norm, mock_valid):
|
|
"""Un code inconnu ne crashe pas, affiche un message explicite."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostic_principal=None,
|
|
)
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Test", arg_ucr="Test",
|
|
decision_ucr="Rejet", dp_ucr="Z99.9", da_ucr=None,
|
|
)
|
|
|
|
result = _get_cim10_definitions(dossier, controle)
|
|
|
|
assert "Z99.9" in result
|
|
assert "non trouvé" in result
|
|
|
|
def test_definitions_empty_when_no_codes(self):
|
|
"""Aucun code → chaîne vide."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostic_principal=None,
|
|
)
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Test", arg_ucr="Test",
|
|
decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
|
|
)
|
|
|
|
result = _get_cim10_definitions(dossier, controle)
|
|
|
|
assert result == ""
|
|
|
|
|
|
class TestBuildTaggedContext:
|
|
"""Tests pour le contexte clinique tagué (grounding)."""
|
|
|
|
def test_tagged_context_bio_img_trt(self):
|
|
"""Les tags BIO, IMG, TRT, ACTE sont correctement générés."""
|
|
dossier = _make_dossier_complet()
|
|
text, tag_map = _build_tagged_context(dossier)
|
|
|
|
assert "[BIO-1]" in text
|
|
assert "CRP" in text
|
|
assert "BIO-1" in tag_map
|
|
assert "[IMG-1]" in text
|
|
assert "Scanner abdominal" in text
|
|
assert "IMG-1" in tag_map
|
|
assert "[TRT-1]" in text
|
|
assert "Augmentin IV" in text
|
|
assert "TRT-1" in tag_map
|
|
assert "[ACTE-1]" in text
|
|
assert "Cholécystectomie" in text
|
|
assert "ACTE-1" in tag_map
|
|
|
|
def test_tagged_context_bio_norms_annotated(self):
|
|
"""Les valeurs bio sont annotées avec les normes de référence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
biologie_cle=[
|
|
BiologieCle(test="CRP", valeur="5", anomalie=False),
|
|
BiologieCle(test="CRP", valeur="180", anomalie=True),
|
|
BiologieCle(test="Hémoglobine", valeur="8.5", anomalie=True),
|
|
],
|
|
)
|
|
text, tag_map = _build_tagged_context(dossier)
|
|
|
|
# CRP 5 = normal (norme 0-5)
|
|
assert "NORMAL" in tag_map.get("BIO-1", "")
|
|
# CRP 180 = élevé
|
|
assert "ÉLEVÉ" in tag_map.get("BIO-2", "")
|
|
# Hb 8.5 = bas (norme 12-17)
|
|
assert "BAS" in tag_map.get("BIO-3", "")
|
|
|
|
def test_tagged_context_empty_dossier(self):
|
|
"""Dossier sans données cliniques → texte vide, tag_map vide."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
|
|
)
|
|
text, tag_map = _build_tagged_context(dossier)
|
|
|
|
assert text == ""
|
|
assert tag_map == {}
|
|
|
|
def test_tagged_context_in_prompt(self):
|
|
"""Le contexte tagué apparaît dans le prompt généré."""
|
|
dossier = _make_dossier_complet()
|
|
controle = _make_controle()
|
|
prompt, tag_map = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS" in prompt
|
|
assert "[BIO-1]" in prompt
|
|
assert "[IMG-1]" in prompt
|
|
assert len(tag_map) > 0
|
|
|
|
def test_poor_dossier_warning_in_prompt(self):
|
|
"""Dossier sans bio/imagerie → avertissement dans le prompt."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
|
|
sejour=Sejour(sexe="M", age=70),
|
|
)
|
|
controle = _make_controle()
|
|
prompt, tag_map = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "DOSSIER PAUVRE" in prompt
|
|
assert "Ne spécule PAS" in prompt
|
|
assert len(tag_map) == 0
|
|
|
|
|
|
class TestValidateGrounding:
|
|
"""Tests pour la validation des preuves grounded."""
|
|
|
|
def test_grounding_valid_refs(self):
|
|
"""Toutes les refs existent → 0 warnings."""
|
|
tag_map = {"BIO-1": "CRP: 180 mg/L", "IMG-1": "Scanner abdominal"}
|
|
response_data = {
|
|
"preuves_dossier": [
|
|
{"ref": "BIO-1", "element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation"},
|
|
{"ref": "IMG-1", "element": "imagerie", "valeur": "Scanner", "signification": "confirme"},
|
|
]
|
|
}
|
|
warnings = _validate_grounding(response_data, tag_map)
|
|
assert len(warnings) == 0
|
|
|
|
def test_grounding_invented_ref(self):
|
|
"""Ref inventée [BIO-99] → warning détecté."""
|
|
tag_map = {"BIO-1": "CRP: 180 mg/L"}
|
|
response_data = {
|
|
"preuves_dossier": [
|
|
{"ref": "BIO-99", "element": "biologie", "valeur": "Albumine 15 g/L", "signification": "inventé"},
|
|
]
|
|
}
|
|
warnings = _validate_grounding(response_data, tag_map)
|
|
assert len(warnings) == 1
|
|
assert "BIO-99" in warnings[0]
|
|
|
|
def test_grounding_no_tag_map_no_validation(self):
|
|
"""Pas de tag_map (dossier vide) → pas de validation."""
|
|
response_data = {
|
|
"preuves_dossier": [
|
|
{"ref": "BIO-1", "element": "biologie", "valeur": "test", "signification": "test"},
|
|
]
|
|
}
|
|
warnings = _validate_grounding(response_data, {})
|
|
assert len(warnings) == 0
|
|
|
|
def test_grounding_no_ref_field_ok(self):
|
|
"""Preuves sans champ ref (ancien format) → pas de warning."""
|
|
tag_map = {"BIO-1": "CRP: 180 mg/L"}
|
|
response_data = {
|
|
"preuves_dossier": [
|
|
{"element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation"},
|
|
]
|
|
}
|
|
warnings = _validate_grounding(response_data, tag_map)
|
|
assert len(warnings) == 0
|
|
|
|
def test_format_response_with_ref(self):
|
|
"""Le formatage inclut le tag ref dans les preuves."""
|
|
parsed = {
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"preuves_dossier": [
|
|
{"ref": "BIO-1", "element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation"},
|
|
],
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
text = _format_response(parsed)
|
|
|
|
assert "[BIO-1]" in text
|
|
assert "[biologie]" in text
|
|
assert "CRP 180 mg/L" in text
|
|
|
|
|
|
class TestCheckDasBioCoherence:
|
|
"""Tests pour la vérification cohérence DAS / biologie."""
|
|
|
|
def test_leucocytose_with_low_leucocytes(self):
|
|
"""DAS 'leucocytose' mais leucocytes bas → incohérence détectée."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Leucocytose", cim10_suggestion="D72.8"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Leucocytes", valeur="3", anomalie=True),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
|
|
assert len(warnings) == 1
|
|
assert "Leucocytose" in warnings[0]
|
|
assert "NORMAL" in warnings[0]
|
|
|
|
def test_anemie_with_normal_hb(self):
|
|
"""DAS 'anémie' mais Hb normale → incohérence détectée."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Anémie ferriprive", cim10_suggestion="D50.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Hémoglobine", valeur="14.5", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
|
|
assert len(warnings) == 1
|
|
assert "anémie" in warnings[0].lower() or "Anémie" in warnings[0]
|
|
|
|
def test_coherent_das_bio_no_warnings(self):
|
|
"""DAS 'anémie' avec Hb basse → pas d'incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Anémie", cim10_suggestion="D64.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Hémoglobine", valeur="8.5", anomalie=True),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
|
|
assert len(warnings) == 0
|
|
|
|
def test_no_bio_no_crash(self):
|
|
"""Pas de biologie → pas de crash, pas de warnings."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Leucocytose", cim10_suggestion="D72.8"),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
|
|
assert len(warnings) == 0
|
|
|
|
def test_coherence_warnings_in_prompt(self):
|
|
"""Les incohérences DAS/bio apparaissent dans le prompt."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
sejour=Sejour(sexe="M", age=65),
|
|
diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Thrombocytose", cim10_suggestion="D75.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Plaquettes", valeur="200", anomalie=False),
|
|
],
|
|
)
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "ALERTES COHÉRENCE DAS / BIOLOGIE" in prompt
|
|
assert "Thrombocytose" in prompt
|
|
assert "NORMAL" in prompt
|
|
|
|
|
|
class TestPatientContext:
|
|
"""Tests pour le contexte patient dans le prompt."""
|
|
|
|
def test_pediatric_flag(self):
|
|
"""Patient < 18 ans → mention pédiatrie dans le prompt."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
sejour=Sejour(sexe="F", age=9),
|
|
diagnostic_principal=Diagnostic(texte="Appendicite", cim10_suggestion="K35.8"),
|
|
)
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "PÉDIATRIE" in prompt
|
|
assert "9 ans" in prompt
|
|
|
|
def test_elderly_flag(self):
|
|
"""Patient >= 80 ans → mention patient âgé."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
sejour=Sejour(sexe="M", age=85),
|
|
diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
|
|
)
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "patient âgé" in prompt
|
|
assert "85 ans" in prompt
|
|
|
|
def test_emergency_admission(self):
|
|
"""Admission en urgence → flag dans le prompt."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
sejour=Sejour(sexe="M", age=50, mode_entree="Autres admissions urgentes"),
|
|
diagnostic_principal=Diagnostic(texte="Test", cim10_suggestion="Z00"),
|
|
)
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "ADMISSION EN URGENCE" in prompt
|
|
|
|
def test_context_consigne_in_prompt(self):
|
|
"""Le prompt contient une consigne sur le contexte clinique."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "CONTEXTE CLINIQUE" in prompt
|
|
assert "ÂGE" in prompt
|
|
assert "MODE D'ENTRÉE" in prompt
|
|
|
|
|
|
class TestExtractionPass:
|
|
"""Tests pour la passe 1 — extraction structurée."""
|
|
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
def test_extraction_pass_returns_structured_json(self, mock_ollama):
|
|
"""Passe 1 retourne les champs attendus."""
|
|
mock_ollama.return_value = {
|
|
"comprehension_contestation": "La CPAM conteste le DAS K56.0",
|
|
"elements_cliniques_pertinents": [
|
|
{"tag": "BIO-1", "pertinence": "CRP élevée confirme inflammation"}
|
|
],
|
|
"points_accord_potentiels": ["Le CRH est succinct"],
|
|
"codes_en_jeu": {
|
|
"dp_etablissement": "K81.0 — Cholécystite aiguë",
|
|
"dp_ucr": "",
|
|
"difference_cle": "contestation porte sur le DAS, pas le DP",
|
|
},
|
|
}
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
result = _extraction_pass(dossier, controle)
|
|
|
|
assert result is not None
|
|
assert "comprehension_contestation" in result
|
|
assert len(result["elements_cliniques_pertinents"]) == 1
|
|
mock_ollama.assert_called_once()
|
|
|
|
@patch("src.control.cpam_response.call_anthropic", return_value=None)
|
|
@patch("src.control.cpam_response.call_ollama", return_value=None)
|
|
def test_extraction_pass_failure_returns_none(self, mock_ollama, mock_anthropic):
|
|
"""Passe 1 échoue → retourne None (fallback single-pass)."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
result = _extraction_pass(dossier, controle)
|
|
|
|
assert result is None
|
|
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
def test_extraction_injected_in_prompt(self, mock_ollama):
|
|
"""Le résultat de passe 1 est injecté dans le prompt de passe 2."""
|
|
extraction = {
|
|
"comprehension_contestation": "La CPAM conteste le DAS K56.0",
|
|
"elements_cliniques_pertinents": [
|
|
{"tag": "BIO-1", "pertinence": "CRP élevée"}
|
|
],
|
|
"points_accord_potentiels": ["Le CRH est succinct"],
|
|
"codes_en_jeu": {
|
|
"difference_cle": "contestation porte sur le DAS",
|
|
},
|
|
}
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [], extraction)
|
|
|
|
assert "PRÉ-ANALYSE" in prompt
|
|
assert "La CPAM conteste le DAS K56.0" in prompt
|
|
assert "CRP élevée" in prompt
|
|
assert "contestation porte sur le DAS" in prompt
|
|
|
|
def test_prompt_without_extraction(self):
|
|
"""Sans extraction, pas de section PRÉ-ANALYSE."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [], None)
|
|
|
|
assert "PRÉ-ANALYSE" not in prompt
|
|
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_generate_calls_three_passes(self, mock_rag, mock_ollama, mock_val_ollama):
|
|
"""L'orchestrateur appelle extraction + argumentation + validation."""
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {
|
|
"comprehension_contestation": "Contestation DAS",
|
|
"elements_cliniques_pertinents": [],
|
|
"points_accord_potentiels": [],
|
|
"codes_en_jeu": {},
|
|
}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
else:
|
|
return {"coherent": True, "erreurs": [], "score_confiance": 9}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
mock_rag.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
# 3 appels Ollama : extraction + argumentation + validation
|
|
assert call_count["n"] == 3
|
|
assert response_data is not None
|
|
assert "Arguments..." in text
|
|
|
|
|
|
class TestValidateAdversarial:
|
|
"""Tests pour la validation adversariale."""
|
|
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
def test_coherent_response_no_warnings(self, mock_ollama):
|
|
"""Réponse cohérente → coherent=true, pas de warnings dans le texte."""
|
|
mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 9}
|
|
|
|
tag_map = {"BIO-1": "CRP: 180 mg/L"}
|
|
response_data = {
|
|
"analyse_contestation": "Analyse...",
|
|
"preuves_dossier": [{"ref": "BIO-1", "valeur": "CRP 180 mg/L"}],
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
controle = _make_controle()
|
|
|
|
result = _validate_adversarial(response_data, tag_map, controle)
|
|
|
|
assert result is not None
|
|
assert result["coherent"] is True
|
|
assert len(result["erreurs"]) == 0
|
|
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
def test_hallucinated_bio_detected(self, mock_ollama):
|
|
"""Valeur bio halluccinée → coherent=false avec erreur."""
|
|
mock_ollama.return_value = {
|
|
"coherent": False,
|
|
"erreurs": ["CRP citée à 250 mg/L mais le dossier indique 180 mg/L"],
|
|
"score_confiance": 3,
|
|
}
|
|
|
|
tag_map = {"BIO-1": "CRP: 180 mg/L"}
|
|
response_data = {
|
|
"preuves_dossier": [{"ref": "BIO-1", "valeur": "CRP 250 mg/L"}],
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
controle = _make_controle()
|
|
|
|
result = _validate_adversarial(response_data, tag_map, controle)
|
|
|
|
assert result is not None
|
|
assert result["coherent"] is False
|
|
assert len(result["erreurs"]) == 1
|
|
assert "CRP" in result["erreurs"][0]
|
|
|
|
@patch("src.control.cpam_validation.call_anthropic", return_value=None)
|
|
@patch("src.control.cpam_validation.call_ollama", return_value=None)
|
|
def test_adversarial_failure_graceful(self, mock_ollama, mock_anthropic):
|
|
"""LLM indisponible → retourne None, pas de crash."""
|
|
tag_map = {"BIO-1": "CRP: 180 mg/L"}
|
|
response_data = {"conclusion": "Conclusion..."}
|
|
controle = _make_controle()
|
|
|
|
result = _validate_adversarial(response_data, tag_map, controle)
|
|
|
|
assert result is None
|
|
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_adversarial_warnings_in_output(self, mock_rag, mock_ollama, mock_val_ollama):
|
|
"""Incohérences détectées → avertissements dans le texte formaté."""
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
else:
|
|
return {
|
|
"coherent": False,
|
|
"erreurs": ["Antibiotiques mentionnés mais absents du dossier"],
|
|
"score_confiance": 4,
|
|
}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
mock_rag.return_value = []
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
assert "Antibiotiques mentionnés" in text
|
|
assert "Score adversarial" in text
|
|
|
|
def test_adversarial_empty_tag_map(self):
|
|
"""Dossier sans tags → validation fonctionne quand même."""
|
|
with patch("src.control.cpam_validation.call_ollama") as mock_ollama:
|
|
mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 7}
|
|
|
|
result = _validate_adversarial(
|
|
{"conclusion": "Test"}, {}, _make_controle()
|
|
)
|
|
|
|
assert result is not None
|
|
assert result["coherent"] is True
|
|
|
|
|
|
class TestValidateCodesInResponse:
|
|
"""Tests pour la validation codes fermée (périmètre dossier + UCR)."""
|
|
|
|
def test_code_in_dossier_no_warning(self):
|
|
"""Code du dossier cité → pas de warning."""
|
|
parsed = {"conclusion": "Le code K81.0 est justifié par la cholécystite."}
|
|
dossier = _make_dossier() # DP K81.0, DAS K56.0
|
|
controle = _make_controle()
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
assert len(warnings) == 0
|
|
|
|
def test_code_from_ucr_no_warning(self):
|
|
"""Code proposé par l'UCR cité → pas de warning."""
|
|
parsed = {"conclusion": "Le code K56.0 contesté par l'UCR est bien justifié."}
|
|
dossier = _make_dossier()
|
|
controle = _make_controle() # da_ucr="K56.0"
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
assert len(warnings) == 0
|
|
|
|
def test_invented_code_detected(self):
|
|
"""Code absent du dossier et de l'UCR → warning."""
|
|
parsed = {"conclusion": "Le code Z45.8 confirme la nécessité du séjour."}
|
|
dossier = _make_dossier() # DP K81.0, DAS K56.0
|
|
controle = _make_controle() # da_ucr=K56.0
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
assert len(warnings) >= 1
|
|
assert any("Z45" in w for w in warnings)
|
|
|
|
def test_subcode_tolerated(self):
|
|
"""K81.09 toléré quand K81.0 est dans la whitelist (même préfixe 3 chars)."""
|
|
parsed = {"contre_arguments_medicaux": "Le sous-code K81.09 est une précision de K81.0."}
|
|
dossier = _make_dossier() # DP K81.0
|
|
controle = _make_controle()
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
# K81.09 partage le préfixe K81 avec K81.0 → toléré
|
|
assert len(warnings) == 0
|
|
|
|
def test_codes_in_citations_excluded(self):
|
|
"""Codes dans references[].citation → pas de validation."""
|
|
parsed = {
|
|
"conclusion": "Le codage est justifié.",
|
|
"references": [
|
|
{"document": "CIM-10", "citation": "Z45.8 — Ajustement d'un dispositif"},
|
|
],
|
|
}
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
# Z45.8 est dans references, pas dans les champs textuels → pas flaggé
|
|
assert len(warnings) == 0
|
|
|
|
def test_no_codes_in_response_no_warning(self):
|
|
"""Réponse sans codes CIM-10 → 0 warnings."""
|
|
parsed = {"conclusion": "Le séjour est justifié par la gravité clinique."}
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
assert len(warnings) == 0
|
|
|
|
def test_multiple_invented_codes(self):
|
|
"""Plusieurs codes hors périmètre → autant de warnings."""
|
|
parsed = {
|
|
"contre_arguments_medicaux": "Les codes Z45.8 et E11.9 confirment le diagnostic.",
|
|
}
|
|
dossier = _make_dossier() # K81.0, K56.0
|
|
controle = _make_controle()
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
assert len(warnings) >= 2
|
|
|
|
def test_no_whitelist_no_validation(self):
|
|
"""Aucun code dans le dossier ni l'UCR → pas de validation (0 warnings)."""
|
|
parsed = {"conclusion": "Le code Z45.8 est justifié."}
|
|
dossier = DossierMedical(source_file="test.pdf", diagnostic_principal=None)
|
|
controle = ControleCPAM(
|
|
numero_ogc=1, titre="Test", arg_ucr="Test",
|
|
decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
|
|
)
|
|
warnings = _validate_codes_in_response(parsed, dossier, controle)
|
|
assert len(warnings) == 0
|
|
|
|
|
|
class TestBuildBioSummary:
|
|
"""Tests pour le résumé biologique déterministe."""
|
|
|
|
def test_bio_summary_interpretation(self):
|
|
"""CRP élevée, Hb basse → résumé correct avec interprétations cliniques."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
biologie_cle=[
|
|
BiologieCle(test="CRP", valeur="180 mg/L", anomalie=True),
|
|
BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True),
|
|
],
|
|
)
|
|
summary = _build_bio_summary(dossier)
|
|
|
|
assert "CRP" in summary
|
|
assert "ÉLEVÉ" in summary
|
|
assert "infection/inflammation active" in summary
|
|
assert "Hémoglobine" in summary
|
|
assert "BAS" in summary
|
|
assert "anémie" in summary
|
|
|
|
def test_bio_summary_normal_values(self):
|
|
"""Valeurs normales → interprétation 'normal' affichée."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
biologie_cle=[
|
|
BiologieCle(test="Plaquettes", valeur="250 G/L", anomalie=False),
|
|
],
|
|
)
|
|
summary = _build_bio_summary(dossier)
|
|
|
|
assert "NORMAL" in summary
|
|
assert "numération normale" in summary
|
|
|
|
def test_bio_summary_in_prompt(self):
|
|
"""Le résumé bio apparaît dans le prompt CPAM."""
|
|
dossier = _make_dossier_complet() # CRP 180, Créatinine 450
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
|
|
assert "FAITS BIOLOGIQUES VÉRIFIÉS" in prompt
|
|
assert "NE PAS MODIFIER" in prompt
|
|
assert "RÈGLE STRICTE" in prompt
|
|
|
|
def test_bio_summary_empty_no_bio(self):
|
|
"""Pas de biologie → résumé vide."""
|
|
dossier = DossierMedical(source_file="test.pdf")
|
|
summary = _build_bio_summary(dossier)
|
|
assert summary == ""
|
|
|
|
def test_bio_summary_unknown_test(self):
|
|
"""Test bio non reconnu (hors BIO_NORMALS) → omis du résumé."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
biologie_cle=[
|
|
BiologieCle(test="Vitamine D", valeur="15 ng/mL", anomalie=True),
|
|
],
|
|
)
|
|
summary = _build_bio_summary(dossier)
|
|
assert summary == ""
|
|
|
|
def test_bio_summary_unparseable_value(self):
|
|
"""Valeur bio non parseable → omise sans crash."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
biologie_cle=[
|
|
BiologieCle(test="CRP", valeur="positif", anomalie=True),
|
|
BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True),
|
|
],
|
|
)
|
|
summary = _build_bio_summary(dossier)
|
|
# CRP "positif" non parseable → omis, mais Hb présente
|
|
assert "Hémoglobine" in summary
|
|
assert "CRP" not in summary
|
|
|
|
|
|
class TestCorrectionLoop:
|
|
"""Tests pour la boucle de correction adversariale."""
|
|
|
|
@patch("src.control.cpam_response.rule_enabled", return_value=True)
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_correction_triggered_when_score_low(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
|
|
"""Score adversarial ≤ 5 → correction relancée (5 appels LLM total)."""
|
|
mock_rag.return_value = []
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
# Passe 1 extraction
|
|
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
# Passe 2 argumentation
|
|
return {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments_medicaux": "Arguments erronés...",
|
|
"conclusion": "Conclusion avec erreurs...",
|
|
}
|
|
elif call_count["n"] == 3:
|
|
# Passe 3 validation adversariale → score bas
|
|
return {"coherent": False, "erreurs": ["CRP citée à 250 mais vaut 180"], "score_confiance": 3}
|
|
elif call_count["n"] == 4:
|
|
# Passe 4 correction
|
|
return {
|
|
"analyse_contestation": "Analyse corrigée...",
|
|
"contre_arguments_medicaux": "Arguments corrigés...",
|
|
"conclusion": "Conclusion corrigée...",
|
|
}
|
|
else:
|
|
# Passe 5 re-validation
|
|
return {"coherent": True, "erreurs": [], "score_confiance": 8}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
# 5 appels Ollama : extraction + argumentation + validation + correction + re-validation
|
|
assert call_count["n"] == 5
|
|
# La correction a été acceptée (score 8 > 3)
|
|
assert "corrigé" in text.lower()
|
|
|
|
@patch("src.control.cpam_response.rule_enabled", return_value=True)
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_no_correction_when_score_high(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
|
|
"""Score adversarial > 5 → pas de correction (3 appels LLM)."""
|
|
mock_rag.return_value = []
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
else:
|
|
return {"coherent": True, "erreurs": [], "score_confiance": 8}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
# Seulement 3 appels : extraction + argumentation + validation
|
|
assert call_count["n"] == 3
|
|
|
|
@patch("src.control.cpam_response.rule_enabled", return_value=True)
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_correction_accepted_when_score_improves(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
|
|
"""Score passe de 3 à 7 → correction acceptée."""
|
|
mock_rag.return_value = []
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse originale...",
|
|
"contre_arguments_medicaux": "Arguments originaux...",
|
|
"conclusion": "Conclusion originale...",
|
|
}
|
|
elif call_count["n"] == 3:
|
|
return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 3}
|
|
elif call_count["n"] == 4:
|
|
return {
|
|
"analyse_contestation": "Analyse améliorée...",
|
|
"contre_arguments_medicaux": "Arguments améliorés...",
|
|
"conclusion": "Conclusion améliorée...",
|
|
}
|
|
else:
|
|
return {"coherent": True, "erreurs": [], "score_confiance": 7}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
# Le résultat final est la correction
|
|
assert response_data["conclusion"] == "Conclusion améliorée..."
|
|
|
|
@patch("src.control.cpam_response.rule_enabled", return_value=True)
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_correction_rejected_when_score_same(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
|
|
"""Score ne s'améliore pas → original conservé."""
|
|
mock_rag.return_value = []
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse originale...",
|
|
"contre_arguments_medicaux": "Arguments originaux...",
|
|
"conclusion": "Conclusion originale...",
|
|
}
|
|
elif call_count["n"] == 3:
|
|
return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 4}
|
|
elif call_count["n"] == 4:
|
|
return {
|
|
"analyse_contestation": "Correction pire...",
|
|
"contre_arguments_medicaux": "Arguments pires...",
|
|
"conclusion": "Conclusion pire...",
|
|
}
|
|
else:
|
|
return {"coherent": False, "erreurs": ["Encore des erreurs"], "score_confiance": 3}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
# Score correction (3) <= score original (4) → original conservé
|
|
assert response_data["conclusion"] == "Conclusion originale..."
|
|
|
|
@patch("src.control.cpam_response.rule_enabled", return_value=False)
|
|
@patch("src.control.cpam_validation.call_ollama")
|
|
@patch("src.control.cpam_response.call_ollama")
|
|
@patch("src.control.cpam_response.call_anthropic")
|
|
@patch("src.control.cpam_response._search_rag_for_control")
|
|
def test_correction_disabled_by_rule(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
|
|
"""RULE-CPAM-CORRECTION-LOOP désactivée → pas de retry."""
|
|
mock_rag.return_value = []
|
|
call_count = {"n": 0}
|
|
|
|
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
|
|
elif call_count["n"] == 2:
|
|
return {
|
|
"analyse_contestation": "Analyse...",
|
|
"contre_arguments_medicaux": "Arguments...",
|
|
"conclusion": "Conclusion...",
|
|
}
|
|
else:
|
|
return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 2}
|
|
|
|
mock_ollama.side_effect = ollama_side_effect
|
|
mock_val_ollama.side_effect = ollama_side_effect
|
|
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
text, response_data, sources = generate_cpam_response(dossier, controle)
|
|
|
|
# Seulement 3 appels, pas de correction (règle désactivée)
|
|
assert call_count["n"] == 3
|
|
|
|
def test_build_correction_prompt_format(self):
|
|
"""Le prompt de correction contient les erreurs et la réponse originale."""
|
|
original_prompt = "Prompt d'argumentation original..."
|
|
original_response = {
|
|
"analyse_contestation": "Analyse avec erreur CRP 250",
|
|
"conclusion": "Conclusion erronée",
|
|
}
|
|
adversarial_result = {
|
|
"coherent": False,
|
|
"erreurs": ["CRP citée à 250 mg/L mais le dossier indique 180 mg/L"],
|
|
"score_confiance": 3,
|
|
}
|
|
|
|
correction = _build_correction_prompt(original_prompt, original_response, adversarial_result)
|
|
|
|
assert "CORRECTION REQUISE" in correction
|
|
assert "CRP citée à 250" in correction
|
|
assert "Prompt d'argumentation original" in correction
|
|
assert "Corrige UNIQUEMENT" in correction
|
|
|
|
|
|
class TestAssessQualityTier:
|
|
"""Tests pour la classification qualité CPAM (A/B/C)."""
|
|
|
|
def test_tier_a_no_warnings_high_score(self):
|
|
"""0 warning, score adversarial >= 7 → tier A, requires_review=False."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=[],
|
|
code_warnings=[],
|
|
adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 9},
|
|
)
|
|
assert tier == "A"
|
|
assert review is False
|
|
assert len(warnings) == 0
|
|
|
|
def test_tier_b_ref_warnings(self):
|
|
"""Warnings de référence → tier B."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=["Référence non vérifiable : Manuel Inventé"],
|
|
grounding_warnings=[],
|
|
code_warnings=[],
|
|
adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 8},
|
|
)
|
|
assert tier == "B"
|
|
assert review is False
|
|
assert any("[MINEUR]" in w for w in warnings)
|
|
|
|
def test_tier_b_medium_adversarial_score(self):
|
|
"""Score adversarial 4-6 → tier B."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=[],
|
|
code_warnings=[],
|
|
adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 5},
|
|
)
|
|
assert tier == "B"
|
|
assert review is False
|
|
|
|
def test_tier_b_one_grounding_warning(self):
|
|
"""1 preuve non traçable → tier B (mineur)."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=["Preuve [BIO-99] non traçable"],
|
|
code_warnings=[],
|
|
adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 8},
|
|
)
|
|
assert tier == "B"
|
|
assert review is False
|
|
assert any("[MINEUR]" in w for w in warnings)
|
|
|
|
def test_tier_c_code_warnings(self):
|
|
"""Code hors périmètre → tier C, requires_review=True."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=[],
|
|
code_warnings=["Code Z45.8 hors périmètre dossier/UCR"],
|
|
adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 7},
|
|
)
|
|
assert tier == "C"
|
|
assert review is True
|
|
assert any("[CRITIQUE]" in w for w in warnings)
|
|
|
|
def test_tier_c_low_adversarial_score(self):
|
|
"""Score adversarial < 4 → tier C."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=[],
|
|
code_warnings=[],
|
|
adversarial_result={"coherent": False, "erreurs": ["Bio inventée"], "score_confiance": 2},
|
|
)
|
|
assert tier == "C"
|
|
assert review is True
|
|
assert any("[CRITIQUE]" in w for w in warnings)
|
|
|
|
def test_tier_c_many_grounding_warnings(self):
|
|
"""3+ preuves non traçables → tier C (critique)."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=[
|
|
"Preuve [BIO-1] non traçable",
|
|
"Preuve [BIO-2] non traçable",
|
|
"Preuve [BIO-3] non traçable",
|
|
],
|
|
code_warnings=[],
|
|
adversarial_result={"coherent": True, "erreurs": [], "score_confiance": 7},
|
|
)
|
|
assert tier == "C"
|
|
assert review is True
|
|
|
|
def test_tier_a_no_adversarial(self):
|
|
"""Pas de validation adversariale (None) + 0 warnings → tier A."""
|
|
tier, review, warnings = _assess_quality_tier(
|
|
parsed={},
|
|
ref_warnings=[],
|
|
grounding_warnings=[],
|
|
code_warnings=[],
|
|
adversarial_result=None,
|
|
)
|
|
assert tier == "A"
|
|
assert review is False
|
|
|
|
|
|
class TestFormatResponseCategorized:
|
|
"""Tests pour le formatage avec warnings catégorisés et quality_tier."""
|
|
|
|
def test_tier_c_banner(self):
|
|
"""Tier C → bandeau REVUE MANUELLE REQUISE."""
|
|
text = _format_response(
|
|
{"conclusion": "Conclusion..."},
|
|
quality_tier="C",
|
|
categorized_warnings=["[CRITIQUE] Code hors périmètre"],
|
|
)
|
|
assert "REVUE MANUELLE REQUISE" in text
|
|
assert "Qualité : C" in text
|
|
assert "AVERTISSEMENTS CRITIQUES" in text
|
|
|
|
def test_tier_a_no_banner(self):
|
|
"""Tier A → pas de bandeau."""
|
|
text = _format_response(
|
|
{"conclusion": "Conclusion..."},
|
|
quality_tier="A",
|
|
categorized_warnings=[],
|
|
)
|
|
assert "REVUE MANUELLE REQUISE" not in text
|
|
|
|
def test_warnings_separated(self):
|
|
"""Warnings critiques et mineurs dans des sections distinctes."""
|
|
text = _format_response(
|
|
{"conclusion": "Conclusion..."},
|
|
quality_tier="C",
|
|
categorized_warnings=[
|
|
"[CRITIQUE] Code Z45.8 hors périmètre",
|
|
"[MINEUR] Référence non vérifiable",
|
|
],
|
|
)
|
|
assert "AVERTISSEMENTS CRITIQUES" in text
|
|
assert "AVERTISSEMENTS MINEURS" in text
|
|
assert text.index("CRITIQUES") < text.index("MINEURS")
|
|
|
|
def test_backward_compat_old_ref_warnings(self):
|
|
"""Sans categorized_warnings, fallback sur ref_warnings."""
|
|
text = _format_response(
|
|
{"conclusion": "Conclusion..."},
|
|
ref_warnings=["Référence non vérifiable : X"],
|
|
)
|
|
assert "AVERTISSEMENT — REFERENCES NON VÉRIFIÉES" in text
|
|
|
|
|
|
class TestCheckDasBioCoherenceExtended:
|
|
"""Tests pour les nouveaux patterns DAS/bio (Phase 5)."""
|
|
|
|
def test_sepsis_with_normal_crp(self):
|
|
"""DAS 'sepsis' mais CRP normale → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Sepsis sévère", cim10_suggestion="A41.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="CRP", valeur="3", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
assert any("Sepsis" in w or "sepsis" in w for w in warnings)
|
|
|
|
def test_infarctus_with_normal_troponine(self):
|
|
"""DAS 'infarctus' mais troponine normale → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Infarctus du myocarde", cim10_suggestion="I21.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Troponine", valeur="0.01", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
|
|
def test_infarctus_with_high_troponine_ok(self):
|
|
"""DAS 'infarctus' + troponine élevée → pas d'incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Infarctus du myocarde", cim10_suggestion="I21.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Troponine", valeur="0.5", anomalie=True),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) == 0
|
|
|
|
def test_denutrition_with_normal_albumine(self):
|
|
"""DAS 'dénutrition' mais albumine normale → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Dénutrition sévère", cim10_suggestion="E43"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Albumine", valeur="42", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
|
|
def test_hypothyroidie_with_normal_tsh(self):
|
|
"""DAS 'hypothyroïdie' mais TSH normale → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Hypothyroïdie", cim10_suggestion="E03.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="TSH", valeur="2.5", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
|
|
def test_diabete_with_normal_glycemie(self):
|
|
"""DAS 'diabète' mais glycémie normale → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Diabète de type 2", cim10_suggestion="E11.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Glycémie", valeur="4.5", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
|
|
def test_embolie_pulmonaire_with_normal_d_dimeres(self):
|
|
"""DAS 'embolie pulmonaire' mais D-dimères normaux → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Embolie pulmonaire", cim10_suggestion="I26.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="D-dimères", valeur="200", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
|
|
def test_insuffisance_renale_with_normal_creatinine(self):
|
|
"""DAS 'insuffisance rénale' mais créatinine normale → incohérence."""
|
|
dossier = DossierMedical(
|
|
source_file="test.pdf",
|
|
diagnostics_associes=[
|
|
Diagnostic(texte="Insuffisance rénale aiguë", cim10_suggestion="N17.9"),
|
|
],
|
|
biologie_cle=[
|
|
BiologieCle(test="Créatinine", valeur="80", anomalie=False),
|
|
],
|
|
)
|
|
warnings = _check_das_bio_coherence(dossier)
|
|
assert len(warnings) >= 1
|
|
|
|
|
|
class TestCodesAutorisesWhitelist:
|
|
"""Tests pour la whitelist de codes autorisés (anti-hallucination)."""
|
|
|
|
def test_whitelist_in_prompt(self):
|
|
"""Le prompt contient la section PÉRIMÈTRE DE CODES AUTORISÉS."""
|
|
dossier = _make_dossier() # DP K81.0, DAS K56.0
|
|
controle = _make_controle() # dp_ucr=K80.1, da_ucr=K56.0
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
assert "PÉRIMÈTRE DE CODES AUTORISÉS" in prompt
|
|
assert "INTERDICTION" in prompt
|
|
|
|
def test_whitelist_contains_dossier_codes(self):
|
|
"""Tous les codes du dossier sont dans la whitelist."""
|
|
dossier = _make_dossier() # DP K81.0, DAS K56.0
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
assert "K81.0" in prompt
|
|
assert "K56.0" in prompt
|
|
|
|
def test_whitelist_contains_ucr_codes(self):
|
|
"""Tous les codes UCR sont dans la whitelist."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
controle.dp_ucr = "K80.1"
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
assert "K80.1" in prompt
|
|
|
|
def test_whitelist_dedup(self):
|
|
"""Les codes en double (dossier + UCR) ne sont listés qu'une fois."""
|
|
dossier = _make_dossier() # K56.0 en DAS
|
|
controle = _make_controle() # da_ucr=K56.0
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
# K56.0 apparaît dans PÉRIMÈTRE mais une seule fois dans cette section
|
|
perimetre_idx = prompt.index("PÉRIMÈTRE DE CODES AUTORISÉS")
|
|
interdit_idx = prompt.index("INTERDICTION")
|
|
perimetre_section = prompt[perimetre_idx:interdit_idx]
|
|
assert perimetre_section.count("K56.0") == 1
|
|
|
|
def test_whitelist_prohibition_message(self):
|
|
"""Le message d'interdiction est clair et complet."""
|
|
dossier = _make_dossier()
|
|
controle = _make_controle()
|
|
prompt, _ = _build_cpam_prompt(dossier, controle, [])
|
|
assert "Ne mentionne AUCUN code CIM-10 qui ne figure pas" in prompt
|