feat: mode hybride Ollama — gemma3:27b pour CPAM, 12b pour codage

Le pipeline utilise désormais gemma3:12b (rapide) pour le codage CIM-10
et gemma3:27b (meilleur raisonnement) pour la contre-argumentation CPAM.
Configurable via OLLAMA_MODEL_CPAM et OLLAMA_TIMEOUT_CPAM.

Inclut aussi : traçabilité source/page DAS, niveaux CMA ATIH, sévérité,
page tracker PDF, améliorations fusion et filtres DAS.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-17 17:53:53 +01:00
parent 4ef42dd3d3
commit 01d47f3c4b
20 changed files with 1025 additions and 98 deletions

View File

@@ -19,6 +19,7 @@ from src.control.cpam_response import (
_build_cpam_prompt,
_format_response,
_search_rag_for_control,
_validate_references,
generate_cpam_response,
)
@@ -173,6 +174,31 @@ class TestBuildPrompt:
assert "contre_arguments_asymetrie" in prompt
assert "contre_arguments_reglementaires" in prompt
def test_prompt_contains_cite_exacts(self):
"""Le prompt renforcé demande des preuves exactes."""
dossier = _make_dossier()
controle = _make_controle()
prompt = _build_cpam_prompt(dossier, controle, [])
assert "CITE" in prompt
assert "EXACTS" in prompt
def test_prompt_contains_interdiction(self):
"""Le prompt interdit les références inventées."""
dossier = _make_dossier()
controle = _make_controle()
prompt = _build_cpam_prompt(dossier, controle, [])
assert "INTERDICTION ABSOLUE" in prompt
def test_prompt_contains_preuves_dossier_field(self):
"""Le format JSON demandé inclut preuves_dossier."""
dossier = _make_dossier()
controle = _make_controle()
prompt = _build_cpam_prompt(dossier, controle, [])
assert "preuves_dossier" in prompt
class TestFormatResponse:
def test_full_response_new_format(self):
@@ -236,11 +262,94 @@ class TestFormatResponse:
text = _format_response({})
assert text == ""
def test_preuves_dossier_formatting(self):
"""Le nouveau champ preuves_dossier est formaté correctement."""
parsed = {
"contre_arguments_medicaux": "Arguments...",
"preuves_dossier": [
{"element": "biologie", "valeur": "CRP 180 mg/L", "signification": "inflammation sévère"},
{"element": "imagerie", "valeur": "lithiase cholédocienne", "signification": "confirme le diagnostic"},
],
"conclusion": "Conclusion...",
}
text = _format_response(parsed)
assert "PREUVES DU DOSSIER" in text
assert "CRP 180 mg/L" in text
assert "[biologie]" in text
assert "[imagerie]" in text
def test_structured_references_formatting(self):
"""Les références structurées sont formatées correctement."""
parsed = {
"contre_arguments_medicaux": "Arguments...",
"references": [
{"document": "Guide Méthodologique MCO 2026", "page": "64", "citation": "Le DAS doit être..."},
],
"conclusion": "Conclusion...",
}
text = _format_response(parsed)
assert "REFERENCES" in text
assert "Guide Méthodologique MCO 2026" in text
assert "p.64" in text
assert "Le DAS doit être..." in text
def test_ref_warnings_appended(self):
"""Les avertissements de références non vérifiées apparaissent."""
parsed = {"conclusion": "Conclusion..."}
warnings = ["Référence non vérifiable : Manuel Imaginaire 2025"]
text = _format_response(parsed, ref_warnings=warnings)
assert "AVERTISSEMENT" in text
assert "Manuel Imaginaire 2025" in text
class TestValidateReferences:
def test_valid_reference_no_warning(self):
parsed = {
"references": [
{"document": "Guide Méthodologique MCO 2026", "page": "64", "citation": "..."},
]
}
sources = [{"document": "guide_methodo", "page": 64, "extrait": "..."}]
warnings = _validate_references(parsed, sources)
assert len(warnings) == 0
def test_invented_reference_detected(self):
parsed = {
"references": [
{"document": "Manuel Inventé 2025", "page": "12", "citation": "..."},
]
}
sources = [{"document": "guide_methodo", "page": 64, "extrait": "..."}]
warnings = _validate_references(parsed, sources)
assert len(warnings) == 1
assert "Manuel Inventé" in warnings[0]
def test_old_format_string_no_crash(self):
"""L'ancien format string pour references ne cause pas de crash."""
parsed = {"references": "Guide méthodo p.64"}
sources = [{"document": "guide_methodo"}]
warnings = _validate_references(parsed, sources)
assert len(warnings) == 0 # pas de validation sur l'ancien format
def test_no_sources_no_validation(self):
parsed = {
"references": [
{"document": "Quelque chose", "page": "1", "citation": "..."},
]
}
warnings = _validate_references(parsed, [])
assert len(warnings) == 0
class TestGenerateResponse:
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_success(self, mock_rag, mock_ollama):
def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama):
"""Mode hybride : Ollama CPAM (27b) disponible → utilisé en premier."""
mock_rag.return_value = [
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
]
@@ -259,12 +368,42 @@ class TestGenerateResponse:
assert "Contre-arguments médicaux..." in text
assert len(sources) == 1
assert sources[0].document == "guide_methodo"
# Ollama CPAM appelé en premier (avec model= et timeout=)
mock_ollama.assert_called_once()
mock_anthropic.assert_not_called()
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_ollama_unavailable(self, mock_rag, mock_ollama):
def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama):
"""Ollama CPAM indisponible → fallback Haiku."""
mock_rag.return_value = [
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
]
mock_ollama.return_value = None
mock_anthropic.return_value = {
"analyse_contestation": "Analyse Haiku...",
"contre_arguments_medicaux": "Contre-args Haiku...",
"conclusion": "Conclusion Haiku...",
}
dossier = _make_dossier()
controle = _make_controle()
text, sources = generate_cpam_response(dossier, controle)
assert "Contre-args Haiku..." in text
# Ollama CPAM appelé d'abord (échec), puis Haiku
mock_ollama.assert_called_once()
mock_anthropic.assert_called_once()
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama):
"""Ollama CPAM, Haiku et Ollama défaut tous indisponibles → texte vide."""
mock_rag.return_value = []
mock_anthropic.return_value = None
mock_ollama.return_value = None
dossier = _make_dossier()

View File

@@ -2,7 +2,12 @@
import pytest
from src.medical.das_filter import clean_diagnostic_text, is_valid_diagnostic_text, correct_known_miscodes
from src.medical.das_filter import (
clean_diagnostic_text,
is_valid_diagnostic_text,
correct_known_miscodes,
SEMANTIC_REDUNDANCIES,
)
class TestCleanDiagnosticText:
@@ -223,3 +228,24 @@ class TestCorrectKnownMiscodes:
def test_d64_9_pas_corrige(self):
"""D64.9 lui-même → pas de correction."""
assert correct_known_miscodes("D64.9", "Anémie") is None
class TestSemanticRedundanciesStructure:
"""Vérifie le format de la constante SEMANTIC_REDUNDANCIES."""
def test_is_list_of_tuples(self):
assert isinstance(SEMANTIC_REDUNDANCIES, list)
for item in SEMANTIC_REDUNDANCIES:
assert isinstance(item, tuple)
assert len(item) == 2
dominated, dominants = item
assert isinstance(dominated, str)
assert isinstance(dominants, list)
for d in dominants:
assert isinstance(d, str)
def test_has_known_rules(self):
prefixes = {item[0] for item in SEMANTIC_REDUNDANCIES}
assert "I10" in prefixes
assert "N30" in prefixes
assert "J18" in prefixes

View File

@@ -20,6 +20,7 @@ from src.medical.fusion import (
_dedup_actes,
_is_enriched,
)
from src.medical.das_filter import apply_semantic_dedup
class TestCIM10Specificity:
@@ -354,3 +355,139 @@ class TestDedupPreferEnriched:
result = _dedup_diagnostics(das)
assert len(result) == 1
assert result[0].cim10_confidence == "high"
class TestDasFamilyDpRemoved:
"""Vérifie la dédup DAS vs DP par famille CIM-10 (3 premiers caractères)."""
def test_same_family_removed(self):
"""DP=K85.1, DAS=[K85.0, K85.9, E66.0] → seul E66.0 reste."""
d1 = DossierMedical(
diagnostic_principal=Diagnostic(texte="Pancréatite biliaire", cim10_suggestion="K85.1"),
diagnostics_associes=[
Diagnostic(texte="Pancréatite SAI", cim10_suggestion="K85.0"),
Diagnostic(texte="Pancréatite aiguë", cim10_suggestion="K85.9"),
Diagnostic(texte="Obésité", cim10_suggestion="E66.0"),
],
)
result = merge_dossiers([d1])
das_codes = {d.cim10_suggestion for d in result.diagnostics_associes}
assert "K85.0" not in das_codes
assert "K85.9" not in das_codes
assert "E66.0" in das_codes
def test_trauma_siblings_kept(self):
"""S/T : sites anatomiques différents → tous gardés."""
d1 = DossierMedical(
diagnostic_principal=Diagnostic(texte="Fracture col fémoral", cim10_suggestion="S72.1"),
diagnostics_associes=[
Diagnostic(texte="Fracture trochanter", cim10_suggestion="S72.0"),
Diagnostic(texte="Fracture sous-troch", cim10_suggestion="S72.3"),
],
)
result = merge_dossiers([d1])
das_codes = {d.cim10_suggestion for d in result.diagnostics_associes}
assert "S72.0" in das_codes
assert "S72.3" in das_codes
def test_diabetes_complications_kept(self):
"""E10-E14 : complications distinctes → tous gardés."""
d1 = DossierMedical(
diagnostic_principal=Diagnostic(texte="Diabète avec complications oculaires", cim10_suggestion="E11.6"),
diagnostics_associes=[
Diagnostic(texte="Diabète avec complications rénales", cim10_suggestion="E11.2"),
Diagnostic(texte="HTA essentielle", cim10_suggestion="I10"),
],
)
result = merge_dossiers([d1])
das_codes = {d.cim10_suggestion for d in result.diagnostics_associes}
assert "E11.2" in das_codes
assert "I10" in das_codes
def test_parent_child_removed(self):
"""DP=K85.1, DAS=[K85] → K85 (parent) retiré."""
d1 = DossierMedical(
diagnostic_principal=Diagnostic(texte="Pancréatite biliaire", cim10_suggestion="K85.1"),
diagnostics_associes=[
Diagnostic(texte="Pancréatite", cim10_suggestion="K85"),
],
)
result = merge_dossiers([d1])
das_codes = {d.cim10_suggestion for d in result.diagnostics_associes}
assert len(das_codes) == 0
def test_ocr_dp_not_promoted(self):
"""Fusion avec DP artefact OCR 'À 09' → pas promu en DAS."""
d1 = DossierMedical(
diagnostic_principal=Diagnostic(texte="Pancréatite biliaire", cim10_suggestion="K85.1"),
)
d2 = DossierMedical(
diagnostic_principal=Diagnostic(texte="À 09", cim10_suggestion="A41.9"),
)
result = merge_dossiers([d1, d2])
das_codes = {d.cim10_suggestion for d in result.diagnostics_associes}
assert "A41.9" not in das_codes
class TestSemanticDedup:
"""Vérifie les redondances sémantiques entre DAS."""
def test_i10_removed_when_i11_present(self):
"""I10 (HTA essentielle) retiré si I11.9 (cardiopathie hypertensive) présent."""
das = [
Diagnostic(texte="HTA essentielle", cim10_suggestion="I10"),
Diagnostic(texte="Cardiopathie hypertensive", cim10_suggestion="I11.9"),
Diagnostic(texte="Obésité", cim10_suggestion="E66.0"),
]
result = apply_semantic_dedup(das)
codes = {d.cim10_suggestion for d in result}
assert "I10" not in codes
assert "I11.9" in codes
assert "E66.0" in codes
def test_n30_removed_when_n39_present(self):
"""N30.9 (cystite) retiré si N39.0 (infection urinaire) présent."""
das = [
Diagnostic(texte="Infection urinaire", cim10_suggestion="N39.0"),
Diagnostic(texte="Cystite SAI", cim10_suggestion="N30.9"),
]
result = apply_semantic_dedup(das)
codes = {d.cim10_suggestion for d in result}
assert "N39.0" in codes
assert "N30.9" not in codes
def test_j18_removed_when_j15_present(self):
"""J18.9 (pneumonie SAI) retiré si J15.1 (pneumonie spécifique) présent."""
das = [
Diagnostic(texte="Pneumonie SAI", cim10_suggestion="J18.9"),
Diagnostic(texte="Pneumonie à Klebsiella", cim10_suggestion="J15.1"),
]
result = apply_semantic_dedup(das)
codes = {d.cim10_suggestion for d in result}
assert "J15.1" in codes
assert "J18.9" not in codes
def test_no_removal_without_dominant(self):
"""I10 conservé si aucun code dominant I11/I12/I13."""
das = [
Diagnostic(texte="HTA essentielle", cim10_suggestion="I10"),
Diagnostic(texte="Obésité", cim10_suggestion="E66.0"),
]
result = apply_semantic_dedup(das)
codes = {d.cim10_suggestion for d in result}
assert "I10" in codes
assert "E66.0" in codes
def test_semantic_dedup_in_merge(self):
"""Vérifie que la dédup sémantique est appliquée lors de la fusion."""
d1 = DossierMedical(
diagnostic_principal=Diagnostic(texte="Sepsis", cim10_suggestion="A41.9"),
diagnostics_associes=[
Diagnostic(texte="HTA essentielle", cim10_suggestion="I10"),
Diagnostic(texte="Cardiopathie hypertensive", cim10_suggestion="I11.9"),
],
)
result = merge_dossiers([d1])
das_codes = {d.cim10_suggestion for d in result.diagnostics_associes}
assert "I10" not in das_codes
assert "I11.9" in das_codes

View File

@@ -0,0 +1,95 @@
"""Tests pour le module PageTracker (traçabilité source)."""
import pytest
from src.extraction.page_tracker import PageTracker
class TestCharToPage:
def test_first_page(self):
pt = PageTracker([(0, 100), (102, 200)])
assert pt.char_to_page(0) == 1
assert pt.char_to_page(50) == 1
assert pt.char_to_page(99) == 1
def test_second_page(self):
pt = PageTracker([(0, 100), (102, 200)])
assert pt.char_to_page(102) == 2
assert pt.char_to_page(150) == 2
def test_beyond_last_page(self):
pt = PageTracker([(0, 100), (102, 200)])
assert pt.char_to_page(300) == 2
def test_single_page(self):
pt = PageTracker([(0, 500)])
assert pt.char_to_page(250) == 1
def test_empty_offsets(self):
pt = PageTracker([])
assert pt.char_to_page(0) == 1
class TestFindPageForText:
def _make_tracker(self):
"""Simule un document 3 pages."""
page1 = "Pancréatite aiguë biliaire"
page2 = "Cholécystectomie par coelioscopie"
page3 = "TTT de sortie: Augmentin IV"
sep = "\n\n"
full = sep.join([page1, page2, page3])
offsets = []
offset = 0
for text in [page1, page2, page3]:
offsets.append((offset, offset + len(text)))
offset += len(text) + len(sep)
return PageTracker(offsets), full
def test_exact_match_page1(self):
pt, full = self._make_tracker()
assert pt.find_page_for_text("Pancréatite", full) == 1
def test_exact_match_page2(self):
pt, full = self._make_tracker()
assert pt.find_page_for_text("Cholécystectomie", full) == 2
def test_exact_match_page3(self):
pt, full = self._make_tracker()
assert pt.find_page_for_text("Augmentin", full) == 3
def test_case_insensitive(self):
pt, full = self._make_tracker()
assert pt.find_page_for_text("pancréatite", full) == 1
def test_not_found(self):
pt, full = self._make_tracker()
assert pt.find_page_for_text("inexistant", full) is None
def test_empty_text(self):
pt, full = self._make_tracker()
assert pt.find_page_for_text("", full) is None
class TestExtractExcerpt:
def test_returns_excerpt(self):
text = "A" * 200 + "Pancréatite aiguë" + "B" * 200
pt = PageTracker([(0, len(text))])
excerpt = pt.extract_excerpt("Pancréatite aiguë", text, context_chars=50)
assert excerpt is not None
assert "Pancréatite aiguë" in excerpt
assert excerpt.startswith("...")
assert excerpt.endswith("...")
def test_at_start(self):
text = "Pancréatite aiguë biliaire " + "X" * 200
pt = PageTracker([(0, len(text))])
excerpt = pt.extract_excerpt("Pancréatite", text, context_chars=50)
assert excerpt is not None
assert not excerpt.startswith("...")
def test_not_found(self):
text = "Texte sans rapport"
pt = PageTracker([(0, len(text))])
assert pt.extract_excerpt("inexistant", text) is None

View File

@@ -8,6 +8,7 @@ from src.medical.severity import (
enrich_dossier_severity,
_detect_severity_markers,
_is_heuristic_cma,
_load_cma_levels,
)
@@ -59,6 +60,49 @@ class TestHeuristicCMA:
assert _is_heuristic_cma(None) is False
class TestCMALevels:
"""Tests pour le lookup CMA officiel ATIH."""
def test_load_cma_levels(self):
levels = _load_cma_levels()
assert len(levels) > 0
# A01.0 est severity 2 dans cocoa_entries
assert levels.get("A01.0") == 2
def test_official_level_4(self):
"""Un code CMA niveau 4 est bien détecté."""
levels = _load_cma_levels()
level4_codes = [k for k, v in levels.items() if v == 4]
assert len(level4_codes) > 0
def test_official_level_propagated(self):
"""evaluate_severity propage le niveau CMA officiel."""
levels = _load_cma_levels()
# Prendre un code de niveau 3
code_lv3 = next((k for k, v in levels.items() if v == 3), None)
if code_lv3:
diag = Diagnostic(texte="Test diagnostic", cim10_suggestion=code_lv3)
info = evaluate_severity(diag)
assert info.niveau_cma == 3
assert info.est_cma_probable is True
def test_heuristic_fallback_level_2(self):
"""Un code heuristique CMA sans entrée officielle → niveau 2."""
# E11.9 est dans les racines heuristiques ET dans le fichier officiel
# Testons avec un code heuristique qui n'est pas dans le fichier officiel
diag = Diagnostic(texte="Test", cim10_suggestion="E11.9")
info = evaluate_severity(diag)
assert info.est_cma_probable is True
assert info.niveau_cma >= 2
def test_non_cma_remains_level_1(self):
"""Un code non-CMA reste au niveau 1."""
diag = Diagnostic(texte="Grippe", cim10_suggestion="J11.1")
info = evaluate_severity(diag)
if not info.est_cma_probable:
assert info.niveau_cma == 1
class TestEvaluateSeverity:
def test_cma_code_detected(self):
diag = Diagnostic(texte="Diabète type 2", cim10_suggestion="E11.9")
@@ -66,7 +110,8 @@ class TestEvaluateSeverity:
assert info.est_cma_probable is True
def test_non_cma_code(self):
diag = Diagnostic(texte="Pancréatite aiguë biliaire", cim10_suggestion="K85.1")
"""Un code non CMA (J11.1 grippe) n'est pas détecté comme CMA."""
diag = Diagnostic(texte="Grippe", cim10_suggestion="J11.1")
info = evaluate_severity(diag)
assert info.est_cma_probable is False
@@ -82,6 +127,12 @@ class TestEvaluateSeverity:
info = evaluate_severity(diag)
assert info.est_cma_probable is True
def test_niveau_cma_in_result(self):
"""Le champ niveau_cma est toujours renseigné."""
diag = Diagnostic(texte="Sepsis", cim10_suggestion="A41.9")
info = evaluate_severity(diag)
assert info.niveau_cma >= 1
class TestEnrichDossierSeverity:
def test_enriches_das_in_place(self):
@@ -119,3 +170,22 @@ class TestEnrichDossierSeverity:
assert das[0].est_cma is True
assert das[0].est_cms is True
assert cms_count == 1
def test_niveau_cma_set_on_das(self):
"""enrich_dossier_severity propage niveau_cma sur chaque DAS."""
dp = Diagnostic(texte="Pancréatite", cim10_suggestion="K85.1")
das = [
Diagnostic(texte="Fibrillation auriculaire", cim10_suggestion="I48.9"),
]
enrich_dossier_severity(dp, das)
assert das[0].niveau_cma is not None
assert das[0].niveau_cma >= 2
def test_alertes_contain_cma_level(self):
"""Les alertes mentionnent le niveau CMA."""
dp = Diagnostic(texte="Test", cim10_suggestion="K85.1")
das = [
Diagnostic(texte="Sepsis", cim10_suggestion="A41.9"),
]
alertes, _, _ = enrich_dossier_severity(dp, das)
assert any("CMA niveau" in a for a in alertes)

View File

@@ -192,7 +192,7 @@ class TestSplitDocuments:
# --- Test intégration process_pdf ---
class TestProcessPdfMulti:
@patch("src.main.extract_text")
@patch("src.main.extract_text_with_pages")
@patch("src.main.extract_medical_info")
@patch("src.main._run_edsnlp", return_value=None)
@patch("src.main._use_edsnlp", False)
@@ -202,9 +202,10 @@ class TestProcessPdfMulti:
from pathlib import Path
from src.main import process_pdf
from src.config import DossierMedical, Diagnostic
from src.extraction.page_tracker import PageTracker
# Mock extract_text retournant un texte multi-épisodes Trackare
mock_extract.return_value = TRACKARE_MULTI
# Mock extract_text_with_pages retournant un texte multi-épisodes Trackare
mock_extract.return_value = (TRACKARE_MULTI, PageTracker([(0, len(TRACKARE_MULTI))]))
# Mock extract_medical_info retournant un DossierMedical minimal
mock_medical.return_value = DossierMedical(