diff --git a/src/config.py b/src/config.py
index d650eab..78d79de 100644
--- a/src/config.py
+++ b/src/config.py
@@ -188,6 +188,7 @@ class ControleCPAM(BaseModel):
dr_ucr: Optional[str] = None
actes_ucr: Optional[str] = None
contre_argumentation: Optional[str] = None
+ response_data: Optional[dict] = None
sources_reponse: list[RAGSource] = Field(default_factory=list)
diff --git a/src/control/cpam_response.py b/src/control/cpam_response.py
index 8c890ee..d05573b 100644
--- a/src/control/cpam_response.py
+++ b/src/control/cpam_response.py
@@ -3,8 +3,10 @@
from __future__ import annotations
import logging
+import re
from ..config import ControleCPAM, DossierMedical, RAGSource
+from ..medical.cim10_dict import normalize_code, validate_code
from ..medical.ollama_client import call_anthropic, call_ollama
logger = logging.getLogger(__name__)
@@ -42,7 +44,7 @@ def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) ->
query_parts_arg = []
if controle.titre:
query_parts_arg.append(controle.titre)
- arg_short = controle.arg_ucr[:200] if controle.arg_ucr else ""
+ arg_short = controle.arg_ucr[:500] if controle.arg_ucr else ""
if arg_short:
query_parts_arg.append(arg_short)
query_arg = " ".join(query_parts_arg)
@@ -65,6 +67,43 @@ def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) ->
logger.debug(" RAG requête clinique : %d résultats", len(results_clinique))
all_results.extend(results_clinique)
+ # Requête 4 — Définitions CIM-10 des codes contestés
+ contested_codes = []
+ for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr):
+ if field:
+ contested_codes.extend(re.split(r"[,;\s]+", field.strip()))
+ for raw_code in contested_codes:
+ raw_code = raw_code.strip()
+ if not raw_code:
+ continue
+ norm = normalize_code(raw_code)
+ is_valid, label = validate_code(norm)
+ if is_valid and label:
+ query_def = f"CIM-10 {norm} {label} définition inclusion exclusion"
+ else:
+ query_def = f"CIM-10 {norm} définition codage"
+ results_def = search_similar_cpam(query_def, top_k=3)
+ logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def))
+ all_results.extend(results_def)
+
+ # Requête 5 — Règles explicitement citées dans l'argument CPAM
+ if controle.arg_ucr:
+ rule_patterns = [
+ r'(?:R[eè]gle\s*T?\s*\d+)',
+ r'(?:Annexe[\s-]*\d+[A-Za-z]*)',
+ r'(?:Situation de soins?\s+[^.]{5,40})',
+ ]
+ rules_found = []
+ for pattern in rule_patterns:
+ rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE))
+ if rules_found:
+ rules_unique = list(dict.fromkeys(rules_found))
+ query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI"
+ results_rules = search_similar_cpam(query_rules, top_k=4)
+ logger.debug(" RAG requête règles (%s) : %d résultats",
+ ", ".join(rules_unique), len(results_rules))
+ all_results.extend(results_rules)
+
if not all_results:
return []
@@ -79,7 +118,29 @@ def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) ->
seen[key] = r
merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True)
- return merged[:10]
+ return merged[:12]
+
+
+def _get_code_label(code_str: str) -> str:
+ """Résout le libellé CIM-10 pour un ou plusieurs codes."""
+ codes = re.split(r"[,;\s]+", code_str.strip())
+ labels = []
+ for raw in codes:
+ raw = raw.strip()
+ if not raw:
+ continue
+ norm = normalize_code(raw)
+ is_valid, label = validate_code(norm)
+ if is_valid and label:
+ labels.append(f"{norm} — {label}")
+ else:
+ labels.append(norm)
+ if not labels:
+ return ""
+ if len(labels) == 1:
+ parts = labels[0].split(" — ", 1)
+ return f" — {parts[1]}" if len(parts) > 1 else ""
+ return "\n " + "\n ".join(labels)
def _build_cpam_prompt(
@@ -95,6 +156,12 @@ def _build_cpam_prompt(
dp = dossier.diagnostic_principal
dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else ""
dossier_lines.append(f"- DP : {dp.texte}{dp_code}")
+ elif controle.dp_ucr:
+ dp_label = _get_code_label(controle.dp_ucr)
+ dossier_lines.append(
+ f"- DP : code {controle.dp_ucr}{dp_label} "
+ f"(codé par l'établissement, contesté par la CPAM)"
+ )
if dossier.diagnostics_associes:
das_parts = []
@@ -192,14 +259,14 @@ def _build_cpam_prompt(
+ "\n".join(asymetrie_lines)
)
- # Codes contestés par la CPAM
+ # Codes contestés par la CPAM (avec libellés CIM-10 résolus)
codes_contestes = []
if controle.dp_ucr:
- codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}")
+ codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}")
if controle.da_ucr:
- codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}")
+ codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}")
if controle.dr_ucr:
- codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}")
+ codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}")
if controle.actes_ucr:
codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}")
codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé"
@@ -263,6 +330,10 @@ AXE ASYMÉTRIE D'INFORMATION :
- Démontre en quoi ces éléments complémentaires (biologie, imagerie, traitements, actes) justifient le codage contesté
- Ne mentionne AUCUN élément qui n'est pas dans le dossier fourni
+MISE EN FORME :
+- Structure chaque section avec des tirets pour lister les arguments distincts
+- Un argument par puce, avec la preuve ou la référence associée
+
AXE RÉGLEMENTAIRE :
- Identifie si l'UCR fait une interprétation restrictive non fondée d'une règle
- Confronte le raisonnement CPAM au texte EXACT des sources fournies
@@ -406,7 +477,7 @@ def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str
def generate_cpam_response(
dossier: DossierMedical,
controle: ControleCPAM,
-) -> tuple[str, list[RAGSource]]:
+) -> tuple[str, dict | None, list[RAGSource]]:
"""Génère une contre-argumentation pour un contrôle CPAM.
Args:
@@ -414,7 +485,7 @@ def generate_cpam_response(
controle: Le contrôle CPAM à contester.
Returns:
- Tuple (texte de contre-argumentation, sources RAG utilisées).
+ Tuple (texte de contre-argumentation, dict LLM structuré ou None, sources RAG utilisées).
"""
logger.info("CPAM : génération contre-argumentation pour OGC %d — %s",
controle.numero_ogc, controle.titre)
@@ -449,7 +520,7 @@ def generate_cpam_response(
if result is None:
logger.warning(" LLM non disponible — contre-argumentation non générée")
- return "", rag_sources
+ return "", None, rag_sources
# 5. Validation des références
ref_warnings = _validate_references(result, sources)
@@ -460,4 +531,4 @@ def generate_cpam_response(
text = _format_response(result, ref_warnings)
logger.info(" Contre-argumentation générée (%d caractères)", len(text))
- return text, rag_sources
+ return text, result, rag_sources
diff --git a/src/main.py b/src/main.py
index 3ec94cf..ba29c8a 100644
--- a/src/main.py
+++ b/src/main.py
@@ -378,8 +378,9 @@ def main(input_path: str | None = None) -> None:
if target:
logger.info(" CPAM : %d contrôle(s) pour %s", len(controles), subdir)
for ctrl in controles:
- text, sources = generate_cpam_response(target, ctrl)
+ text, response_data, sources = generate_cpam_response(target, ctrl)
ctrl.contre_argumentation = text
+ ctrl.response_data = response_data
ctrl.sources_reponse = sources
target.controles_cpam = controles
except Exception:
diff --git a/src/viewer/app.py b/src/viewer/app.py
index 985622b..dbea3b2 100644
--- a/src/viewer/app.py
+++ b/src/viewer/app.py
@@ -373,6 +373,37 @@ def format_doc_name(name: str) -> str:
return name
+def format_cpam_text(text: str | None) -> Markup:
+ """Convertit un texte CPAM (section) en HTML avec puces et paragraphes."""
+ if not text:
+ return Markup("")
+ from markupsafe import escape
+ lines = str(text).split("\n")
+ html_parts: list[str] = []
+ in_list = False
+ for line in lines:
+ stripped = line.strip()
+ if not stripped:
+ if in_list:
+ html_parts.append("")
+ in_list = False
+ html_parts.append("
")
+ continue
+ if stripped.startswith("- "):
+ if not in_list:
+ html_parts.append("
{escape(stripped)}
") + if in_list: + html_parts.append("") + return Markup("\n".join(html_parts)) + + # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- @@ -387,6 +418,7 @@ def create_app() -> Flask: app.jinja_env.filters["format_duration"] = format_duration app.jinja_env.filters["format_dossier_name"] = format_dossier_name app.jinja_env.filters["format_doc_name"] = format_doc_name + app.jinja_env.filters["format_cpam_text"] = format_cpam_text ccam_dict = load_ccam_dict() @@ -445,36 +477,119 @@ def create_app() -> Flask: @app.route("/reprocess/+ [{{ ref.document or '' }}{% if ref.page %}, p.{{ ref.page }}{% endif %}] + {{ ref.citation or '' }} ++ {% elif ref is string %} +
{{ ref }}
+ {% endif %} + {% endfor %} +{{ ctrl.contre_argumentation }}
diff --git a/tests/test_cpam_response.py b/tests/test_cpam_response.py
index cc2af89..3e99a39 100644
--- a/tests/test_cpam_response.py
+++ b/tests/test_cpam_response.py
@@ -18,6 +18,7 @@ from src.config import (
from src.control.cpam_response import (
_build_cpam_prompt,
_format_response,
+ _get_code_label,
_search_rag_for_control,
_validate_references,
generate_cpam_response,
@@ -199,6 +200,51 @@ class TestBuildPrompt:
assert "preuves_dossier" in prompt
+ @patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale"))
+ @patch("src.control.cpam_response.normalize_code", return_value="K56.0")
+ def test_prompt_codes_with_cim10_labels(self, mock_norm, mock_valid):
+ """Les codes contestés affichent le libellé CIM-10."""
+ dossier = _make_dossier()
+ controle = _make_controle() # da_ucr="K56.0"
+ prompt = _build_cpam_prompt(dossier, controle, [])
+
+ assert "Iléus paralytique" in prompt
+ assert "DA proposés par UCR" in prompt
+
+ @patch("src.control.cpam_response.validate_code", return_value=(False, ""))
+ @patch("src.control.cpam_response.normalize_code", return_value="Z99.9")
+ def test_prompt_codes_invalid_graceful(self, mock_norm, mock_valid):
+ """Les codes invalides ne crashent pas, juste pas de libellé."""
+ dossier = _make_dossier()
+ controle = ControleCPAM(
+ numero_ogc=1, titre="Test", arg_ucr="Test",
+ decision_ucr="Rejet", dp_ucr="Z99.9", da_ucr=None,
+ )
+ prompt = _build_cpam_prompt(dossier, controle, [])
+
+ assert "Z99.9" in prompt
+ # Pas de crash
+
+ @patch("src.control.cpam_response.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable"))
+ @patch("src.control.cpam_response.normalize_code", return_value="Z45.8")
+ def test_prompt_dp_fallback_from_ucr(self, mock_norm, mock_valid):
+ """DP absent + dp_ucr → contexte injecté dans le prompt."""
+ dossier = DossierMedical(
+ source_file="test.pdf",
+ document_type="crh",
+ sejour=Sejour(),
+ diagnostic_principal=None,
+ )
+ controle = ControleCPAM(
+ numero_ogc=1, titre="Désaccord DP", arg_ucr="Test",
+ decision_ucr="Rejet", dp_ucr="Z45.8", da_ucr=None,
+ )
+ prompt = _build_cpam_prompt(dossier, controle, [])
+
+ assert "codé par l'établissement" in prompt
+ assert "contesté par la CPAM" in prompt
+ assert "Z45.8" in prompt
+
class TestFormatResponse:
def test_full_response_new_format(self):
@@ -349,7 +395,7 @@ class TestGenerateResponse:
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama):
- """Mode hybride : Ollama CPAM (27b) disponible → utilisé en premier."""
+ """Ollama disponible → utilisé en premier, retourne triplet."""
mock_rag.return_value = [
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
]
@@ -363,12 +409,14 @@ class TestGenerateResponse:
dossier = _make_dossier()
controle = _make_controle()
- text, sources = generate_cpam_response(dossier, controle)
+ text, response_data, sources = generate_cpam_response(dossier, controle)
assert "Contre-arguments médicaux..." in text
+ assert response_data is not None
+ assert response_data["analyse_contestation"] == "Analyse..."
+ assert response_data["conclusion"] == "Conclusion..."
assert len(sources) == 1
assert sources[0].document == "guide_methodo"
- # Ollama CPAM appelé en premier (avec model= et timeout=)
mock_ollama.assert_called_once()
mock_anthropic.assert_not_called()
@@ -376,7 +424,7 @@ class TestGenerateResponse:
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama):
- """Ollama CPAM indisponible → fallback Haiku."""
+ """Ollama indisponible → fallback Haiku, retourne triplet."""
mock_rag.return_value = [
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
]
@@ -390,10 +438,11 @@ class TestGenerateResponse:
dossier = _make_dossier()
controle = _make_controle()
- text, sources = generate_cpam_response(dossier, controle)
+ text, response_data, sources = generate_cpam_response(dossier, controle)
assert "Contre-args Haiku..." in text
- # Ollama CPAM appelé d'abord (échec), puis Haiku
+ assert response_data is not None
+ assert response_data["contre_arguments_medicaux"] == "Contre-args Haiku..."
mock_ollama.assert_called_once()
mock_anthropic.assert_called_once()
@@ -401,7 +450,7 @@ class TestGenerateResponse:
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama):
- """Ollama CPAM, Haiku et Ollama défaut tous indisponibles → texte vide."""
+ """Tous LLMs indisponibles → texte vide, response_data None."""
mock_rag.return_value = []
mock_anthropic.return_value = None
mock_ollama.return_value = None
@@ -409,9 +458,10 @@ class TestGenerateResponse:
dossier = _make_dossier()
controle = _make_controle()
- text, sources = generate_cpam_response(dossier, controle)
+ text, response_data, sources = generate_cpam_response(dossier, controle)
assert text == ""
+ assert response_data is None
assert sources == []
@@ -545,8 +595,8 @@ class TestSearchRagForControl:
assert "diagnostic principal" in first_call_query
@patch("src.medical.rag_search.search_similar_cpam")
- def test_max_10_results(self, mock_search):
- """Le résultat final est limité à 10 entrées."""
+ def test_max_12_results(self, mock_search):
+ """Le résultat final est limité à 12 entrées."""
mock_search.return_value = [
{"document": "guide_methodo", "page": i, "code": None,
"score": 0.9 - i * 0.01, "extrait": f"Texte {i}"}
@@ -558,7 +608,67 @@ class TestSearchRagForControl:
results = _search_rag_for_control(controle, dossier)
- assert len(results) <= 10
+ assert len(results) <= 12
+
+ @patch("src.medical.rag_search.search_similar_cpam")
+ def test_arg_ucr_not_truncated_200(self, mock_search):
+ """La requête RAG argument utilise jusqu'à 500 chars, pas 200."""
+ mock_search.return_value = []
+
+ dossier = _make_dossier()
+ long_arg = "A" * 400
+ controle = ControleCPAM(
+ numero_ogc=1, titre="Test", arg_ucr=long_arg,
+ decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
+ )
+
+ _search_rag_for_control(controle, dossier)
+
+ # La requête argument doit contenir les 400 chars (pas tronquée à 200)
+ arg_call_query = mock_search.call_args_list[0][0][0]
+ assert len(arg_call_query) > 200
+
+ @patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique"))
+ @patch("src.control.cpam_response.normalize_code", return_value="K56.0")
+ @patch("src.medical.rag_search.search_similar_cpam")
+ def test_query_cim10_definitions(self, mock_search, mock_norm, mock_valid):
+ """Requête 4 exécutée quand codes contestés présents."""
+ mock_search.return_value = []
+
+ dossier = _make_dossier()
+ controle = _make_controle() # da_ucr="K56.0"
+
+ _search_rag_for_control(controle, dossier)
+
+ # Chercher la requête contenant "CIM-10" et "définition"
+ cim10_queries = [
+ c[0][0] for c in mock_search.call_args_list
+ if "CIM-10" in c[0][0] and "définition" in c[0][0]
+ ]
+ assert len(cim10_queries) >= 1
+ assert "K56.0" in cim10_queries[0]
+
+ @patch("src.medical.rag_search.search_similar_cpam")
+ def test_query_rule_extraction(self, mock_search):
+ """Requête 5 exécutée quand arg_ucr contient une règle nommée."""
+ mock_search.return_value = []
+
+ dossier = _make_dossier()
+ controle = ControleCPAM(
+ numero_ogc=1, titre="Désaccord DAS",
+ arg_ucr="Selon la RègleT7 et l'Annexe-4B, le DAS n'est pas justifié.",
+ decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
+ )
+
+ _search_rag_for_control(controle, dossier)
+
+ # Chercher la requête contenant les règles extraites
+ rule_queries = [
+ c[0][0] for c in mock_search.call_args_list
+ if "guide méthodologique" in c[0][0]
+ ]
+ assert len(rule_queries) >= 1
+ assert "RègleT7" in rule_queries[0] or "Annexe" in rule_queries[0]
@patch("src.medical.rag_search.search_similar_cpam")
def test_clinical_query_when_das_match(self, mock_search):
@@ -570,8 +680,11 @@ class TestSearchRagForControl:
_search_rag_for_control(controle, dossier)
- # 3 appels : codes + argument + clinique
- assert mock_search.call_count == 3
- third_call_query = mock_search.call_args_list[2][0][0]
- assert "Iléus réflexe" in third_call_query
- assert "Cholécystite aiguë" in third_call_query
+ # Au moins 4 appels : codes + argument + clinique + CIM-10 définitions
+ assert mock_search.call_count >= 4
+ # La requête clinique contient DP + DAS textes
+ clinique_queries = [
+ c[0][0] for c in mock_search.call_args_list
+ if "Iléus réflexe" in c[0][0] and "Cholécystite aiguë" in c[0][0]
+ ]
+ assert len(clinique_queries) >= 1
diff --git a/tests/test_viewer.py b/tests/test_viewer.py
index 67df608..f11ad10 100644
--- a/tests/test_viewer.py
+++ b/tests/test_viewer.py
@@ -2,7 +2,7 @@
import pytest
-from src.viewer.app import create_app, compute_group_stats, severity_badge, format_duration
+from src.viewer.app import create_app, compute_group_stats, severity_badge, format_duration, format_cpam_text
from src.config import DossierMedical, Diagnostic, ActeCCAM
@@ -104,6 +104,40 @@ class TestIndexPageLoads:
assert b"Dossiers" in response.data
+class TestFormatCpamText:
+ def test_plain_text(self):
+ result = format_cpam_text("Un simple paragraphe.")
+ assert "Premier argument" in result + assert "
Point A" in result + assert "Conclusion" in result + + def test_none_input(self): + result = format_cpam_text(None) + assert result == "" + + def test_empty_input(self): + result = format_cpam_text("") + assert result == "" + + def test_html_escaping(self): + result = format_cpam_text("Test ") + assert "