"""Smoke tests end-to-end sur PDFs CRH réels. Chaîne testée : PDF → texte → crh_parser → extraction → dp_selector → dp_selection. Les tests skip proprement si les PDFs ne sont pas présents localement. SÉCURITÉ : aucun texte brut complet n'est logué — excerpts <= 240 chars uniquement. """ from __future__ import annotations from pathlib import Path import pytest # --------------------------------------------------------------------------- # Chemins PDF réels (non versionnés, gitignored) # --------------------------------------------------------------------------- REAL_CRH_DIR = Path(__file__).resolve().parent.parent / "real_crh_pdfs" PDF_23066188 = REAL_CRH_DIR / "23066188.pdf" PDF_23080179 = REAL_CRH_DIR / "23080179.pdf" needs_pdf_23066188 = pytest.mark.skipif( not PDF_23066188.exists(), reason=f"PDF réel absent : {PDF_23066188}", ) needs_pdf_23080179 = pytest.mark.skipif( not PDF_23080179.exists(), reason=f"PDF réel absent : {PDF_23080179}", ) # --------------------------------------------------------------------------- # Helper : pipeline minimale offline (pas de LLM) # --------------------------------------------------------------------------- def _run_offline_pipeline(pdf_path: Path) -> tuple: """Exécute la chaîne CRH complète sans Ollama. Returns (raw_text, parsed, dossier) — excerpts seulement pour asserts. """ from src.extraction.pdf_extractor import extract_text from src.extraction.document_classifier import classify from src.extraction.crh_parser import parse_crh from src.anonymization.anonymizer import Anonymizer from src.medical.cim10_extractor import extract_medical_info raw_text = extract_text(pdf_path) assert len(raw_text) > 200, "PDF trop court ou vide" doc_type = classify(raw_text) assert doc_type == "crh", f"Attendu crh, obtenu {doc_type}" parsed = parse_crh(raw_text) anonymizer = Anonymizer(parsed_data=parsed) anon_text = anonymizer.anonymize(raw_text) # edsnlp optionnel edsnlp_result = None try: from src.medical.edsnlp_pipeline import analyze, is_available if is_available(): edsnlp_result = analyze(anon_text) except Exception: pass dossier = extract_medical_info( parsed_data=parsed, anonymized_text=anon_text, edsnlp_result=edsnlp_result, use_rag=False, raw_text=raw_text, ) return raw_text, parsed, dossier # =================================================================== # Test 1 : Case 23066188 — Méningite à entérovirus (DP clair) # =================================================================== @needs_pdf_23066188 class TestCase23066188: """CRH pédiatrie — méningite à entérovirus, DP sans ambiguïté.""" @pytest.fixture(autouse=True) def setup(self): self.raw_text, self.parsed, self.dossier = _run_offline_pipeline(PDF_23066188) def test_document_classified_crh(self): assert self.dossier.document_type == "crh" def test_sections_contain_meningite(self): """Le parser CRH doit trouver au moins une section mentionnant méningite.""" sections = self.parsed.get("sections", {}) contenu = self.parsed.get("contenu_medical", "") all_text = " ".join(sections.values()) + " " + contenu low = all_text.lower() assert "méningite" in low or "meningite" in low, ( f"'méningite' introuvable dans sections/contenu (excerpt: {all_text[:240]})" ) def test_sections_contain_enterovirus(self): """Entérovirus doit apparaître dans le contenu médical.""" sections = self.parsed.get("sections", {}) contenu = self.parsed.get("contenu_medical", "") all_text = " ".join(sections.values()) + " " + contenu low = all_text.lower() assert "entérovirus" in low or "enterovirus" in low, ( f"'entérovirus' introuvable (excerpt: {all_text[:240]})" ) def test_pool_contains_a87(self): """Le pool de candidats doit contenir un code famille A87.""" sel = self.dossier.dp_selection assert sel is not None, "dp_selection est None" codes = [c.code or "" for c in sel.candidates] has_a87 = any(c.startswith("A87") for c in codes) # Fallback tolérant : vérifier aussi le DP extrait dp = self.dossier.diagnostic_principal dp_code = dp.cim10_suggestion if dp else "" assert has_a87 or (dp_code or "").startswith("A87"), ( f"Aucun candidat A87.* — codes trouvés: {codes}, DP: {dp_code}" ) def test_dp_code_family_a87(self): """Le DP choisi doit être dans la famille A87.""" sel = self.dossier.dp_selection assert sel is not None chosen = sel.chosen_code or "" # Tolérant : accepter family3 match assert chosen.startswith("A87") or chosen.startswith("A87"), ( f"DP choisi = {chosen}, attendu A87.*" ) def test_evidence_if_confirmed(self): """Si verdict CONFIRMED, evidence ne doit pas être vide (règle A1).""" sel = self.dossier.dp_selection if sel and sel.verdict == "CONFIRMED": assert len(sel.evidence) > 0, "CONFIRMED sans evidence — violation règle A1" # =================================================================== # Test 2 : Case 23080179 — DLBCL masqué par adénopathie (piège) # =================================================================== @needs_pdf_23080179 class TestCase23080179: """CRH onco — lymphome DLBCL masqué par adénopathie R59.0.""" @pytest.fixture(autouse=True) def setup(self): self.raw_text, self.parsed, self.dossier = _run_offline_pipeline(PDF_23080179) def test_document_classified_crh(self): assert self.dossier.document_type == "crh" def test_text_contains_dlbcl(self): """Le texte brut doit contenir DLBCL ou lymphome.""" low = self.raw_text[:5000].lower() assert "dlbcl" in low or "lymphome" in low, ( f"'DLBCL'/'lymphome' introuvable dans les 5000 premiers chars" ) def test_conclusion_contains_valym(self): """La conclusion doit mentionner le protocole VALYM.""" conclusion = self.parsed.get("sections", {}).get("conclusion", "") assert "VALYM" in conclusion or "valym" in conclusion.lower(), ( f"'VALYM' introuvable dans conclusion (excerpt: {conclusion[:240]})" ) def test_pool_contains_c83(self): """Le pool doit contenir un candidat famille C83 (lymphome).""" sel = self.dossier.dp_selection assert sel is not None, "dp_selection est None" codes = [c.code or "" for c in sel.candidates] dp = self.dossier.diagnostic_principal dp_code = dp.cim10_suggestion if dp else "" all_codes = codes + [dp_code] has_c83 = any(c.startswith("C83") for c in all_codes) # Tolérant : accepter aussi C85 (lymphome non hodgkinien) ou C84 has_lymphoma = any( c.startswith(("C83", "C84", "C85")) for c in all_codes ) assert has_c83 or has_lymphoma, ( f"Aucun candidat C83/C84/C85 — codes: {all_codes}" ) def test_dp_not_symptom_r59(self): """Le DP ne doit PAS être R59.0 (symptôme adénopathie). Avec le patch alias DLBCL→C83.3, le scoring doit placer C83.3 devant R59.0 grâce au bonus conclusion/alias. """ sel = self.dossier.dp_selection dp = self.dossier.diagnostic_principal chosen = (sel.chosen_code if sel else None) or (dp.cim10_suggestion if dp else "") assert not chosen.startswith("R59"), ( f"Pipeline code encore R59.0 (symptôme) au lieu de C83.* — " f"le patch alias aurait dû corriger ça" ) def test_c83_is_top1(self): """C83.* (DLBCL) doit être le candidat #1 grâce au bonus alias.""" sel = self.dossier.dp_selection assert sel is not None, "dp_selection est None" assert len(sel.candidates) > 0, "Aucun candidat" top1 = sel.candidates[0] assert (top1.code or "").startswith("C83"), ( f"Top1 = {top1.code} ({top1.label}), attendu C83.* — " f"tous: {[(c.code, c.score) for c in sel.candidates[:3]]}" ) def test_evidence_if_confirmed(self): """Si verdict CONFIRMED, evidence ne doit pas être vide.""" sel = self.dossier.dp_selection if sel and sel.verdict == "CONFIRMED": assert len(sel.evidence) > 0, "CONFIRMED sans evidence — violation règle A1"