From bc0ccbef7cca2c6900913a062821041592233a56 Mon Sep 17 00:00:00 2001 From: dom Date: Tue, 17 Feb 2026 23:24:10 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20enrichissement=20contre-argumentation?= =?UTF-8?q?=20CPAM=20=E2=80=94=20libell=C3=A9s=20CIM-10,=20RAG=20cibl?= =?UTF-8?q?=C3=A9,=20reprocess=20complet?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Résolution des libellés CIM-10 pour les codes contestés (dp_ucr, da_ucr, dr_ucr) - Fallback DP depuis dp_ucr quand le pipeline n'extrait pas de diagnostic principal - Troncature arg_ucr augmentée de 200 à 500 chars pour conserver les citations de règles - Requête RAG 4 : définitions CIM-10 (inclusion/exclusion) des codes contestés - Requête RAG 5 : extraction et recherche des règles nommées (RègleT7, Annexe, etc.) - Cap résultats RAG de 10 à 12 pour absorber les nouvelles requêtes - Reprocess viewer : pipeline complet (fusion + GHM + CPAM) pour dossiers multi-PDF - Affichage structuré response_data dans le viewer (analyse, preuves, références) - 7 nouveaux tests CPAM, 6 nouveaux tests viewer Co-Authored-By: Claude Opus 4.6 --- src/config.py | 1 + src/control/cpam_response.py | 91 ++++++++++++++++-- src/main.py | 3 +- src/viewer/app.py | 157 ++++++++++++++++++++++++++----- src/viewer/templates/detail.html | 82 +++++++++++++++- tests/test_cpam_response.py | 145 ++++++++++++++++++++++++---- tests/test_viewer.py | 36 ++++++- 7 files changed, 464 insertions(+), 51 deletions(-) diff --git a/src/config.py b/src/config.py index d650eab..78d79de 100644 --- a/src/config.py +++ b/src/config.py @@ -188,6 +188,7 @@ class ControleCPAM(BaseModel): dr_ucr: Optional[str] = None actes_ucr: Optional[str] = None contre_argumentation: Optional[str] = None + response_data: Optional[dict] = None sources_reponse: list[RAGSource] = Field(default_factory=list) diff --git a/src/control/cpam_response.py b/src/control/cpam_response.py index 8c890ee..d05573b 100644 --- a/src/control/cpam_response.py +++ b/src/control/cpam_response.py @@ -3,8 +3,10 @@ from __future__ import annotations import logging +import re from ..config import ControleCPAM, DossierMedical, RAGSource +from ..medical.cim10_dict import normalize_code, validate_code from ..medical.ollama_client import call_anthropic, call_ollama logger = logging.getLogger(__name__) @@ -42,7 +44,7 @@ def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> query_parts_arg = [] if controle.titre: query_parts_arg.append(controle.titre) - arg_short = controle.arg_ucr[:200] if controle.arg_ucr else "" + arg_short = controle.arg_ucr[:500] if controle.arg_ucr else "" if arg_short: query_parts_arg.append(arg_short) query_arg = " ".join(query_parts_arg) @@ -65,6 +67,43 @@ def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> logger.debug(" RAG requête clinique : %d résultats", len(results_clinique)) all_results.extend(results_clinique) + # Requête 4 — Définitions CIM-10 des codes contestés + contested_codes = [] + for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): + if field: + contested_codes.extend(re.split(r"[,;\s]+", field.strip())) + for raw_code in contested_codes: + raw_code = raw_code.strip() + if not raw_code: + continue + norm = normalize_code(raw_code) + is_valid, label = validate_code(norm) + if is_valid and label: + query_def = f"CIM-10 {norm} {label} définition inclusion exclusion" + else: + query_def = f"CIM-10 {norm} définition codage" + results_def = search_similar_cpam(query_def, top_k=3) + logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def)) + all_results.extend(results_def) + + # Requête 5 — Règles explicitement citées dans l'argument CPAM + if controle.arg_ucr: + rule_patterns = [ + r'(?:R[eè]gle\s*T?\s*\d+)', + r'(?:Annexe[\s-]*\d+[A-Za-z]*)', + r'(?:Situation de soins?\s+[^.]{5,40})', + ] + rules_found = [] + for pattern in rule_patterns: + rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE)) + if rules_found: + rules_unique = list(dict.fromkeys(rules_found)) + query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI" + results_rules = search_similar_cpam(query_rules, top_k=4) + logger.debug(" RAG requête règles (%s) : %d résultats", + ", ".join(rules_unique), len(results_rules)) + all_results.extend(results_rules) + if not all_results: return [] @@ -79,7 +118,29 @@ def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> seen[key] = r merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True) - return merged[:10] + return merged[:12] + + +def _get_code_label(code_str: str) -> str: + """Résout le libellé CIM-10 pour un ou plusieurs codes.""" + codes = re.split(r"[,;\s]+", code_str.strip()) + labels = [] + for raw in codes: + raw = raw.strip() + if not raw: + continue + norm = normalize_code(raw) + is_valid, label = validate_code(norm) + if is_valid and label: + labels.append(f"{norm} — {label}") + else: + labels.append(norm) + if not labels: + return "" + if len(labels) == 1: + parts = labels[0].split(" — ", 1) + return f" — {parts[1]}" if len(parts) > 1 else "" + return "\n " + "\n ".join(labels) def _build_cpam_prompt( @@ -95,6 +156,12 @@ def _build_cpam_prompt( dp = dossier.diagnostic_principal dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else "" dossier_lines.append(f"- DP : {dp.texte}{dp_code}") + elif controle.dp_ucr: + dp_label = _get_code_label(controle.dp_ucr) + dossier_lines.append( + f"- DP : code {controle.dp_ucr}{dp_label} " + f"(codé par l'établissement, contesté par la CPAM)" + ) if dossier.diagnostics_associes: das_parts = [] @@ -192,14 +259,14 @@ def _build_cpam_prompt( + "\n".join(asymetrie_lines) ) - # Codes contestés par la CPAM + # Codes contestés par la CPAM (avec libellés CIM-10 résolus) codes_contestes = [] if controle.dp_ucr: - codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}") + codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}") if controle.da_ucr: - codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}") + codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}") if controle.dr_ucr: - codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}") + codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}") if controle.actes_ucr: codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}") codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé" @@ -263,6 +330,10 @@ AXE ASYMÉTRIE D'INFORMATION : - Démontre en quoi ces éléments complémentaires (biologie, imagerie, traitements, actes) justifient le codage contesté - Ne mentionne AUCUN élément qui n'est pas dans le dossier fourni +MISE EN FORME : +- Structure chaque section avec des tirets pour lister les arguments distincts +- Un argument par puce, avec la preuve ou la référence associée + AXE RÉGLEMENTAIRE : - Identifie si l'UCR fait une interprétation restrictive non fondée d'une règle - Confronte le raisonnement CPAM au texte EXACT des sources fournies @@ -406,7 +477,7 @@ def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str def generate_cpam_response( dossier: DossierMedical, controle: ControleCPAM, -) -> tuple[str, list[RAGSource]]: +) -> tuple[str, dict | None, list[RAGSource]]: """Génère une contre-argumentation pour un contrôle CPAM. Args: @@ -414,7 +485,7 @@ def generate_cpam_response( controle: Le contrôle CPAM à contester. Returns: - Tuple (texte de contre-argumentation, sources RAG utilisées). + Tuple (texte de contre-argumentation, dict LLM structuré ou None, sources RAG utilisées). """ logger.info("CPAM : génération contre-argumentation pour OGC %d — %s", controle.numero_ogc, controle.titre) @@ -449,7 +520,7 @@ def generate_cpam_response( if result is None: logger.warning(" LLM non disponible — contre-argumentation non générée") - return "", rag_sources + return "", None, rag_sources # 5. Validation des références ref_warnings = _validate_references(result, sources) @@ -460,4 +531,4 @@ def generate_cpam_response( text = _format_response(result, ref_warnings) logger.info(" Contre-argumentation générée (%d caractères)", len(text)) - return text, rag_sources + return text, result, rag_sources diff --git a/src/main.py b/src/main.py index 3ec94cf..ba29c8a 100644 --- a/src/main.py +++ b/src/main.py @@ -378,8 +378,9 @@ def main(input_path: str | None = None) -> None: if target: logger.info(" CPAM : %d contrôle(s) pour %s", len(controles), subdir) for ctrl in controles: - text, sources = generate_cpam_response(target, ctrl) + text, response_data, sources = generate_cpam_response(target, ctrl) ctrl.contre_argumentation = text + ctrl.response_data = response_data ctrl.sources_reponse = sources target.controles_cpam = controles except Exception: diff --git a/src/viewer/app.py b/src/viewer/app.py index 985622b..dbea3b2 100644 --- a/src/viewer/app.py +++ b/src/viewer/app.py @@ -373,6 +373,37 @@ def format_doc_name(name: str) -> str: return name +def format_cpam_text(text: str | None) -> Markup: + """Convertit un texte CPAM (section) en HTML avec puces et paragraphes.""" + if not text: + return Markup("") + from markupsafe import escape + lines = str(text).split("\n") + html_parts: list[str] = [] + in_list = False + for line in lines: + stripped = line.strip() + if not stripped: + if in_list: + html_parts.append("") + in_list = False + html_parts.append("
") + continue + if stripped.startswith("- "): + if not in_list: + html_parts.append("") + in_list = False + html_parts.append(f"

{escape(stripped)}

") + if in_list: + html_parts.append("") + return Markup("\n".join(html_parts)) + + # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- @@ -387,6 +418,7 @@ def create_app() -> Flask: app.jinja_env.filters["format_duration"] = format_duration app.jinja_env.filters["format_dossier_name"] = format_dossier_name app.jinja_env.filters["format_doc_name"] = format_doc_name + app.jinja_env.filters["format_cpam_text"] = format_cpam_text ccam_dict = load_ccam_dict() @@ -445,36 +477,119 @@ def create_app() -> Flask: @app.route("/reprocess/", methods=["POST"]) def reprocess(filepath: str): - """Relance le traitement d'un dossier.""" + """Relance le pipeline complet : process PDFs + fusion + GHM + CPAM.""" from ..main import process_pdf, write_outputs + from ..medical.ghm import estimate_ghm dossier = load_dossier(filepath) - source_file = dossier.source_file - if not source_file: + input_dir = Path(__file__).parent.parent.parent / "input" + + # Collecter les PDFs sources (fusionné → source_files, simple → source_file) + source_names = [] + if dossier.source_files: + source_names = list(dossier.source_files) + elif dossier.source_file: + source_names = [dossier.source_file] + + if not source_names: return jsonify({"error": "Fichier source introuvable"}), 400 - # Chercher le PDF source dans input/ - input_dir = Path(__file__).parent.parent.parent / "input" - pdf_path = None - for p in input_dir.rglob(source_file): - if p.is_file(): - pdf_path = p - break + # Résoudre les chemins PDF dans input/ + pdf_paths = [] + missing = [] + for name in source_names: + found = None + for p in input_dir.rglob(name): + if p.is_file(): + found = p + break + if found: + pdf_paths.append(found) + else: + missing.append(name) - if not pdf_path: - return jsonify({"error": f"PDF source '{source_file}' introuvable"}), 404 + if not pdf_paths: + return jsonify({"error": f"PDF sources introuvables : {', '.join(missing)}"}), 404 try: - pdf_results = process_pdf(pdf_path) - stem = pdf_path.stem.replace(" ", "_") + # Déterminer le subdir depuis le premier PDF trouvé subdir = None - if pdf_path.parent != input_dir: - subdir = pdf_path.parent.name - multi = len(pdf_results) > 1 - for part_idx, (anonymized_text, new_dossier, report) in enumerate(pdf_results): - part_stem = f"{stem}_part{part_idx + 1}" if multi else stem - write_outputs(part_stem, anonymized_text, new_dossier, report, subdir=subdir) - return jsonify({"ok": True, "message": f"Traitement terminé ({len(pdf_results)} dossier(s))"}) + if pdf_paths[0].parent != input_dir: + subdir = pdf_paths[0].parent.name + + # 1. Traiter chaque PDF + group_dossiers = [] + for pdf_path in pdf_paths: + pdf_results = process_pdf(pdf_path) + stem = pdf_path.stem.replace(" ", "_") + multi = len(pdf_results) > 1 + for part_idx, (anonymized_text, new_dossier, report) in enumerate(pdf_results): + part_stem = f"{stem}_part{part_idx + 1}" if multi else stem + write_outputs(part_stem, anonymized_text, new_dossier, report, subdir=subdir) + group_dossiers.append(new_dossier) + + # 2. Fusion multi-PDF + merged = None + if len(group_dossiers) > 1 and subdir: + try: + from ..medical.fusion import merge_dossiers + merged = merge_dossiers(group_dossiers) + try: + ghm = estimate_ghm(merged) + merged.ghm_estimation = ghm + except Exception: + logger.warning("Erreur estimation GHM fusionné", exc_info=True) + except Exception: + logger.exception("Erreur fusion groupe %s", subdir) + + # 3. Contrôle CPAM (auto-détection Excel) + target = merged if merged else (group_dossiers[-1] if group_dossiers else None) + if target and subdir: + cpam_dir = input_dir / "Control_cpam" + cpam_path = None + if cpam_dir.is_dir(): + xlsx_files = sorted(cpam_dir.glob("*.xlsx")) + if xlsx_files: + cpam_path = xlsx_files[0] + if cpam_path: + try: + from ..control.cpam_parser import parse_cpam_excel, match_dossier_ogc + from ..control.cpam_response import generate_cpam_response + cpam_data = parse_cpam_excel(str(cpam_path)) + if cpam_data: + controles = match_dossier_ogc(subdir, cpam_data) + if controles: + logger.info("CPAM reprocess : %d contrôle(s) pour %s", + len(controles), subdir) + for ctrl in controles: + text, response_data, sources = generate_cpam_response(target, ctrl) + ctrl.contre_argumentation = text + ctrl.response_data = response_data + ctrl.sources_reponse = sources + target.controles_cpam = controles + except Exception: + logger.exception("Erreur CPAM reprocess pour %s", subdir) + + # 4. Écrire le dossier fusionné (après CPAM) + if merged is not None and subdir: + struct_dir = STRUCTURED_DIR / subdir + struct_dir.mkdir(parents=True, exist_ok=True) + merged_path = struct_dir / f"{subdir}_fusionne_cim10.json" + merged_path.write_text( + merged.model_dump_json(indent=2, exclude_none=True), + encoding="utf-8", + ) + logger.info("Dossier fusionné réécrit : %s", merged_path) + + msg = f"Traitement terminé ({len(group_dossiers)} dossier(s)" + if merged: + msg += ", fusionné" + if target and getattr(target, "controles_cpam", None): + msg += f", {len(target.controles_cpam)} contrôle(s) CPAM" + if missing: + msg += f", {len(missing)} PDF(s) manquant(s)" + msg += ")" + return jsonify({"ok": True, "message": msg}) except Exception as e: logger.exception("Erreur lors du retraitement") return jsonify({"error": str(e)}), 500 diff --git a/src/viewer/templates/detail.html b/src/viewer/templates/detail.html index cbbff14..72427b5 100644 --- a/src/viewer/templates/detail.html +++ b/src/viewer/templates/detail.html @@ -165,8 +165,86 @@ {% endif %} - {# Contre-argumentation #} - {% if ctrl.contre_argumentation %} + {# Contre-argumentation structurée ou fallback texte brut #} + {% if ctrl.response_data %} +
+
Contre-argumentation
+ + {% if ctrl.response_data.analyse_contestation %} +
+
Analyse de la contestation
+ {{ ctrl.response_data.analyse_contestation | format_cpam_text }} +
+ {% endif %} + + {% if ctrl.response_data.points_accord and ctrl.response_data.points_accord|lower not in ['aucun', 'non applicable', 'n/a', ''] %} +
+
Points d'accord
+ {{ ctrl.response_data.points_accord | format_cpam_text }} +
+ {% endif %} + + {% if ctrl.response_data.contre_arguments_medicaux %} +
+
Contre-arguments médicaux
+ {{ ctrl.response_data.contre_arguments_medicaux | format_cpam_text }} +
+ {% endif %} + + {% if ctrl.response_data.preuves_dossier %} +
+
Preuves du dossier
+
    + {% for p in ctrl.response_data.preuves_dossier %} + {% if p is mapping %} +
  • + {{ p.element or p.get('type', '') }} + {{ p.valeur or '' }} → {{ p.signification or '' }} +
  • + {% endif %} + {% endfor %} +
+
+ {% endif %} + + {% if ctrl.response_data.contre_arguments_asymetrie %} +
+
Asymétrie d'information
+ {{ ctrl.response_data.contre_arguments_asymetrie | format_cpam_text }} +
+ {% endif %} + + {% if ctrl.response_data.contre_arguments_reglementaires %} +
+
Contre-arguments réglementaires
+ {{ ctrl.response_data.contre_arguments_reglementaires | format_cpam_text }} +
+ {% endif %} + + {% if ctrl.response_data.references %} +
+
Références
+ {% for ref in ctrl.response_data.references %} + {% if ref is mapping %} +
+ [{{ ref.document or '' }}{% if ref.page %}, p.{{ ref.page }}{% endif %}] + {{ ref.citation or '' }} +
+ {% elif ref is string %} +

{{ ref }}

+ {% endif %} + {% endfor %} +
+ {% endif %} + + {% if ctrl.response_data.conclusion %} +
+
Conclusion
+ {{ ctrl.response_data.conclusion | format_cpam_text }} +
+ {% endif %} +
+ {% elif ctrl.contre_argumentation %}
Contre-argumentation
{{ ctrl.contre_argumentation }}
diff --git a/tests/test_cpam_response.py b/tests/test_cpam_response.py index cc2af89..3e99a39 100644 --- a/tests/test_cpam_response.py +++ b/tests/test_cpam_response.py @@ -18,6 +18,7 @@ from src.config import ( from src.control.cpam_response import ( _build_cpam_prompt, _format_response, + _get_code_label, _search_rag_for_control, _validate_references, generate_cpam_response, @@ -199,6 +200,51 @@ class TestBuildPrompt: assert "preuves_dossier" in prompt + @patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale")) + @patch("src.control.cpam_response.normalize_code", return_value="K56.0") + def test_prompt_codes_with_cim10_labels(self, mock_norm, mock_valid): + """Les codes contestés affichent le libellé CIM-10.""" + dossier = _make_dossier() + controle = _make_controle() # da_ucr="K56.0" + prompt = _build_cpam_prompt(dossier, controle, []) + + assert "Iléus paralytique" in prompt + assert "DA proposés par UCR" in prompt + + @patch("src.control.cpam_response.validate_code", return_value=(False, "")) + @patch("src.control.cpam_response.normalize_code", return_value="Z99.9") + def test_prompt_codes_invalid_graceful(self, mock_norm, mock_valid): + """Les codes invalides ne crashent pas, juste pas de libellé.""" + dossier = _make_dossier() + controle = ControleCPAM( + numero_ogc=1, titre="Test", arg_ucr="Test", + decision_ucr="Rejet", dp_ucr="Z99.9", da_ucr=None, + ) + prompt = _build_cpam_prompt(dossier, controle, []) + + assert "Z99.9" in prompt + # Pas de crash + + @patch("src.control.cpam_response.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable")) + @patch("src.control.cpam_response.normalize_code", return_value="Z45.8") + def test_prompt_dp_fallback_from_ucr(self, mock_norm, mock_valid): + """DP absent + dp_ucr → contexte injecté dans le prompt.""" + dossier = DossierMedical( + source_file="test.pdf", + document_type="crh", + sejour=Sejour(), + diagnostic_principal=None, + ) + controle = ControleCPAM( + numero_ogc=1, titre="Désaccord DP", arg_ucr="Test", + decision_ucr="Rejet", dp_ucr="Z45.8", da_ucr=None, + ) + prompt = _build_cpam_prompt(dossier, controle, []) + + assert "codé par l'établissement" in prompt + assert "contesté par la CPAM" in prompt + assert "Z45.8" in prompt + class TestFormatResponse: def test_full_response_new_format(self): @@ -349,7 +395,7 @@ class TestGenerateResponse: @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama): - """Mode hybride : Ollama CPAM (27b) disponible → utilisé en premier.""" + """Ollama disponible → utilisé en premier, retourne triplet.""" mock_rag.return_value = [ {"document": "guide_methodo", "page": 64, "extrait": "Texte guide"}, ] @@ -363,12 +409,14 @@ class TestGenerateResponse: dossier = _make_dossier() controle = _make_controle() - text, sources = generate_cpam_response(dossier, controle) + text, response_data, sources = generate_cpam_response(dossier, controle) assert "Contre-arguments médicaux..." in text + assert response_data is not None + assert response_data["analyse_contestation"] == "Analyse..." + assert response_data["conclusion"] == "Conclusion..." assert len(sources) == 1 assert sources[0].document == "guide_methodo" - # Ollama CPAM appelé en premier (avec model= et timeout=) mock_ollama.assert_called_once() mock_anthropic.assert_not_called() @@ -376,7 +424,7 @@ class TestGenerateResponse: @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama): - """Ollama CPAM indisponible → fallback Haiku.""" + """Ollama indisponible → fallback Haiku, retourne triplet.""" mock_rag.return_value = [ {"document": "guide_methodo", "page": 64, "extrait": "Texte guide"}, ] @@ -390,10 +438,11 @@ class TestGenerateResponse: dossier = _make_dossier() controle = _make_controle() - text, sources = generate_cpam_response(dossier, controle) + text, response_data, sources = generate_cpam_response(dossier, controle) assert "Contre-args Haiku..." in text - # Ollama CPAM appelé d'abord (échec), puis Haiku + assert response_data is not None + assert response_data["contre_arguments_medicaux"] == "Contre-args Haiku..." mock_ollama.assert_called_once() mock_anthropic.assert_called_once() @@ -401,7 +450,7 @@ class TestGenerateResponse: @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama): - """Ollama CPAM, Haiku et Ollama défaut tous indisponibles → texte vide.""" + """Tous LLMs indisponibles → texte vide, response_data None.""" mock_rag.return_value = [] mock_anthropic.return_value = None mock_ollama.return_value = None @@ -409,9 +458,10 @@ class TestGenerateResponse: dossier = _make_dossier() controle = _make_controle() - text, sources = generate_cpam_response(dossier, controle) + text, response_data, sources = generate_cpam_response(dossier, controle) assert text == "" + assert response_data is None assert sources == [] @@ -545,8 +595,8 @@ class TestSearchRagForControl: assert "diagnostic principal" in first_call_query @patch("src.medical.rag_search.search_similar_cpam") - def test_max_10_results(self, mock_search): - """Le résultat final est limité à 10 entrées.""" + def test_max_12_results(self, mock_search): + """Le résultat final est limité à 12 entrées.""" mock_search.return_value = [ {"document": "guide_methodo", "page": i, "code": None, "score": 0.9 - i * 0.01, "extrait": f"Texte {i}"} @@ -558,7 +608,67 @@ class TestSearchRagForControl: results = _search_rag_for_control(controle, dossier) - assert len(results) <= 10 + assert len(results) <= 12 + + @patch("src.medical.rag_search.search_similar_cpam") + def test_arg_ucr_not_truncated_200(self, mock_search): + """La requête RAG argument utilise jusqu'à 500 chars, pas 200.""" + mock_search.return_value = [] + + dossier = _make_dossier() + long_arg = "A" * 400 + controle = ControleCPAM( + numero_ogc=1, titre="Test", arg_ucr=long_arg, + decision_ucr="Rejet", dp_ucr=None, da_ucr=None, + ) + + _search_rag_for_control(controle, dossier) + + # La requête argument doit contenir les 400 chars (pas tronquée à 200) + arg_call_query = mock_search.call_args_list[0][0][0] + assert len(arg_call_query) > 200 + + @patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique")) + @patch("src.control.cpam_response.normalize_code", return_value="K56.0") + @patch("src.medical.rag_search.search_similar_cpam") + def test_query_cim10_definitions(self, mock_search, mock_norm, mock_valid): + """Requête 4 exécutée quand codes contestés présents.""" + mock_search.return_value = [] + + dossier = _make_dossier() + controle = _make_controle() # da_ucr="K56.0" + + _search_rag_for_control(controle, dossier) + + # Chercher la requête contenant "CIM-10" et "définition" + cim10_queries = [ + c[0][0] for c in mock_search.call_args_list + if "CIM-10" in c[0][0] and "définition" in c[0][0] + ] + assert len(cim10_queries) >= 1 + assert "K56.0" in cim10_queries[0] + + @patch("src.medical.rag_search.search_similar_cpam") + def test_query_rule_extraction(self, mock_search): + """Requête 5 exécutée quand arg_ucr contient une règle nommée.""" + mock_search.return_value = [] + + dossier = _make_dossier() + controle = ControleCPAM( + numero_ogc=1, titre="Désaccord DAS", + arg_ucr="Selon la RègleT7 et l'Annexe-4B, le DAS n'est pas justifié.", + decision_ucr="Rejet", dp_ucr=None, da_ucr=None, + ) + + _search_rag_for_control(controle, dossier) + + # Chercher la requête contenant les règles extraites + rule_queries = [ + c[0][0] for c in mock_search.call_args_list + if "guide méthodologique" in c[0][0] + ] + assert len(rule_queries) >= 1 + assert "RègleT7" in rule_queries[0] or "Annexe" in rule_queries[0] @patch("src.medical.rag_search.search_similar_cpam") def test_clinical_query_when_das_match(self, mock_search): @@ -570,8 +680,11 @@ class TestSearchRagForControl: _search_rag_for_control(controle, dossier) - # 3 appels : codes + argument + clinique - assert mock_search.call_count == 3 - third_call_query = mock_search.call_args_list[2][0][0] - assert "Iléus réflexe" in third_call_query - assert "Cholécystite aiguë" in third_call_query + # Au moins 4 appels : codes + argument + clinique + CIM-10 définitions + assert mock_search.call_count >= 4 + # La requête clinique contient DP + DAS textes + clinique_queries = [ + c[0][0] for c in mock_search.call_args_list + if "Iléus réflexe" in c[0][0] and "Cholécystite aiguë" in c[0][0] + ] + assert len(clinique_queries) >= 1 diff --git a/tests/test_viewer.py b/tests/test_viewer.py index 67df608..f11ad10 100644 --- a/tests/test_viewer.py +++ b/tests/test_viewer.py @@ -2,7 +2,7 @@ import pytest -from src.viewer.app import create_app, compute_group_stats, severity_badge, format_duration +from src.viewer.app import create_app, compute_group_stats, severity_badge, format_duration, format_cpam_text from src.config import DossierMedical, Diagnostic, ActeCCAM @@ -104,6 +104,40 @@ class TestIndexPageLoads: assert b"Dossiers" in response.data +class TestFormatCpamText: + def test_plain_text(self): + result = format_cpam_text("Un simple paragraphe.") + assert "Premier argument" in result + assert "
  • Deuxième argument
  • " in result + + def test_mixed_text_and_bullets(self): + text = "Introduction\n- Point A\n- Point B\nConclusion" + result = format_cpam_text(text) + assert "Point A" in result + assert "Conclusion" in result + + def test_none_input(self): + result = format_cpam_text(None) + assert result == "" + + def test_empty_input(self): + result = format_cpam_text("") + assert result == "" + + def test_html_escaping(self): + result = format_cpam_text("Test ") + assert "