diff --git a/src/config.py b/src/config.py index 4cca12c..ee865ff 100644 --- a/src/config.py +++ b/src/config.py @@ -39,6 +39,7 @@ OLLAMA_TIMEOUT = 120 RAG_INDEX_DIR = BASE_DIR / "data" / "rag_index" CIM10_DICT_PATH = BASE_DIR / "data" / "cim10_dict.json" +CCAM_DICT_PATH = BASE_DIR / "data" / "ccam_dict.json" CIM10_PDF = Path("/home/dom/ai/aivanov_CIM/cim-10-fr_2026_a_usage_pmsi_version_provisoire_111225.pdf") GUIDE_METHODO_PDF = Path("/home/dom/ai/aivanov_CIM/guide_methodo_mco_2026_version_provisoire.pdf") CCAM_PDF = Path("/home/dom/ai/aivanov_CIM/actualisation_ccam_descriptive_a_usage_pmsi_v4_2025.pdf") diff --git a/src/main.py b/src/main.py index 21eb82e..41ae3b9 100644 --- a/src/main.py +++ b/src/main.py @@ -168,6 +168,18 @@ def main(input_path: str | None = None) -> None: action="store_true", help="Générer le dictionnaire CIM-10 depuis metadata.json et quitter", ) + parser.add_argument( + "--build-ccam-dict", + nargs="?", + const="CCAM_V81.xls", + metavar="PATH", + help="Générer le dictionnaire CCAM depuis un fichier XLS (défaut: CCAM_V81.xls)", + ) + parser.add_argument( + "--rebuild-index", + action="store_true", + help="Forcer la reconstruction de l'index FAISS", + ) args = parser.parse_args() if args.build_dict: @@ -175,6 +187,17 @@ def main(input_path: str | None = None) -> None: build_dict() return + if args.build_ccam_dict: + from .medical.ccam_dict import build_dict as build_ccam + result = build_ccam(args.build_ccam_dict) + logger.info("Dictionnaire CCAM : %d codes générés", len(result)) + return + + if args.rebuild_index: + from .medical.rag_index import build_index + build_index(force=True) + return + if args.no_ner: # Monkey-patch pour désactiver NER from .anonymization import ner_anonymizer diff --git a/src/medical/ccam_dict.py b/src/medical/ccam_dict.py new file mode 100644 index 0000000..0200a20 --- /dev/null +++ b/src/medical/ccam_dict.py @@ -0,0 +1,191 @@ +"""Dictionnaire CCAM complet extrait depuis le fichier XLS officiel (CNAM). 
+ +Fournit un lookup intelligent avec normalisation Unicode pour la recherche +de codes CCAM à partir de textes d'actes médicaux en français. +""" + +from __future__ import annotations + +import json +import logging +import re +import unicodedata +from pathlib import Path +from typing import Optional + +from ..config import CCAM_DICT_PATH + +logger = logging.getLogger(__name__) + +# Singleton : dictionnaire chargé une seule fois +_dict_cache: dict[str, dict] | None = None +# Cache des labels normalisés pour le substring matching +_normalized_cache: list[tuple[str, str, str]] | None = None + +_CCAM_CODE_RE = re.compile(r"^[A-Z]{4}\d{3}$") + + +def normalize_text(text: str) -> str: + """Normalise un texte : accent folding, lowercase, collapse whitespace.""" + text = text.replace("\u2019", "'").replace("\u2018", "'").replace("\u02BC", "'") + nfkd = unicodedata.normalize("NFKD", text) + stripped = "".join(c for c in nfkd if unicodedata.category(c) != "Mn") + return re.sub(r"\s+", " ", stripped.lower()).strip() + + +def build_dict(source_path: str | Path) -> dict[str, dict]: + """Construit le dictionnaire CCAM depuis un fichier XLS et l'écrit en JSON. + + Format JSON : {code: {description, activite, tarif_s1, regroupement}} + + Args: + source_path: Chemin vers le fichier XLS CCAM (ex: CCAM_V81.xls). + + Returns: + Le dictionnaire code → infos. 
+ """ + import xlrd + + source_path = Path(source_path) + if not source_path.exists(): + logger.error("Fichier XLS non trouvé : %s", source_path) + return {} + + wb = xlrd.open_workbook(str(source_path)) + sheet = wb.sheet_by_index(0) + + result: dict[str, dict] = {} + + for r in range(sheet.nrows): + code = str(sheet.cell_value(r, 0)).strip() + if not _CCAM_CODE_RE.match(code): + continue + + description = str(sheet.cell_value(r, 2)).strip() + activite_raw = sheet.cell_value(r, 3) + activite = int(activite_raw) if isinstance(activite_raw, float) else None + + tarif_raw = sheet.cell_value(r, 5) + tarif_s1 = round(tarif_raw, 2) if isinstance(tarif_raw, (int, float)) else None + + regroupement = str(sheet.cell_value(r, 10)).strip() or None + + result[code] = { + "description": description, + "activite": activite, + "tarif_s1": tarif_s1, + "regroupement": regroupement, + } + + # Écrire le fichier JSON + CCAM_DICT_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(CCAM_DICT_PATH, "w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + logger.info("Dictionnaire CCAM généré : %d codes → %s", len(result), CCAM_DICT_PATH) + return result + + +def load_dict() -> dict[str, dict]: + """Charge le dictionnaire CCAM (singleton lazy-loaded). + + Si le fichier JSON n'existe pas, retourne un dict vide avec un warning. 
+ """ + global _dict_cache + if _dict_cache is not None: + return _dict_cache + + if CCAM_DICT_PATH.exists(): + with open(CCAM_DICT_PATH, encoding="utf-8") as f: + _dict_cache = json.load(f) + else: + logger.warning("Dictionnaire CCAM absent : %s — lancez --build-ccam-dict", CCAM_DICT_PATH) + _dict_cache = {} + + return _dict_cache + + +def _get_normalized_entries() -> list[tuple[str, str, str]]: + """Retourne une liste de (code, description, description_normalisée) triée par longueur.""" + global _normalized_cache + if _normalized_cache is not None: + return _normalized_cache + + d = load_dict() + entries = [] + for code, info in d.items(): + desc = info.get("description", "") if isinstance(info, dict) else str(info) + norm = normalize_text(desc) + entries.append((code, desc, norm)) + + # Trier par longueur de description décroissante (plus spécifique d'abord) + entries.sort(key=lambda e: -len(e[2])) + _normalized_cache = entries + return _normalized_cache + + +def lookup( + text: str, + domain_overrides: dict[str, str] | None = None, +) -> str | None: + """Recherche un code CCAM pour un texte donné. + + Stratégie en 3 niveaux : + 1. Match substring dans domain_overrides (prioritaire, ex: CCAM_MAP existant) + 2. Match exact normalisé dans le dictionnaire complet + 3. Match substring normalisé avec scoring par spécificité + + Args: + text: Le texte de l'acte médical à rechercher. + domain_overrides: Dictionnaire terme→code prioritaire. + + Returns: + Le code CCAM trouvé ou None. 
+ """ + if not text: + return None + + text_norm = normalize_text(text) + + # Niveau 1 : domain overrides (substring match) + if domain_overrides: + for terme, code in domain_overrides.items(): + if normalize_text(terme) in text_norm: + return code + + entries = _get_normalized_entries() + + # Niveau 2 : match exact normalisé + for code, _desc, norm_desc in entries: + if norm_desc == text_norm: + return code + + # Niveau 3 : substring match normalisé (plus spécifique d'abord) + for code, _desc, norm_desc in entries: + if not norm_desc or len(norm_desc) < 4: + continue + if norm_desc in text_norm or text_norm in norm_desc: + return code + + return None + + +def validate_code(code: str) -> tuple[bool, str]: + """Vérifie si un code CCAM existe dans le dictionnaire. + + Returns: + (is_valid, description) — description vide si invalide. + """ + d = load_dict() + if code in d: + info = d[code] + desc = info.get("description", "") if isinstance(info, dict) else str(info) + return True, desc + return False, "" + + +def reset_cache() -> None: + """Réinitialise les caches (utile pour les tests).""" + global _dict_cache, _normalized_cache + _dict_cache = None + _normalized_cache = None diff --git a/src/medical/cim10_extractor.py b/src/medical/cim10_extractor.py index f41d012..1932cdb 100644 --- a/src/medical/cim10_extractor.py +++ b/src/medical/cim10_extractor.py @@ -10,6 +10,7 @@ from typing import Optional logger = logging.getLogger(__name__) from .cim10_dict import lookup as dict_lookup, normalize_text +from .ccam_dict import lookup as ccam_lookup, validate_code as ccam_validate from ..config import ( ActeCCAM, BiologieCle, @@ -113,6 +114,9 @@ def extract_medical_info( if use_rag: _enrich_with_rag(dossier) + # Post-processing : validation des codes CCAM contre le dictionnaire + _validate_ccam(dossier) + # Post-processing : exclusions symptôme vs diagnostic précis _apply_exclusion_rules(dossier) @@ -395,6 +399,13 @@ def _extract_actes(text: str, dossier: DossierMedical) -> 
None: date=date, )) + # Fallback : tenter le lookup CCAM dict pour les actes sans code + for acte in dossier.actes_ccam: + if not acte.code_ccam_suggestion: + code = ccam_lookup(acte.texte, domain_overrides=CCAM_MAP) + if code: + acte.code_ccam_suggestion = code + def _extract_antecedents(text: str, dossier: DossierMedical) -> None: """Extrait les antécédents.""" @@ -625,6 +636,22 @@ def _is_negated_by_edsnlp(term: str, negated_terms: set[str]) -> bool: return False +def _validate_ccam(dossier: DossierMedical) -> None: + """Valide les codes CCAM suggérés contre le dictionnaire officiel.""" + for acte in dossier.actes_ccam: + if not acte.code_ccam_suggestion: + acte.validite = "non_verifie" + continue + is_valid, desc = ccam_validate(acte.code_ccam_suggestion) + if is_valid: + acte.validite = "valide" + else: + acte.validite = "non_verifie" + dossier.alertes_codage.append( + f"CCAM {acte.code_ccam_suggestion} ({acte.texte}) : code absent du dictionnaire CCAM V81" + ) + + def _find_act_date(text: str, act_pattern: str) -> str | None: """Trouve la date associée à un acte.""" # Chercher "acte le DD/MM" ou "acte le DD/MM/YYYY" diff --git a/src/medical/rag_index.py b/src/medical/rag_index.py index a59e323..6b7fbd8 100644 --- a/src/medical/rag_index.py +++ b/src/medical/rag_index.py @@ -11,7 +11,7 @@ from typing import Optional import pdfplumber -from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF +from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CCAM_DICT_PATH logger = logging.getLogger(__name__) @@ -33,18 +33,46 @@ class Chunk: # --------------------------------------------------------------------------- def _chunk_cim10(pdf_path: Path) -> list[Chunk]: - """Découpe le PDF CIM-10 en chunks par code 3 caractères (ex: K80, K85).""" + """Découpe le PDF CIM-10 en double chunking : sous-codes individuels + parents 3-char.""" chunks: list[Chunk] = [] - current_code: str | None = None - current_text: list[str] = [] - 
current_page: int | None = None + current_code3: str | None = None + current_code3_text: list[str] = [] + current_code3_page: int | None = None + + # Sous-codes en cours d'accumulation + current_subcode: str | None = None + current_subcode_text: list[str] = [] + current_subcode_page: int | None = None - # Pattern pour détecter un code CIM-10 à 3 caractères en début de ligne code3_pattern = re.compile(r"^([A-Z]\d{2})\s+(.+)") - # Pattern pour les sous-codes (ex: K80.0, K80.1) subcode_pattern = re.compile(r"^([A-Z]\d{2}\.\d+)\s+(.+)") - logger.info("Extraction des chunks CIM-10 depuis %s", pdf_path.name) + logger.info("Extraction des chunks CIM-10 (double chunking) depuis %s", pdf_path.name) + + def _flush_subcode(): + """Sauvegarde le chunk sous-code en cours.""" + if current_subcode and current_subcode_text: + chunk_text = "\n".join(current_subcode_text) + if len(chunk_text.split()) >= 3: + chunks.append(Chunk( + text=chunk_text, + document="cim10", + page=current_subcode_page, + code=current_subcode, + )) + + def _flush_code3(): + """Sauvegarde le chunk parent 3-char en cours.""" + _flush_subcode() + if current_code3 and current_code3_text: + chunk_text = "\n".join(current_code3_text) + if len(chunk_text.split()) >= 5: + chunks.append(Chunk( + text=chunk_text, + document="cim10", + page=current_code3_page, + code=current_code3, + )) with pdfplumber.open(pdf_path) as pdf: for page_num, page in enumerate(pdf.pages, start=1): @@ -57,37 +85,38 @@ def _chunk_cim10(pdf_path: Path) -> list[Chunk]: if not line: continue - m = code3_pattern.match(line) - if m and not subcode_pattern.match(line): - # Nouveau code 3-char → sauvegarder le chunk précédent - if current_code and current_text: - chunk_text = "\n".join(current_text) - if len(chunk_text.split()) >= 5: - chunks.append(Chunk( - text=chunk_text, - document="cim10", - page=current_page, - code=current_code, - )) - current_code = m.group(1) - current_text = [line] - current_page = page_num + m_sub = 
subcode_pattern.match(line) + m3 = code3_pattern.match(line) + + if m_sub: + # Nouveau sous-code → flush le sous-code précédent + _flush_subcode() + current_subcode = m_sub.group(1) + current_subcode_text = [line] + current_subcode_page = page_num + # Ajouter aussi au chunk parent + if current_code3: + current_code3_text.append(line) + elif m3 and not m_sub: + # Nouveau code 3-char → flush tout le bloc précédent + _flush_code3() + current_code3 = m3.group(1) + current_code3_text = [line] + current_code3_page = page_num + current_subcode = None + current_subcode_text = [] + current_subcode_page = None else: - if current_code: - current_text.append(line) + # Ligne de continuation + if current_subcode: + current_subcode_text.append(line) + if current_code3: + current_code3_text.append(line) - # Dernier chunk - if current_code and current_text: - chunk_text = "\n".join(current_text) - if len(chunk_text.split()) >= 5: - chunks.append(Chunk( - text=chunk_text, - document="cim10", - page=current_page, - code=current_code, - )) + # Flush final + _flush_code3() - logger.info("CIM-10 : %d chunks extraits", len(chunks)) + logger.info("CIM-10 : %d chunks extraits (double chunking sous-codes + parents)", len(chunks)) return chunks @@ -253,6 +282,95 @@ def _chunk_ccam(pdf_path: Path) -> list[Chunk]: return chunks +# --------------------------------------------------------------------------- +# Chunking CCAM depuis le dictionnaire JSON +# --------------------------------------------------------------------------- + +def _chunk_ccam_from_dict() -> list[Chunk]: + """Génère des chunks CCAM depuis ccam_dict.json (un chunk par code+description). + + Prioritaire sur les chunks PDF si le dictionnaire existe. 
+ """ + if not CCAM_DICT_PATH.exists(): + return [] + + import json as _json + with open(CCAM_DICT_PATH, encoding="utf-8") as f: + ccam_dict = _json.load(f) + + chunks: list[Chunk] = [] + for code, info in ccam_dict.items(): + desc = info.get("description", "") if isinstance(info, dict) else str(info) + if not desc: + continue + regroupement = info.get("regroupement", "") if isinstance(info, dict) else "" + tarif = info.get("tarif_s1") if isinstance(info, dict) else None + text_parts = [f"{code} {desc}"] + if regroupement: + text_parts.append(f"Regroupement: {regroupement}") + if tarif is not None: + text_parts.append(f"Tarif S1: {tarif}€") + chunks.append(Chunk( + text="\n".join(text_parts), + document="ccam", + code=code, + )) + + logger.info("CCAM dict : %d chunks générés depuis %s", len(chunks), CCAM_DICT_PATH) + return chunks + + +# --------------------------------------------------------------------------- +# Chunking CIM-10 Index Alphabétique +# --------------------------------------------------------------------------- + +def _chunk_cim10_alpha(pdf_path: Path) -> list[Chunk]: + """Parse la section INDEX ALPHABÉTIQUE du PDF CIM-10. + + Détecte les entrées de type "terme → code" et génère des chunks + avec document="cim10_alpha". 
+ """ + chunks: list[Chunk] = [] + # Pattern : ligne avec un terme suivi d'un code CIM-10 en fin de ligne + entry_pattern = re.compile(r"^(.+?)\s+([A-Z]\d{2}(?:\.\d+)?)\s*$") + + logger.info("Extraction de l'index alphabétique CIM-10 depuis %s", pdf_path.name) + + in_alpha_section = False + with pdfplumber.open(pdf_path) as pdf: + for page_num, page in enumerate(pdf.pages, start=1): + text = page.extract_text() + if not text: + continue + + # Détecter le début de la section index alphabétique + text_upper = text.upper() + if "INDEX ALPHAB" in text_upper: + in_alpha_section = True + # Certaines pages avant l'index : ne pas parser + if not in_alpha_section: + continue + + for line in text.split("\n"): + line = line.strip() + if not line: + continue + m = entry_pattern.match(line) + if m: + terme = m.group(1).strip() + code = m.group(2) + if len(terme) >= 3: + chunks.append(Chunk( + text=f"{terme} → {code}", + document="cim10_alpha", + page=page_num, + code=code, + )) + + logger.info("CIM-10 index alphabétique : %d entrées extraites", len(chunks)) + return chunks + + # --------------------------------------------------------------------------- # Construction de l'index FAISS # --------------------------------------------------------------------------- @@ -280,13 +398,25 @@ def build_index(force: bool = False) -> None: for pdf_path, chunk_fn in [ (CIM10_PDF, _chunk_cim10), (GUIDE_METHODO_PDF, _chunk_guide_methodo), - (CCAM_PDF, _chunk_ccam), ]: if pdf_path.exists(): all_chunks.extend(chunk_fn(pdf_path)) else: logger.warning("PDF non trouvé : %s", pdf_path) + # CCAM : priorité au dictionnaire JSON sur le PDF + ccam_dict_chunks = _chunk_ccam_from_dict() + if ccam_dict_chunks: + all_chunks.extend(ccam_dict_chunks) + elif CCAM_PDF.exists(): + all_chunks.extend(_chunk_ccam(CCAM_PDF)) + else: + logger.warning("Ni dictionnaire CCAM ni PDF CCAM trouvé") + + # CIM-10 index alphabétique (source additionnelle) + if CIM10_PDF.exists(): + 
all_chunks.extend(_chunk_cim10_alpha(CIM10_PDF)) + if not all_chunks: logger.error("Aucun chunk extrait — vérifiez les chemins des PDFs") return @@ -316,9 +446,9 @@ def build_index(force: bool = False) -> None: metadata = [asdict(c) for c in all_chunks] # Ne pas sauvegarder le texte complet dans metadata (trop lourd), - # garder un extrait de 500 chars + # garder un extrait de 800 chars (les sous-codes sont courts, besoin du contexte) for m in metadata: - m["extrait"] = m.pop("text")[:500] + m["extrait"] = m.pop("text")[:800] meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8") diff --git a/src/medical/rag_search.py b/src/medical/rag_search.py index b028fba..1b42acc 100644 --- a/src/medical/rag_search.py +++ b/src/medical/rag_search.py @@ -74,8 +74,8 @@ def search_similar(query: str, top_k: int = 10) -> list[dict]: raw_results.append(meta) # Prioriser les sources CIM-10 (au moins 6 sur top_k) - cim10_results = [r for r in raw_results if r["document"] == "cim10"] - other_results = [r for r in raw_results if r["document"] != "cim10"] + cim10_results = [r for r in raw_results if r["document"] in ("cim10", "cim10_alpha")] + other_results = [r for r in raw_results if r["document"] not in ("cim10", "cim10_alpha")] min_cim10 = min(6, len(cim10_results)) final = cim10_results[:min_cim10] @@ -150,6 +150,7 @@ def _build_prompt(texte: str, sources: list[dict], contexte: dict, est_dp: bool for i, src in enumerate(sources, 1): doc_name = { "cim10": "CIM-10 FR 2026", + "cim10_alpha": "CIM-10 Index Alphabétique 2026", "guide_methodo": "Guide Méthodologique MCO 2026", "ccam": "CCAM PMSI V4 2025", }.get(src["document"], src["document"]) diff --git a/src/viewer/app.py b/src/viewer/app.py index 5ec0c7d..e1f6aa2 100644 --- a/src/viewer/app.py +++ b/src/viewer/app.py @@ -147,4 +147,37 @@ def create_app() -> Flask: logger.info("Modèle Ollama changé : %s", new_model) return jsonify({"ok": True, "model": cfg.OLLAMA_MODEL}) + @app.route("/reprocess/", 
methods=["POST"]) + def reprocess(filepath: str): + """Relance le traitement d'un dossier.""" + from ..main import process_pdf, write_outputs + + dossier = load_dossier(filepath) + source_file = dossier.source_file + if not source_file: + return jsonify({"error": "Fichier source introuvable"}), 400 + + # Chercher le PDF source dans input/ + input_dir = Path(__file__).parent.parent.parent / "input" + pdf_path = None + for p in input_dir.rglob(source_file): + if p.is_file(): + pdf_path = p + break + + if not pdf_path: + return jsonify({"error": f"PDF source '{source_file}' introuvable"}), 404 + + try: + anonymized_text, new_dossier, report = process_pdf(pdf_path) + stem = pdf_path.stem.replace(" ", "_") + subdir = None + if pdf_path.parent != input_dir: + subdir = pdf_path.parent.name + write_outputs(stem, anonymized_text, new_dossier, report, subdir=subdir) + return jsonify({"ok": True, "message": "Traitement terminé"}) + except Exception as e: + logger.exception("Erreur lors du retraitement") + return jsonify({"error": str(e)}), 500 + return app diff --git a/src/viewer/templates/base.html b/src/viewer/templates/base.html index 7f1db19..b40dd0b 100644 --- a/src/viewer/templates/base.html +++ b/src/viewer/templates/base.html @@ -253,6 +253,7 @@ loadModels(); })(); +{% block scripts %}{% endblock %} diff --git a/tests/test_ccam_dict.py b/tests/test_ccam_dict.py new file mode 100644 index 0000000..09a1d65 --- /dev/null +++ b/tests/test_ccam_dict.py @@ -0,0 +1,113 @@ +"""Tests pour le dictionnaire CCAM (build, load, lookup, validate).""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +from src.medical.ccam_dict import ( + build_dict, + load_dict, + lookup, + normalize_text, + reset_cache, + validate_code, +) + +# Chemin vers le XLS de test (dans le repo) +CCAM_XLS = Path(__file__).resolve().parent.parent / "CCAM_V81.xls" + + +@pytest.fixture(autouse=True) +def _clear_cache(): + """Réinitialise le 
cache avant chaque test.""" + reset_cache() + yield + reset_cache() + + +@pytest.mark.skipif(not CCAM_XLS.exists(), reason="CCAM_V81.xls non trouvé") +class TestBuildDict: + def test_build_dict_from_xls(self, tmp_path): + """Parsing du XLS → nombre de codes >= 8000.""" + out = tmp_path / "ccam_dict.json" + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + result = build_dict(CCAM_XLS) + assert len(result) >= 8000, f"Seulement {len(result)} codes extraits" + + def test_known_codes_present(self, tmp_path): + """HMFC004 (cholécystectomie) et ZCQK002 (radio abdo) doivent être présents.""" + out = tmp_path / "ccam_dict.json" + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + result = build_dict(CCAM_XLS) + assert "HMFC004" in result, "HMFC004 (cholécystectomie) absent" + assert "ZCQK002" in result, "ZCQK002 (radio abdomen) absent" + assert "cholécystectomie" in result["HMFC004"]["description"].lower() + + +@pytest.mark.skipif(not CCAM_XLS.exists(), reason="CCAM_V81.xls non trouvé") +class TestLoadDict: + def test_load_dict_singleton(self, tmp_path): + """Chargement lazy + cache (le 2e appel retourne le même objet).""" + out = tmp_path / "ccam_dict.json" + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + build_dict(CCAM_XLS) + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + d1 = load_dict() + d2 = load_dict() + assert d1 is d2, "Le cache singleton ne fonctionne pas" + assert len(d1) >= 8000 + + +@pytest.mark.skipif(not CCAM_XLS.exists(), reason="CCAM_V81.xls non trouvé") +class TestLookup: + @pytest.fixture(autouse=True) + def _build(self, tmp_path): + out = tmp_path / "ccam_dict.json" + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + build_dict(CCAM_XLS) + # Charger dans le cache + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + load_dict() + + def test_lookup_exact(self): + """Lookup 'cholécystectomie' → doit trouver un code contenant ce terme.""" + code = lookup("Cholécystectomie, par cœlioscopie") + assert 
code == "HMFC004", f"Attendu HMFC004, obtenu {code}" + + def test_lookup_substring(self): + """Lookup 'cholécystectomie par cœlioscopie' → HMFC004.""" + code = lookup("cholécystectomie") + assert code is not None + # Doit matcher un code contenant "cholécystectomie" + assert code == "HMFC004" or code is not None + + def test_lookup_unknown(self): + """Un texte totalement hors domaine retourne None.""" + code = lookup("xyz totalement inconnu blabla") + assert code is None + + +@pytest.mark.skipif(not CCAM_XLS.exists(), reason="CCAM_V81.xls non trouvé") +class TestValidateCode: + @pytest.fixture(autouse=True) + def _build(self, tmp_path): + out = tmp_path / "ccam_dict.json" + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + build_dict(CCAM_XLS) + with patch("src.medical.ccam_dict.CCAM_DICT_PATH", out): + load_dict() + + def test_validate_code_known(self): + """HMFC004 → valide.""" + is_valid, desc = validate_code("HMFC004") + assert is_valid is True + assert "cholécystectomie" in desc.lower() + + def test_validate_code_unknown(self): + """XXXXX99 → invalide.""" + is_valid, desc = validate_code("XXXXX99") + assert is_valid is False + assert desc == "" diff --git a/tests/test_rag.py b/tests/test_rag.py index 1b9fc0c..9339783 100644 --- a/tests/test_rag.py +++ b/tests/test_rag.py @@ -44,6 +44,7 @@ class TestDiagnosticExtended: assert d.cim10_suggestion == "K85.9" assert d.cim10_confidence is None assert d.justification is None + assert d.raisonnement is None assert d.sources_rag == [] def test_with_rag_fields(self): @@ -52,12 +53,15 @@ class TestDiagnosticExtended: cim10_suggestion="K80.5", cim10_confidence="high", justification="Code K80.5 correspond à la lithiase du cholédoque", + raisonnement="1. 
ANALYSE CLINIQUE : La lithiase cholédoque est...", sources_rag=[ RAGSource(document="cim10", page=480, code="K80"), ], ) assert d.cim10_confidence == "high" assert d.justification is not None + assert d.raisonnement is not None + assert d.raisonnement.startswith("1. ANALYSE CLINIQUE") assert len(d.sources_rag) == 1 assert d.sources_rag[0].code == "K80" @@ -67,6 +71,7 @@ class TestDiagnosticExtended: data = d.model_dump(exclude_none=True) assert "cim10_confidence" not in data assert "justification" not in data + assert "raisonnement" not in data assert "sources_rag" in data # list vide incluse def test_dossier_with_extended_diagnostic(self): @@ -77,6 +82,7 @@ class TestDiagnosticExtended: cim10_suggestion="K85.1", cim10_confidence="high", justification="Confirmé par CIM-10 FR 2026", + raisonnement="Le DP K85.1 est le code le plus spécifique...", sources_rag=[ RAGSource(document="cim10", page=496, code="K85"), RAGSource(document="guide_methodo", page=30), @@ -84,6 +90,7 @@ class TestDiagnosticExtended: ), ) assert dossier.diagnostic_principal.cim10_confidence == "high" + assert dossier.diagnostic_principal.raisonnement is not None assert len(dossier.diagnostic_principal.sources_rag) == 2 @@ -152,10 +159,32 @@ class TestChunkingCIM10: assert len(chunks) > 100, f"Trop peu de chunks : {len(chunks)}" codes = {c.code for c in chunks if c.code} + # Codes parents 3-char assert "K85" in codes, "K85 (pancréatite) non trouvé" assert "K80" in codes, "K80 (lithiase biliaire) non trouvé" assert "E66" in codes, "E66 (obésité) non trouvé" + @pytest.mark.skipif( + not CIM10_PDF.exists(), + reason=f"PDF CIM-10 non trouvé : {CIM10_PDF}", + ) + def test_double_chunking_subcodes(self): + """Le double chunking produit des chunks sous-codes (X99.9) en plus des parents.""" + from src.medical.rag_index import _chunk_cim10 + + chunks = _chunk_cim10(CIM10_PDF) + codes = {c.code for c in chunks if c.code} + + # Il doit y avoir des sous-codes (avec un point) + subcodes = {c for c in codes if 
"." in c} + assert len(subcodes) > 100, f"Trop peu de sous-codes : {len(subcodes)}" + + # Le nombre total de chunks doit être significativement plus grand + # qu'un chunking simple par code 3-char + parent_codes = {c for c in codes if "." not in c} + assert len(chunks) > len(parent_codes) * 2, \ + f"Double chunking inefficace : {len(chunks)} chunks pour {len(parent_codes)} codes parents" + @pytest.mark.skipif( not CIM10_PDF.exists(), reason=f"PDF CIM-10 non trouvé : {CIM10_PDF}", @@ -164,9 +193,10 @@ class TestChunkingCIM10: from src.medical.rag_index import _chunk_cim10 chunks = _chunk_cim10(CIM10_PDF) - k85_chunks = [c for c in chunks if c.code == "K85"] - assert len(k85_chunks) >= 1 - assert "pancréatite" in k85_chunks[0].text.lower() or "pancreatite" in k85_chunks[0].text.lower() + k85_chunks = [c for c in chunks if c.code and c.code.startswith("K85")] + assert len(k85_chunks) >= 2, "Il devrait y avoir au moins un chunk parent K85 + des sous-codes" + texts_lower = " ".join(c.text.lower() for c in k85_chunks) + assert "pancréatite" in texts_lower or "pancreatite" in texts_lower class TestChunkingGuideMethodo: @@ -195,6 +225,183 @@ class TestChunkingCCAM: assert all(c.document == "ccam" for c in chunks) +class TestParseOllamaResponse: + """Tests pour _parse_ollama_response avec le marqueur ###RESULT###.""" + + def test_parse_with_marker(self): + from src.medical.rag_search import _parse_ollama_response + + raw = """1. ANALYSE CLINIQUE : La pancréatite aiguë biliaire est une inflammation... +2. CODES CANDIDATS : K85.0, K85.1, K85.9 +3. DISCRIMINATION : K85.1 est spécifique à l'origine biliaire +4. 
RÈGLE PMSI : Conforme pour un DP + +###RESULT### +{"code": "K85.1", "confidence": "high", "justification": "Pancréatite aiguë d'origine biliaire"}""" + + result = _parse_ollama_response(raw) + assert result is not None + assert result["code"] == "K85.1" + assert result["confidence"] == "high" + assert result["justification"] == "Pancréatite aiguë d'origine biliaire" + assert "raisonnement" in result + assert "ANALYSE CLINIQUE" in result["raisonnement"] + + def test_parse_without_marker_fallback(self): + """Fallback sur la recherche d'accolades quand le marqueur est absent.""" + from src.medical.rag_search import _parse_ollama_response + + raw = """Voici mon analyse... +{"code": "E66.0", "confidence": "medium", "justification": "Obésité due à un excès calorique"}""" + + result = _parse_ollama_response(raw) + assert result is not None + assert result["code"] == "E66.0" + assert result["confidence"] == "medium" + + def test_parse_empty_response(self): + from src.medical.rag_search import _parse_ollama_response + + result = _parse_ollama_response("") + assert result is None + + def test_parse_no_json(self): + from src.medical.rag_search import _parse_ollama_response + + result = _parse_ollama_response("Réponse sans aucun JSON valide.") + assert result is None + + def test_parse_invalid_json(self): + from src.medical.rag_search import _parse_ollama_response + + raw = """###RESULT### +{code: K85.1, invalid json}""" + result = _parse_ollama_response(raw) + assert result is None + + def test_parse_marker_with_raisonnement_containing_braces(self): + """Le raisonnement peut contenir des accolades (ex: listes, exemples).""" + from src.medical.rag_search import _parse_ollama_response + + raw = """Le code {K85} est un code parent. 
+Sous-codes : {K85.0, K85.1, K85.2, K85.3}
+
+###RESULT###
+{"code": "K85.1", "confidence": "high", "justification": "Biliaire confirmé"}"""
+
+    result = _parse_ollama_response(raw)
+    assert result is not None
+    assert result["code"] == "K85.1"
+    assert "raisonnement" in result
+    assert "{K85}" in result["raisonnement"]
+
+
+class TestBuildPrompt:
+    """Tests for the new _build_prompt with structured reasoning."""
+
+    def test_prompt_contains_diagnostic(self):
+        from src.medical.rag_search import _build_prompt
+
+        sources = [{"document": "cim10", "code": "K85", "page": 1, "extrait": "K85 Pancréatite"}]
+        contexte = {"sexe": "F", "age": 43}
+        prompt = _build_prompt("Pancréatite aiguë biliaire", sources, contexte, est_dp=True)
+
+        assert "Pancréatite aiguë biliaire" in prompt
+        assert "DP (diagnostic principal)" in prompt
+        assert "ANALYSE CLINIQUE" in prompt
+        assert "###RESULT###" in prompt
+
+    def test_prompt_das_type(self):
+        from src.medical.rag_search import _build_prompt
+
+        sources = [{"document": "cim10", "code": "E66", "page": 1, "extrait": "E66 Obésité"}]
+        contexte = {"sexe": "F", "age": 43}
+        prompt = _build_prompt("Obésité", sources, contexte, est_dp=False)
+
+        assert "DAS (diagnostic associé significatif)" in prompt
+
+    def test_prompt_enriched_context(self):
+        from src.medical.rag_search import _build_prompt
+
+        sources = [{"document": "cim10", "code": "K85", "page": 1, "extrait": "K85"}]
+        contexte = {
+            "sexe": "F",
+            "age": 43,
+            "imc": 34.4,
+            "duree_sejour": 6,
+            "antecedents": ["HTA", "diabète type 2"],
+            "biologie_cle": [("Lipasémie", "850", True), ("CRP", "45", True)],
+            "imagerie": [("TDM abdominal", "pancréatite stade C Balthazar")],
+            "complications": ["éruption cutanée"],
+            "dp_texte": "Pancréatite aiguë biliaire",
+        }
+        prompt = _build_prompt("Éruption cutanée", sources, contexte, est_dp=False)
+
+        assert "IMC 34.4" in prompt
+        assert "6 jours" in prompt
+        assert "HTA" in prompt
+        assert "Lipasémie" in prompt
+        assert "TDM abdominal" in prompt
+        assert "éruption cutanée" in prompt
+        assert "Pancréatite aiguë biliaire" in prompt
+
+
+class TestSearchSimilar:
+    """Tests for search_similar with minimum score and CIM-10 prioritisation."""
+
+    def test_filters_low_scores(self):
+        """Results scoring below 0.3 are dropped."""
+        from src.medical.rag_search import search_similar
+        import numpy as np
+
+        mock_metadata = [
+            {"document": "cim10", "code": "K85", "page": 1, "extrait": "K85"},
+            {"document": "cim10", "code": "K86", "page": 2, "extrait": "K86"},
+        ]
+
+        mock_index = MagicMock()
+        mock_index.ntotal = 2
+        # First hit scores 0.9 (kept), second scores 0.1 (below the threshold)
+        mock_index.search.return_value = (
+            np.array([[0.9, 0.1]], dtype=np.float32),
+            np.array([[0, 1]], dtype=np.int64),
+        )
+
+        with patch("src.medical.rag_index.get_index", return_value=(mock_index, mock_metadata)), \
+             patch("src.medical.rag_search._get_embed_model") as mock_model:
+            mock_model.return_value.encode.return_value = np.array([[0.1] * 768], dtype=np.float32)
+            results = search_similar("pancréatite")
+
+        assert len(results) == 1
+        assert results[0]["code"] == "K85"
+
+    def test_prioritizes_cim10(self):
+        """CIM-10 sources are prioritised (at least 6 out of 10)."""
+        from src.medical.rag_search import search_similar
+        import numpy as np
+
+        # 8 CIM-10 sources + 8 guide_methodo sources, all with good scores
+        mock_metadata = []
+        for i in range(8):
+            mock_metadata.append({"document": "cim10", "code": f"K8{i}", "page": i, "extrait": f"K8{i}"})
+        for i in range(8):
+            mock_metadata.append({"document": "guide_methodo", "page": i + 10, "extrait": f"Guide {i}"})
+
+        mock_index = MagicMock()
+        mock_index.ntotal = 16
+        scores = np.array([[0.9 - i * 0.03 for i in range(16)]], dtype=np.float32)
+        indices = np.array([list(range(16))], dtype=np.int64)
+        mock_index.search.return_value = (scores, indices)
+
+        with patch("src.medical.rag_index.get_index", return_value=(mock_index, mock_metadata)), \
+             patch("src.medical.rag_search._get_embed_model") as mock_model:
+            mock_model.return_value.encode.return_value = np.array([[0.1] * 768], dtype=np.float32)
+            results = search_similar("pancréatite", top_k=10)
+
+        cim10_count = sum(1 for r in results if r["document"] == "cim10")
+        assert cim10_count >= 6, f"Seulement {cim10_count} sources CIM-10 sur {len(results)}"
+
+
 class TestRAGSearchMocked:
     def test_search_similar_no_index(self):
         """search_similar retourne une liste vide si l'index n'existe pas."""
@@ -215,6 +422,7 @@ class TestRAGSearchMocked:
 
         assert diag.sources_rag == []
         assert diag.justification is None
+        assert diag.raisonnement is None
 
     def test_enrich_diagnostic_with_sources_no_ollama(self):
         """Enrichissement avec sources FAISS mais sans Ollama."""
@@ -238,11 +446,11 @@ class TestRAGSearchMocked:
         assert len(diag.sources_rag) == 1
         assert diag.sources_rag[0].document == "cim10"
         assert diag.sources_rag[0].code == "K85"
-        # Pas de justification (Ollama non disponible)
         assert diag.justification is None
+        assert diag.raisonnement is None
 
     def test_enrich_diagnostic_with_ollama(self):
-        """Enrichissement complet avec sources + Ollama."""
+        """Full enrichment with sources + Ollama + reasoning."""
         from src.medical.rag_search import enrich_diagnostic
 
         diag = Diagnostic(texte="Pancréatite aiguë biliaire")
@@ -259,6 +467,7 @@ class TestRAGSearchMocked:
             "code": "K85.1",
             "confidence": "high",
             "justification": "Pancréatite aiguë d'origine biliaire = K85.1",
+            "raisonnement": "1. ANALYSE CLINIQUE : La pancréatite...",
         }
 
         with patch("src.medical.rag_search.search_similar", return_value=mock_sources), \
@@ -268,4 +477,122 @@ class TestRAGSearchMocked:
 
         assert diag.cim10_suggestion == "K85.1"
         assert diag.cim10_confidence == "high"
         assert diag.justification == "Pancréatite aiguë d'origine biliaire = K85.1"
+        assert diag.raisonnement == "1. ANALYSE CLINIQUE : La pancréatite..."
         assert len(diag.sources_rag) == 1
+
+    def test_enrich_diagnostic_est_dp_flag(self):
+        """The est_dp flag is forwarded to _build_prompt."""
+        from src.medical.rag_search import enrich_diagnostic
+
+        diag = Diagnostic(texte="Obésité")
+        mock_sources = [
+            {"document": "cim10", "page": 1, "code": "E66", "extrait": "E66 Obésité", "score": 0.9},
+        ]
+
+        with patch("src.medical.rag_search.search_similar", return_value=mock_sources), \
+             patch("src.medical.rag_search._call_ollama", return_value=None) as mock_ollama, \
+             patch("src.medical.rag_search._build_prompt", return_value="prompt") as mock_prompt:
+            enrich_diagnostic(diag, {"sexe": "F", "age": 43}, est_dp=False)
+            mock_prompt.assert_called_once_with("Obésité", mock_sources, {"sexe": "F", "age": 43}, est_dp=False)
+
+
+class TestEnrichDossier:
+    """Tests for enrich_dossier with the enriched context."""
+
+    def test_enriched_context(self):
+        """enrich_dossier passes the enriched context (labs, imaging, etc.)."""
+        from src.medical.rag_search import enrich_dossier
+        from src.config import Sejour, BiologieCle, Imagerie
+
+        dossier = DossierMedical(
+            sejour=Sejour(sexe="F", age=43, duree_sejour=6, imc=34.4),
+            diagnostic_principal=Diagnostic(texte="Pancréatite aiguë biliaire"),
+            antecedents=["HTA", "diabète type 2"],
+            biologie_cle=[
+                BiologieCle(test="Lipasémie", valeur="850", anomalie=True),
+            ],
+            imagerie=[
+                Imagerie(type="TDM abdominal", conclusion="pancréatite stade C"),
+            ],
+            complications=["éruption cutanée"],
+        )
+
+        captured_contexts = []
+
+        def mock_enrich(diag, contexte, est_dp=True):
+            captured_contexts.append(contexte.copy())
+
+        with patch("src.medical.rag_search.enrich_diagnostic", side_effect=mock_enrich):
+            enrich_dossier(dossier)
+
+        assert len(captured_contexts) == 1  # DP only (no DAS)
+        ctx = captured_contexts[0]
+        assert ctx["sexe"] == "F"
+        assert ctx["age"] == 43
+        assert ctx["duree_sejour"] == 6
+        assert ctx["imc"] == 34.4
+        assert ctx["antecedents"] == ["HTA", "diabète type 2"]
+        assert ctx["biologie_cle"] == [("Lipasémie", "850", True)]
+        assert ctx["imagerie"] == [("TDM abdominal", "pancréatite stade C")]
+        assert ctx["complications"] == ["éruption cutanée"]
+
+    def test_das_gets_dp_context(self):
+        """DAS entries receive the DP text in their context."""
+        from src.medical.rag_search import enrich_dossier
+
+        dossier = DossierMedical(
+            diagnostic_principal=Diagnostic(texte="Pancréatite aiguë biliaire"),
+            diagnostics_associes=[
+                Diagnostic(texte="Obésité"),
+            ],
+        )
+
+        captured = []
+
+        def mock_enrich(diag, contexte, est_dp=True):
+            captured.append({"texte": diag.texte, "est_dp": est_dp, "dp_texte": contexte.get("dp_texte")})
+
+        with patch("src.medical.rag_search.enrich_diagnostic", side_effect=mock_enrich):
+            enrich_dossier(dossier)
+
+        assert len(captured) == 2
+        # The DP does not get dp_texte in its own context
+        assert captured[0]["est_dp"] is True
+        assert captured[0]["dp_texte"] is None
+        # The DAS does get dp_texte
+        assert captured[1]["est_dp"] is False
+        assert captured[1]["dp_texte"] == "Pancréatite aiguë biliaire"
+
+
+class TestFormatContexte:
+    """Tests for _format_contexte."""
+
+    def test_minimal_context(self):
+        from src.medical.rag_search import _format_contexte
+
+        result = _format_contexte({})
+        assert result == "Non précisé"
+
+    def test_full_context(self):
+        from src.medical.rag_search import _format_contexte
+
+        ctx = {
+            "sexe": "F",
+            "age": 43,
+            "imc": 34.4,
+            "duree_sejour": 6,
+            "antecedents": ["HTA", "diabète type 2"],
+            "biologie_cle": [("Lipasémie", "850", True), ("CRP", "45", True)],
+            "imagerie": [("TDM abdominal", "pancréatite stade C Balthazar")],
+            "complications": ["éruption cutanée"],
+            "dp_texte": "Pancréatite aiguë biliaire",
+        }
+        result = _format_contexte(ctx)
+
+        assert "F, 43 ans, IMC 34.4" in result
+        assert "6 jours" in result
+        assert "HTA" in result
+        assert "Lipasémie 850" in result
+        assert "TDM abdominal" in result
+        assert "éruption cutanée" in result
+        assert "Pancréatite aiguë biliaire" in result