diff --git a/tests/test_dp_finalizer_ext.py b/tests/test_dp_finalizer_ext.py new file mode 100644 index 0000000..ae13ead --- /dev/null +++ b/tests/test_dp_finalizer_ext.py @@ -0,0 +1,340 @@ +"""Tests étendus pour src/medical/dp_finalizer.py. + +Couvre les cas non couverts dans test_dp_finalizer.py : +- R6 — Z-code non whitelisté remplacé par DAS +- Fonctions internes (_family3, _code_in_candidates, _has_strong_evidence) +- finalize_dp avec source Trackare détectée par evidence/reason +- Cas limites : codes None, candidats vides, Z whitelistés en R6 +""" + +from __future__ import annotations + +import pytest + +from src.config import DossierMedical, DPSelection, DPCandidate, Diagnostic, Sejour +from src.medical.dp_finalizer import ( + finalize_dp, + decide_dp_final, + _family3, + _code_in_candidates, + _has_strong_evidence, + _apply_r5, + _make_selection, +) + + +# ── Helpers ──────────────────────────────────────────────────────────── + + +def _sel( + code: str | None = None, + term: str = "", + verdict: str = "REVIEW", + confidence: str = "medium", + evidence: list[str] | None = None, + reason: str = "", + candidates: list[DPCandidate] | None = None, +) -> DPSelection: + return DPSelection( + chosen_code=code, + chosen_term=term or (code or ""), + verdict=verdict, + confidence=confidence, + evidence=evidence or [], + reason=reason, + candidates=candidates or [], + ) + + +def _cand(code: str, term: str = "", score: float = 0.0) -> DPCandidate: + return DPCandidate(index=0, term=term or code, code=code, score=score) + + +def _dossier( + doc_type: str = "crh", + dp_selection: DPSelection | None = None, + das: list[Diagnostic] | None = None, +) -> DossierMedical: + return DossierMedical( + document_type=doc_type, + sejour=Sejour(), + dp_selection=dp_selection, + diagnostics_associes=das or [], + ) + + +# =================================================================== +# Fonctions internes +# =================================================================== + + +class TestFamily3: + @pytest.mark.parametrize("code,expected", [ + ("K81.0", "K81"), + ("K81", "K81"), + ("I26.9", "I26"), + ("Z51.1", "Z51"), + (None, ""), + ("", ""), + ]) + def test_extraction(self, code, expected): + assert _family3(code) == expected + + +class TestCodeInCandidates: + def test_exact_match(self): + sel = _sel(candidates=[_cand("I26.9"), _cand("K85.1")]) + assert _code_in_candidates("I26.9", sel) is True + + def test_family3_match(self): + sel = _sel(candidates=[_cand("I26.9")]) + assert _code_in_candidates("I26.0", sel) is True # same family + + def test_no_match(self): + sel = _sel(candidates=[_cand("I26.9")]) + assert _code_in_candidates("K85.1", sel) is False + + def test_none_code(self): + sel = _sel(candidates=[_cand("I26.9")]) + assert _code_in_candidates(None, sel) is False + + def test_empty_candidates(self): + sel = _sel(candidates=[]) + assert _code_in_candidates("I26.9", sel) is False + + +class TestHasStrongEvidence: + def test_no_evidence(self): + sel = _sel(evidence=[]) + assert _has_strong_evidence(sel) is False + + def test_only_trackare_source(self): + """'Source: Trackare' seul n'est pas une preuve forte.""" + sel = _sel(evidence=["Source: Trackare (codage établissement)"]) + assert _has_strong_evidence(sel) is False + + def test_strong_evidence(self): + sel = _sel(evidence=["Diagnostic de sortie: «Embolie pulmonaire»"]) + assert _has_strong_evidence(sel) is True + + def test_mixed_evidence(self): + sel = _sel(evidence=[ + "Source: Trackare (codage établissement)", + "Conclusion CRH: «Pancréatite biliaire»", + ]) + assert _has_strong_evidence(sel) is True + + +class TestMakeSelection: + def test_preserves_source_candidates(self): + source = _sel(candidates=[_cand("I26.9", "EP", 5.0)]) + result = _make_selection( + code="I26.9", term="EP", verdict="CONFIRMED", + confidence="high", evidence=[], reason="test", + source_sel=source, + ) + assert len(result.candidates) == 1 + assert result.candidates[0].code == "I26.9" + + def test_no_source(self): + result = _make_selection( + code="I26.9", term="EP", verdict="CONFIRMED", + confidence="high", evidence=[], reason="test", + ) + assert result.candidates == [] + assert result.debug_scores is None + + +# =================================================================== +# R5 — _apply_r5 (direct call) +# =================================================================== + + +class TestApplyR5Direct: + def test_z_code_non_whitelisted_forced_review(self): + dp = _sel(code="Z95.5", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, None, allow_symptom_dp=False) + assert dp.verdict == "REVIEW" + assert flags.get("z_code_dp_review") is True + + def test_z_code_whitelisted_untouched(self): + dp = _sel(code="Z51.1", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, None, allow_symptom_dp=False) + assert dp.verdict == "CONFIRMED" + assert "z_code_dp_review" not in flags + + def test_r_code_with_non_r_candidate(self): + crh = _sel(candidates=[_cand("R06.0"), _cand("J18.9")]) + dp = _sel(code="R06.0", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, crh, allow_symptom_dp=False) + assert dp.verdict == "REVIEW" + assert flags.get("r_code_dp_with_non_r_candidate") is True + + def test_r_code_allowed_by_flag(self): + crh = _sel(candidates=[_cand("R06.0"), _cand("J18.9")]) + dp = _sel(code="R06.0", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, crh, allow_symptom_dp=True) + assert dp.verdict == "CONFIRMED" + + def test_review_dp_not_changed(self): + """R5 ne touche que les CONFIRMED.""" + dp = _sel(code="Z95.5", verdict="REVIEW", confidence="medium", evidence=[]) + dp, flags, alertes = _apply_r5(dp, None, allow_symptom_dp=False) + assert dp.verdict == "REVIEW" + assert "z_code_dp_review" not in flags # pas de flag car déjà REVIEW + + +# =================================================================== +# R6 — Z-code remplacé par DAS (via finalize_dp) +# =================================================================== + + +class TestR6ZcodeReplacedByDas: + def test_z_code_replaced_by_das(self): + """Z-code non whitelisté en DP → DAS high/medium non-R non-Z promu.""" + sel = _sel( + code="Z96.6", # non whitelisté + verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP Trackare", + ) + das = [ + Diagnostic(texte="Pancréatite", cim10_suggestion="K85.9", cim10_confidence="high"), + ] + d = _dossier(doc_type="trackare", dp_selection=sel, das=das) + finalize_dp(d) + + assert d.dp_final is not None + assert d.dp_final.chosen_code == "K85.9" + assert d.quality_flags.get("z_code_replaced_by_das") is True + assert any("K85.9" in a for a in d.alertes_codage) + + def test_z_code_whitelisted_not_replaced(self): + """Z-code whitelisté en DP → pas de remplacement.""" + sel = _sel( + code="Z51.1", # whitelisté + verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP Trackare", + ) + das = [ + Diagnostic(texte="Pancréatite", cim10_suggestion="K85.9", cim10_confidence="high"), + ] + d = _dossier(doc_type="trackare", dp_selection=sel, das=das) + finalize_dp(d) + + assert d.dp_final.chosen_code == "Z51.1" + assert "z_code_replaced_by_das" not in d.quality_flags + + def test_z_code_no_eligible_das(self): + """Z-code non whitelisté mais aucun DAS éligible → pas de remplacement.""" + sel = _sel( + code="Z96.6", + verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP Trackare", + ) + das = [ + Diagnostic(texte="Symptôme R", cim10_suggestion="R10.4", cim10_confidence="high"), + Diagnostic(texte="Z-code", cim10_suggestion="Z87.1", cim10_confidence="low"), + ] + d = _dossier(doc_type="trackare", dp_selection=sel, das=das) + finalize_dp(d) + + # R10.4 est R-code, Z87.1 est Z-code → aucun éligible + assert "z_code_replaced_by_das" not in d.quality_flags + + +# =================================================================== +# finalize_dp — détection source Trackare +# =================================================================== + + +class TestFinalizeDpSourceDetection: + def test_trackare_detected_by_document_type(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", evidence=[], reason="CRH DP") + d = _dossier(doc_type="trackare", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is not None + + def test_trackare_detected_by_reason(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", evidence=[], + reason="DP Trackare — source d'autorité") + d = _dossier(doc_type="crh", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is not None + + def test_trackare_detected_by_evidence(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP") + d = _dossier(doc_type="crh", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is not None + + def test_crh_only_when_no_trackare_signal(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", + evidence=["Diagnostic de sortie CRH"], + reason="CRH CONFIRMED") + d = _dossier(doc_type="crh", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is None + assert d.dp_crh_only is not None + + +# =================================================================== +# finalize_dp — pas de dp_selection +# =================================================================== + + +class TestFinalizeDpNoSelection: + def test_no_dp_selection(self): + """Dossier sans dp_selection → REVIEW, flag no_dp_source.""" + d = _dossier(doc_type="crh") + finalize_dp(d) + assert d.dp_final is not None + assert d.dp_final.verdict == "REVIEW" + assert d.quality_flags.get("no_dp_source") is True + + +# =================================================================== +# decide_dp_final — edge cases +# =================================================================== + + +class TestDecideDpFinalEdgeCases: + def test_trackare_z_code_whitelisted(self): + """Z03 (whitelisté) en Trackare-only → CONFIRMED.""" + trackare = _sel(code="Z03.8", verdict="CONFIRMED", + evidence=["Source: Trackare"]) + dp, flags, _ = decide_dp_final(trackare, None) + # Z03 is whitelisted + assert dp.verdict == "CONFIRMED" + + def test_trackare_empty_evidence_gets_default(self): + """Trackare-only sans evidence → evidence par défaut ajoutée.""" + trackare = _sel(code="K81.0", verdict="CONFIRMED", evidence=[]) + dp, flags, _ = decide_dp_final(trackare, None) + assert any("Trackare" in e for e in dp.evidence) + + def test_both_sources_trackare_review_from_selector(self): + """Si dp_selector a déjà mis REVIEW, le finalizer le respecte.""" + trackare = _sel(code="K81.0", verdict="REVIEW", + evidence=["Source: Trackare"]) + crh = _sel(code="K81.0", verdict="REVIEW", + evidence=["Diagnostic CRH"], + candidates=[_cand("K81.0")]) + dp, flags, _ = decide_dp_final(trackare, crh) + # R2 corrobore → CONFIRMED + assert dp.verdict == "CONFIRMED" + assert flags.get("trackare_confirmed_by_crh") is True + + def test_code_none_handling(self): + """Trackare avec code None → pas de crash.""" + trackare = _sel(code=None, verdict="REVIEW", evidence=["Source: Trackare"]) + crh = _sel(code="K81.0", verdict="CONFIRMED", + evidence=["Diagnostic de sortie CRH"]) + dp, flags, _ = decide_dp_final(trackare, crh) + # Code None ne commence pas par R, ne commence pas par Z + # Il ne matche pas CRH → R4 ambigu + assert dp is not None