From 5ba39035694828b5701a8811cc8da16dccd0d94a Mon Sep 17 00:00:00 2001 From: dom Date: Sun, 8 Mar 2026 11:58:28 +0100 Subject: [PATCH 1/4] feat: tests pour src/viewer/helpers.py (77 tests) Couvre les filtres Jinja2 (confidence_badge, severity_badge, cma_level_badge, decision_badge, human_where, format_doc_name, etc.), les fonctions de statistiques (compute_group_stats, compute_dashboard_stats, compute_dim_synthesis), et les utilitaires (_date_to_iso, _sort_qc_alerts, _compute_jours_restants). Utilise pytest.mark.parametrize pour les cas multiples. Co-Authored-By: Claude Opus 4.6 --- tests/test_viewer_helpers.py | 526 +++++++++++++++++++++++++++++++++++ 1 file changed, 526 insertions(+) create mode 100644 tests/test_viewer_helpers.py diff --git a/tests/test_viewer_helpers.py b/tests/test_viewer_helpers.py new file mode 100644 index 0000000..4e3d1a2 --- /dev/null +++ b/tests/test_viewer_helpers.py @@ -0,0 +1,526 @@ +"""Tests unitaires pour src/viewer/helpers.py. + +Couvre : filtres Jinja2, helpers de statistiques, fonctions utilitaires. +Pas de dépendance à Ollama ni au système de fichiers (sauf tmpdir). +""" + +from __future__ import annotations + +import re +from collections import Counter +from unittest.mock import patch, MagicMock + +import pytest +from markupsafe import Markup + +from src.config import ( + DossierMedical, + Diagnostic, + DPSelection, + DPCandidate, + ActeCCAM, + CodeDecision, + GHMEstimation, + VetoReport, + VetoIssue, + CompletudeDossier, + ControleCPAM, + FinancialImpact, + Sejour, +) +from src.viewer.helpers import ( + compute_group_stats, + compute_dashboard_stats, + compute_dim_synthesis, + confidence_badge, + confidence_label, + severity_badge, + cma_level_badge, + format_duration, + format_dossier_name, + format_doc_name, + decision_badge, + format_cpam_text, + human_where, + _date_to_iso, + _sort_qc_alerts, + _compute_jours_restants, +) + + +# ── Helpers ──────────────────────────────────────────────────────────── + + +def _item(dossier: DossierMedical, path_rel: str = "test/file.json") -> dict: + """Crée un item de scan_dossiers minimal.""" + return {"name": "file", "path_rel": path_rel, "dossier": dossier} + + +def _diag( + texte: str = "Diagnostic", + code: str | None = None, + confidence: str | None = None, + est_cma: bool | None = None, + decision: CodeDecision | None = None, + cim10_final: str | None = None, +) -> Diagnostic: + return Diagnostic( + texte=texte, + cim10_suggestion=code, + cim10_confidence=confidence, + est_cma=est_cma, + cim10_decision=decision, + cim10_final=cim10_final, + ) + + +# =================================================================== +# confidence_badge / confidence_label +# =================================================================== + + +class TestConfidenceBadge: + def test_high(self): + result = confidence_badge("high") + assert isinstance(result, Markup) + assert "Haute" in result + assert "#16a34a" in result + + def test_medium(self): + result = confidence_badge("medium") + assert "Moyenne" in result + + def test_low(self): + result = confidence_badge("low") + assert "Basse" in result + assert "#dc2626" in result + + def test_none_returns_empty(self): + assert confidence_badge(None) == "" + + def test_empty_string_returns_empty(self): + assert confidence_badge("") == "" + + def test_unknown_value_uses_default_colors(self): + result = confidence_badge("unknown") + assert "unknown" in result + assert "#6b7280" in result # default foreground + + +class TestConfidenceLabel: + @pytest.mark.parametrize("value,expected", [ + ("high", "Haute"), + ("medium", "Moyenne"), + ("low", "Basse"), + (None, ""), + ("", ""), + ("autre", "autre"), + ]) + def test_labels(self, value, expected): + assert confidence_label(value) == expected + + +# =================================================================== +# cma_level_badge +# =================================================================== + + +class TestCmaLevelBadge: + def test_none_returns_empty(self): + assert cma_level_badge(None) == "" + + def test_zero_returns_empty(self): + assert cma_level_badge(0) == "" + + def test_negative_returns_empty(self): + assert cma_level_badge(-1) == "" + + @pytest.mark.parametrize("level", [1, 2, 3, 4]) + def test_valid_levels(self, level): + result = cma_level_badge(level) + assert isinstance(result, Markup) + assert f"CMA {level}" in result + + def test_level_above_4_capped(self): + """Un niveau > 4 est cappé à 4.""" + result = cma_level_badge(5) + assert "CMA 4" in result + + +# =================================================================== +# format_dossier_name / format_doc_name +# =================================================================== + + +class TestFormatDossierName: + def test_racine(self): + assert format_dossier_name("racine") == "Non classés" + + def test_normal_name(self): + assert format_dossier_name("190_23139234") == "190_23139234" + + +class TestFormatDocName: + @pytest.mark.parametrize("name,expected", [ + ("190_fusionne_cim10", "Fusionné"), + ("CRH_23139234_cim10", "CRH"), + ("CRO_23139234_cim10", "CRO"), + ("crh_23139234", "CRH"), + ("trackare-01295620", "Trackare"), + ("ANAPATH_23103383", "Anapath"), + ("some_other_doc", "some_other_doc"), + ]) + def test_doc_names(self, name, expected): + assert format_doc_name(name) == expected + + +# =================================================================== +# decision_badge +# =================================================================== + + +class TestDecisionBadge: + def test_none_returns_empty(self): + assert decision_badge(None) == "" + + def test_keep_returns_empty(self): + """KEEP n'a pas de badge (cas par défaut).""" + dec = CodeDecision(action="KEEP") + assert decision_badge(dec) == "" + + @pytest.mark.parametrize("action,label", [ + ("DOWNGRADE", "Rétrogradé"), + ("REMOVE", "Supprimé"), + ("RULED_OUT", "Écarté"), + ("NEED_INFO", "Preuve manquante"), + ("PROMOTE_DP", "Promu en DP"), + ]) + def test_action_labels(self, action, label): + dec = CodeDecision(action=action) + result = decision_badge(dec) + assert label in result + + def test_dict_input(self): + """Accepte un dict en plus d'un CodeDecision.""" + result = decision_badge({"action": "REMOVE"}) + assert "Supprimé" in result + + def test_dict_keep(self): + result = decision_badge({"action": "KEEP"}) + assert result == "" + + def test_unknown_action(self): + """Action inconnue affiche le texte brut.""" + result = decision_badge({"action": "CUSTOM_ACTION"}) + assert "CUSTOM_ACTION" in result + + +# =================================================================== +# human_where +# =================================================================== + + +class TestHumanWhere: + @pytest.mark.parametrize("value,expected", [ + (None, "Global"), + ("", "Global"), + ("diagnostic_principal", "Diagnostic Principal"), + ("diagnostics_associes", "Diagnostics Associés"), + ("sejour", "Séjour"), + ("diagnostics_associes[0]", "DAS n°1"), + ("diagnostics_associes[5]", "DAS n°6"), + ("actes_ccam[0]", "Acte n°1"), + ("actes_ccam[2]", "Acte n°3"), + ("autre_chose", "autre_chose"), + ]) + def test_conversions(self, value, expected): + assert human_where(value) == expected + + +# =================================================================== +# _date_to_iso +# =================================================================== + + +class TestDateToIso: + def test_valid_date(self): + assert _date_to_iso("15/03/2025") == "2025-03-15" + + def test_invalid_format(self): + assert _date_to_iso("2025-03-15") == "" + + def test_empty_string(self): + assert _date_to_iso("") == "" + + def test_single_part(self): + assert _date_to_iso("15") == "" + + +# =================================================================== +# _sort_qc_alerts +# =================================================================== + + +class TestSortQcAlerts: + def test_critical_first(self): + alerts = [ + "Recommandation : ajouter un DAS", + "Erreur critique : code invalide", + "Code justifié solidement", + ] + sorted_alerts = _sort_qc_alerts(alerts) + assert sorted_alerts[0] == "Erreur critique : code invalide" + assert sorted_alerts[-1] == "Code justifié solidement" + + def test_empty_list(self): + assert _sort_qc_alerts([]) == [] + + def test_dp_prioritized_within_tier(self): + """Les alertes mentionnant le DP sont priorisées dans leur tier.""" + alerts = [ + "Redondance dans DAS", + "Redondance DP diagnostic principal suspect", + ] + sorted_alerts = _sort_qc_alerts(alerts) + assert "DP" in sorted_alerts[0] or "diagnostic principal" in sorted_alerts[0] + + +# =================================================================== +# _compute_jours_restants +# =================================================================== + + +class TestComputeJoursRestants: + def test_no_date(self): + ctrl = MagicMock() + ctrl.date_limite_reponse = None + assert _compute_jours_restants(ctrl) is None + + def test_invalid_date(self): + ctrl = MagicMock() + ctrl.date_limite_reponse = "not-a-date" + assert _compute_jours_restants(ctrl) is None + + def test_valid_date_returns_int(self): + ctrl = MagicMock() + ctrl.date_limite_reponse = "15/03/2030" + result = _compute_jours_restants(ctrl) + assert isinstance(result, int) + assert result > 0 # well into the future + + +# =================================================================== +# compute_group_stats +# =================================================================== + + +class TestComputeGroupStats: + def test_empty_items(self): + stats = compute_group_stats([]) + assert stats == {"das_count": 0, "alertes_count": 0, "actes_count": 0, "cma_count": 0} + + def test_cma_counted_on_dp(self): + """CMA sur le DP est compté.""" + d = DossierMedical( + diagnostic_principal=_diag("DP", "K85.9", est_cma=True), + ) + stats = compute_group_stats([_item(d)]) + assert stats["cma_count"] == 1 + + def test_multiple_items(self): + d1 = DossierMedical( + diagnostics_associes=[_diag("DAS1", "I10", est_cma=True)], + actes_ccam=[ActeCCAM(texte="Acte1")], + alertes_codage=["Alerte"], + ) + d2 = DossierMedical( + diagnostics_associes=[_diag("DAS2", "E11.9"), _diag("DAS3", "J18.9", est_cma=True)], + ) + stats = compute_group_stats([_item(d1), _item(d2)]) + assert stats["das_count"] == 3 + assert stats["actes_count"] == 1 + assert stats["alertes_count"] == 1 + assert stats["cma_count"] == 2 + + +# =================================================================== +# compute_dashboard_stats +# =================================================================== + + +class TestComputeDashboardStats: + def test_empty_groups(self): + stats = compute_dashboard_stats({}) + assert stats["total_dossiers"] == 0 + assert stats["total_fichiers"] == 0 + assert stats["top_codes"] == [] + assert stats["processing_time_avg"] == 0 + + def test_single_dossier(self): + d = DossierMedical( + diagnostic_principal=_diag("DP", "K85.9", confidence="high"), + diagnostics_associes=[_diag("DAS", code="I10")], + actes_ccam=[ActeCCAM(texte="Acte")], + alertes_codage=["Alerte"], + processing_time_s=10.5, + ) + groups = {"grp1": [_item(d)]} + stats = compute_dashboard_stats(groups) + assert stats["total_dossiers"] == 1 + assert stats["total_fichiers"] == 1 + assert stats["total_das"] == 1 + assert stats["total_actes"] == 1 + assert stats["total_alertes"] == 1 + assert stats["processing_time_avg"] == 10.5 + + def test_dp_validity_absent_when_no_dp(self): + d = DossierMedical() + groups = {"grp": [_item(d)]} + stats = compute_dashboard_stats(groups) + assert stats["dp_validity"].get("absent", 0) == 1 + + def test_ghm_types_counted(self): + d = DossierMedical( + ghm_estimation=GHMEstimation(type_ghm="C", severite=3), + ) + groups = {"grp": [_item(d)]} + stats = compute_dashboard_stats(groups) + assert stats["ghm_types"].get("C") == 1 + assert stats["severity_dist"].get(3) == 1 + + +# =================================================================== +# compute_dim_synthesis +# =================================================================== + + +class TestComputeDimSynthesis: + def test_empty_groups(self): + result = compute_dim_synthesis({}) + assert result["dp"]["total"] == 0 + assert result["das"]["total"] == 0 + assert result["veto"]["avg_score"] == 0 + + def test_dp_confirmed(self): + d = DossierMedical( + dp_final=DPSelection( + chosen_code="K85.9", + verdict="CONFIRMED", + confidence="high", + evidence=["Test evidence"], + ), + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["dp"]["total"] == 1 + assert result["dp"]["confirmed"] == 1 + + def test_dp_review_creates_alert(self): + d = DossierMedical( + dp_final=DPSelection( + chosen_code="R10.4", + verdict="REVIEW", + confidence="medium", + reason="Ambigu", + evidence=[], + ), + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["dp"]["review"] == 1 + assert len(result["alertes"]["review"]) == 1 + + def test_das_decisions_counted(self): + d = DossierMedical( + diagnostics_associes=[ + _diag("DAS1", "I10", decision=CodeDecision(action="KEEP")), + _diag("DAS2", "D69.6", decision=CodeDecision(action="RULED_OUT")), + _diag("DAS3", "D50", decision=CodeDecision(action="DOWNGRADE", final_code="D64.9")), + _diag("DAS4", "Z87.1", decision=CodeDecision(action="REMOVE")), + ], + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["das"]["total"] == 4 + assert result["das"]["kept"] == 1 + assert result["das"]["ruled_out"] == 1 + assert result["das"]["downgraded"] == 1 + assert result["das"]["removed"] == 1 + + def test_das_no_decision_counted_as_kept(self): + d = DossierMedical( + diagnostics_associes=[_diag("DAS", "I10")], # no decision + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["das"]["kept"] == 1 + + def test_veto_report_aggregated(self): + d = DossierMedical( + veto_report=VetoReport( + verdict="FAIL", + score_contestabilite=40, + issues=[ + VetoIssue(veto="VETO-01", severity="HARD", where="dp", message="test"), + ], + ), + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["veto"]["avg_score"] == 40 + assert result["veto"]["distribution"].get("FAIL") == 1 + assert len(result["alertes"]["fail"]) == 1 + + def test_completude_indefendable(self): + d = DossierMedical( + completude=CompletudeDossier( + verdict_global="indefendable", + score_global=20, + documents_manquants=["CRO", "Anapath"], + ), + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["completude"]["distribution"].get("indefendable") == 1 + assert len(result["alertes"]["indefendable"]) == 1 + + def test_taux_modification_zero_when_no_das(self): + result = compute_dim_synthesis({}) + assert result["das"]["taux_modification"] == 0 + + def test_cpam_impact(self): + d = DossierMedical( + controles_cpam=[ + ControleCPAM( + numero_ogc=1, + financial_impact=FinancialImpact( + impact_estime_euros=1500, + priorite="haute", + ), + validation_dim="valide", + ), + ], + ) + groups = {"grp": [_item(d)]} + result = compute_dim_synthesis(groups) + assert result["cpam"]["total"] == 1 + assert result["cpam"]["impact_total"] == 1500 + assert result["cpam"]["by_priority"].get("haute") == 1 + assert result["cpam"]["by_status"].get("valide") == 1 + + +# =================================================================== +# format_cpam_text (additional edge cases beyond test_viewer.py) +# =================================================================== + + +class TestFormatCpamTextExtra: + def test_blank_lines_produce_br(self): + result = format_cpam_text("Line 1\n\nLine 2") + assert "
" in result + + def test_list_closed_at_end(self): + """A list not followed by text should still be closed.""" + result = format_cpam_text("- Item 1\n- Item 2") + assert result.count("") == 1 From caaa6deb144f1da49392a7e1454a4346108cfa33 Mon Sep 17 00:00:00 2001 From: dom Date: Sun, 8 Mar 2026 11:59:57 +0100 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20tests=20=C3=A9tendus=20pour=20src/q?= =?UTF-8?q?uality/decision=5Fengine.py=20(53=20tests)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Couvre les règles non couvertes dans le fichier existant : - Fonctions internes (_norm, _first_float, _parse_normal_range, etc.) - Nettoyage hiérarchique (.9 vs code spécifique) - D50 sans preuve martiale → downgrade D64.9 - D69.6 thrombopénie vs plaquettes normales → RULED_OUT - decision_summaries pour toutes les actions (DOWNGRADE, REMOVE, RULED_OUT, NEED_INFO) - Détection analytes (sodium, potassium) - _age_band et cas limites de scoring Co-Authored-By: Claude Opus 4.6 --- tests/test_decision_engine_ext.py | 493 ++++++++++++++++++++++++++++++ 1 file changed, 493 insertions(+) create mode 100644 tests/test_decision_engine_ext.py diff --git a/tests/test_decision_engine_ext.py b/tests/test_decision_engine_ext.py new file mode 100644 index 0000000..90a94aa --- /dev/null +++ b/tests/test_decision_engine_ext.py @@ -0,0 +1,493 @@ +"""Tests étendus pour src/quality/decision_engine.py. + +Couvre les règles non couvertes dans test_decision_engine.py : +- Nettoyage hiérarchique (.9 vs spécifique) +- D50 sans preuve martiale → downgrade D64.9 +- D69.6 thrombopénie vs plaquettes normales +- Bio rules génériques (YAML-driven) +- Fonctions internes (_norm, _first_float, _parse_normal_range, etc.) +- decision_summaries (DOWNGRADE, REMOVE, RULED_OUT, NEED_INFO) +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from src.config import ( + BiologieCle, + CodeDecision, + Diagnostic, + DossierMedical, + PreuveClinique, + Sejour, + Traitement, + VetoReport, + VetoIssue, +) +from src.quality.decision_engine import ( + _norm, + _first_float, + _parse_normal_range, + _parse_float, + _is_sodium_test, + _is_potassium_test, + _das_promotion_score, + _age_band, + apply_decisions, + decision_summaries, +) + + +# ── Helpers ──────────────────────────────────────────────────────────── + + +def _dossier( + dp_code: str | None = None, + das: list[Diagnostic] | None = None, + bio: list[BiologieCle] | None = None, + traitements: list[Traitement] | None = None, + age: int | None = None, + sexe: str | None = None, +) -> DossierMedical: + dp = None + if dp_code: + dp = Diagnostic(texte="DP", cim10_suggestion=dp_code) + return DossierMedical( + sejour=Sejour(age=age, sexe=sexe), + diagnostic_principal=dp, + diagnostics_associes=das or [], + biologie_cle=bio or [], + traitements_sortie=traitements or [], + ) + + +def _diag( + texte: str, + code: str, + confidence: str = "high", + source: str = "trackare", + status: str | None = None, + source_excerpt: str | None = None, + preuves: list[PreuveClinique] | None = None, +) -> Diagnostic: + return Diagnostic( + texte=texte, + cim10_suggestion=code, + cim10_confidence=confidence, + source=source, + status=status, + source_excerpt=source_excerpt, + preuves_cliniques=preuves or [], + ) + + +def _bio(test: str, valeur: str | None = None, valeur_num: float | None = None, + quality: str | None = None) -> BiologieCle: + return BiologieCle(test=test, valeur=valeur, valeur_num=valeur_num, quality=quality) + + +# =================================================================== +# Fonctions internes +# =================================================================== + + +class TestNorm: + def test_basic_normalization(self): + assert _norm(" Hello World ") == "hello world" + + def test_accent_removal(self): + assert _norm("Hémoglobine élevée") == "hemoglobine elevee" + + def test_apostrophe_normalization(self): + assert _norm("l\u2019anémie") == "l'anemie" + + +class TestFirstFloat: + @pytest.mark.parametrize("text,expected", [ + ("CRP 180 mg/L", 180.0), + ("Hb 7,5 g/dL", 7.5), + ("-3.2 mmol/L", -3.2), + ("pas de nombre", None), + ("", None), + ]) + def test_extraction(self, text, expected): + assert _first_float(text) == expected + + +class TestParseNormalRange: + def test_standard_range(self): + lo, hi = _parse_normal_range("Plaquettes 250 G/L [N: 150-450]") + assert lo == 150.0 + assert hi == 450.0 + + def test_decimal_range(self): + lo, hi = _parse_normal_range("K+ 4.2 mmol/L [N: 3,5 - 5,0]") + assert lo == 3.5 + assert hi == 5.0 + + def test_no_range(self): + lo, hi = _parse_normal_range("CRP 180 mg/L") + assert lo is None + assert hi is None + + +class TestParseFloat: + @pytest.mark.parametrize("value,expected", [ + ("4.5", 4.5), + ("4,5", 4.5), + (" 7.2 ", 7.2), + ("abc", None), + (None, None), + ("", None), + ("-3.1", -3.1), + ]) + def test_parsing(self, value, expected): + assert _parse_float(value) == expected + + +class TestIsSodiumTest: + @pytest.mark.parametrize("test,expected", [ + ("Sodium", True), + ("sodium", True), + ("Natrémie", True), + ("Na+", True), + ("Na", True), + ("Potassium", False), + ("CRP", False), + ]) + def test_detection(self, test, expected): + assert _is_sodium_test(test) == expected + + +class TestIsPotassiumTest: + @pytest.mark.parametrize("test,expected", [ + ("Potassium", True), + ("potassium", True), + ("Kaliémie", True), + ("K+", True), + ("K", True), + ("Sodium", False), + ("CRP", False), + ]) + def test_detection(self, test, expected): + assert _is_potassium_test(test) == expected + + +class TestAgeBand: + def test_adult(self): + d = _dossier(age=65) + assert _age_band(d, {"age_bands": {"adult_min_years": 18}}) == "adult" + + def test_child(self): + d = _dossier(age=5) + assert _age_band(d, {"age_bands": {"adult_min_years": 18}}) == "child" + + def test_unknown_age(self): + d = _dossier() + assert _age_band(d, {}) == "unknown" + + def test_boundary_18(self): + d = _dossier(age=18) + assert _age_band(d, {"age_bands": {"adult_min_years": 18}}) == "adult" + + +# =================================================================== +# Nettoyage hiérarchique (.9 vs spécifique) +# =================================================================== + + +@patch("src.quality.decision_engine.load_reference_ranges", return_value={}) +@patch("src.quality.decision_engine.load_bio_rules", return_value={}) +@patch("src.quality.decision_engine.rule_enabled", return_value=True) +@patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")) +class TestHierarchyCleanup: + + def test_generic_removed_when_specific_exists(self, mock_val, mock_rule, mock_bio, mock_ref): + """K81.9 (générique) retiré quand K81.0 (spécifique) est présent.""" + dp = Diagnostic(texte="Cholécystite", cim10_suggestion="K81.0") + das1 = _diag("Cholécystite SAI", "K81.9") + das2 = _diag("HTA", "I10") + dossier = _dossier(dp_code=None, das=[das1, das2]) + dossier.diagnostic_principal = dp + + apply_decisions(dossier) + + # K81.9 doit être supprimé + k81_9 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "K81.9"][0] + assert k81_9.cim10_decision is not None + assert k81_9.cim10_decision.action == "REMOVE" + assert "RULE-HIERARCHY-CLEANUP" in k81_9.cim10_decision.applied_rules + assert k81_9.cim10_final is None + + def test_no_cleanup_when_no_specific(self, mock_val, mock_rule, mock_bio, mock_ref): + """K81.9 seul (pas de K81.x spécifique) → conservé.""" + das1 = _diag("Cholécystite SAI", "K81.9") + das2 = _diag("HTA", "I10") + dp = Diagnostic(texte="DP", cim10_suggestion="J18.9") + dossier = _dossier(das=[das1, das2]) + dossier.diagnostic_principal = dp + + apply_decisions(dossier) + + k81_9 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "K81.9"][0] + # Pas de décision REMOVE + assert k81_9.cim10_decision is None or k81_9.cim10_decision.action != "REMOVE" + + +# =================================================================== +# D50 sans preuve martiale → downgrade D64.9 +# =================================================================== + + +@patch("src.quality.decision_engine.load_reference_ranges", return_value={}) +@patch("src.quality.decision_engine.load_bio_rules", return_value={}) +class TestD50NeedsIron: + + @patch("src.quality.decision_engine.rule_enabled", return_value=True) + @patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")) + def test_d50_downgraded_without_iron_evidence(self, mock_val, mock_rule, mock_bio, mock_ref): + """D50 sans preuve ferritine/martial → downgrade D64.9.""" + das = _diag( + "Anémie ferriprive", + "D50", + preuves=[ + PreuveClinique(type="biologie", element="Hb 9.5 g/dL", interpretation="Anémie confirmée"), + ], + ) + dossier = _dossier(dp_code="K85.9", das=[das], age=65) + + apply_decisions(dossier) + + d50 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "D50"][0] + assert d50.cim10_final == "D64.9" + assert d50.cim10_decision.action == "DOWNGRADE" + assert "RULE-D50-NEEDS-IRON" in d50.cim10_decision.applied_rules + + @patch("src.quality.decision_engine.rule_enabled", return_value=True) + @patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")) + def test_d50_kept_with_ferritin_evidence(self, mock_val, mock_rule, mock_bio, mock_ref): + """D50 avec preuve de ferritine → conservé.""" + das = _diag( + "Anémie ferriprive", + "D50", + preuves=[ + PreuveClinique(type="biologie", element="Hb 9.5 g/dL", interpretation="Anémie confirmée"), + PreuveClinique(type="biologie", element="Ferritine 8 ng/mL", interpretation="Carence martiale"), + ], + ) + dossier = _dossier(dp_code="K85.9", das=[das], age=65) + + apply_decisions(dossier) + + d50 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "D50"][0] + assert d50.cim10_decision is None or d50.cim10_decision.action != "DOWNGRADE" + + @patch("src.quality.decision_engine.rule_enabled", return_value=True) + @patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")) + def test_d50_kept_with_martial_treatment(self, mock_val, mock_rule, mock_bio, mock_ref): + """D50 avec traitement martial → conservé (preuve par le traitement).""" + das = _diag( + "Anémie ferriprive", + "D50", + preuves=[ + PreuveClinique(type="biologie", element="Hb 8.0 g/dL", interpretation="Anémie confirmée"), + ], + ) + dossier = _dossier( + dp_code="K85.9", + das=[das], + age=65, + traitements=[Traitement(medicament="Fer intraveineux", posologie="200mg")], + ) + + apply_decisions(dossier) + + d50 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "D50"][0] + assert d50.cim10_decision is None or d50.cim10_decision.action != "DOWNGRADE" + + def test_d50_rule_disabled(self, mock_bio, mock_ref): + """RULE-D50-NEEDS-IRON désactivée → D50 conservé.""" + def _rule_selective(rule_id): + return rule_id != "RULE-D50-NEEDS-IRON" + + with patch("src.quality.decision_engine.rule_enabled", side_effect=_rule_selective): + with patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")): + das = _diag( + "Anémie ferriprive", + "D50", + preuves=[ + PreuveClinique(type="biologie", element="Hb 9.5", interpretation="Anémie confirmée"), + ], + ) + dossier = _dossier(dp_code="K85.9", das=[das], age=65) + apply_decisions(dossier) + d50 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "D50"][0] + assert d50.cim10_decision is None or d50.cim10_decision.action != "DOWNGRADE" + + +# =================================================================== +# D69.6 thrombopénie vs plaquettes normales +# =================================================================== + + +@patch("src.quality.decision_engine.load_bio_rules", return_value={}) +class TestD696PltNormal: + + @patch("src.quality.decision_engine.load_reference_ranges", return_value={ + "age_bands": {"adult_min_years": 18}, + "fallback_ranges": {"adult": {"platelets": {"low": 150, "high": 450}}}, + "safe_zones_unknown_age": {"platelets_ruled_out_low": 170}, + }) + @patch("src.quality.decision_engine.rule_enabled", return_value=True) + @patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")) + def test_d696_ruled_out_with_normal_platelets(self, mock_val, mock_rule, mock_ref, mock_bio): + """D69.6 avec plaquettes normales → RULED_OUT.""" + das = _diag("Thrombopénie", "D69.6") + bio = [_bio("Plaquettes", "250 G/L [N: 150-450]", valeur_num=250.0)] + dossier = _dossier(dp_code="K85.9", das=[das], bio=bio, age=65) + + apply_decisions(dossier) + + d696 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "D69.6"][0] + assert d696.cim10_decision is not None + assert d696.cim10_decision.action == "RULED_OUT" + assert d696.status == "ruled_out" + assert d696.cim10_final is None + + @patch("src.quality.decision_engine.load_reference_ranges", return_value={ + "age_bands": {"adult_min_years": 18}, + "fallback_ranges": {"adult": {"platelets": {"low": 150, "high": 450}}}, + "safe_zones_unknown_age": {"platelets_ruled_out_low": 170}, + }) + @patch("src.quality.decision_engine.rule_enabled", return_value=True) + @patch("src.quality.decision_engine.cim10_validate", return_value=(True, "label")) + def test_d696_kept_with_low_platelets(self, mock_val, mock_rule, mock_ref, mock_bio): + """D69.6 avec plaquettes basses → conservé.""" + das = _diag("Thrombopénie", "D69.6") + bio = [_bio("Plaquettes", "80 G/L", valeur_num=80.0)] + dossier = _dossier(dp_code="K85.9", das=[das], bio=bio, age=65) + + apply_decisions(dossier) + + d696 = [d for d in dossier.diagnostics_associes if d.cim10_suggestion == "D69.6"][0] + assert d696.cim10_decision is None or d696.cim10_decision.action != "RULED_OUT" + + +# =================================================================== +# _das_promotion_score — cas supplémentaires +# =================================================================== + + +class TestDasPromotionScoreExtra: + def test_empty_code(self): + """Un diagnostic sans code final a un score minimal.""" + diag = Diagnostic(texte="Test", cim10_final="") + score = _das_promotion_score(diag) + # pas de lettre → pertinence=2, mais specificite=0 + assert score[2] == 0 + + def test_none_confidence(self): + """Confiance None → score minimal.""" + diag = Diagnostic(texte="Test", cim10_final="K85.9", cim10_confidence=None) + score = _das_promotion_score(diag) + assert score[1] == 1 # default low + + +# =================================================================== +# decision_summaries — toutes les actions +# =================================================================== + + +class TestDecisionSummaries: + + def test_keep_no_output(self): + """KEEP ne produit pas de résumé.""" + das = Diagnostic(texte="HTA", cim10_suggestion="I10", cim10_decision=CodeDecision(action="KEEP")) + dossier = DossierMedical(diagnostics_associes=[das]) + lines = decision_summaries(dossier) + assert len(lines) == 0 + + def test_downgrade_summary(self): + das = Diagnostic( + texte="Anémie", + cim10_suggestion="D50", + cim10_decision=CodeDecision( + action="DOWNGRADE", + final_code="D64.9", + downgraded_from="D50", + applied_rules=["RULE-D50-NEEDS-IRON"], + needs_info=["Bilan martial disponible ?"], + ), + ) + dossier = DossierMedical(diagnostics_associes=[das]) + lines = decision_summaries(dossier) + assert any("D50" in l and "D64.9" in l for l in lines) + assert any("besoin_info" in l for l in lines) + + def test_remove_summary(self): + das = Diagnostic( + texte="Cholécystite SAI", + cim10_suggestion="K81.9", + cim10_decision=CodeDecision( + action="REMOVE", + applied_rules=["RULE-HIERARCHY-CLEANUP"], + ), + ) + dossier = DossierMedical(diagnostics_associes=[das]) + lines = decision_summaries(dossier) + assert any("K81.9" in l and "supprimé" in l for l in lines) + + def test_ruled_out_summary(self): + das = Diagnostic( + texte="Thrombopénie", + cim10_suggestion="D69.6", + cim10_decision=CodeDecision( + action="RULED_OUT", + applied_rules=["RULE-D69.6-PLT-NORMAL"], + reason="Plaquettes normales", + ), + ) + dossier = DossierMedical(diagnostics_associes=[das]) + lines = decision_summaries(dossier) + assert any("D69.6" in l and "écarté" in l for l in lines) + assert any("raison" in l for l in lines) + + def test_need_info_summary(self): + das = Diagnostic( + texte="Hyponatrémie", + cim10_suggestion="E87.1", + cim10_decision=CodeDecision( + action="NEED_INFO", + applied_rules=["RULE-HYPONATREMIA-MISSING"], + reason="Sodium non extrait", + needs_info=["Valeurs de sodium ?"], + ), + ) + dossier = DossierMedical(diagnostics_associes=[das]) + lines = decision_summaries(dossier) + assert any("NEED_INFO" in l for l in lines) + assert any("besoin_info" in l for l in lines) + + def test_no_diagnostics(self): + """Dossier sans diagnostics → liste vide.""" + dossier = DossierMedical() + lines = decision_summaries(dossier) + assert lines == [] + + def test_dp_decision_included(self): + """Décision sur le DP est incluse dans le résumé.""" + dp = Diagnostic( + texte="DP", + cim10_suggestion="D50", + cim10_decision=CodeDecision( + action="DOWNGRADE", + final_code="D64.9", + downgraded_from="D50", + applied_rules=["RULE-D50-NEEDS-IRON"], + ), + ) + dossier = DossierMedical(diagnostic_principal=dp) + lines = decision_summaries(dossier) + assert any("diagnostic_principal" in l for l in lines) From dcee7c960cb6147aee7f6d982e050d6c1657a5e3 Mon Sep 17 00:00:00 2001 From: dom Date: Sun, 8 Mar 2026 12:01:04 +0100 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20tests=20=C3=A9tendus=20pour=20src/m?= =?UTF-8?q?edical/dp=5Ffinalizer.py=20(34=20tests)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Couvre les cas non couverts dans test_dp_finalizer.py : - R6 Z-code non whitelisté remplacé par DAS - Fonctions internes (_family3, _code_in_candidates, _has_strong_evidence) - _apply_r5 en appel direct (Z whitelisté, R-code avec candidat non-R) - Détection source Trackare par document_type/reason/evidence - Cas limites (code None, candidats vides, pas de dp_selection) Co-Authored-By: Claude Opus 4.6 --- tests/test_dp_finalizer_ext.py | 340 +++++++++++++++++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100644 tests/test_dp_finalizer_ext.py diff --git a/tests/test_dp_finalizer_ext.py b/tests/test_dp_finalizer_ext.py new file mode 100644 index 0000000..ae13ead --- /dev/null +++ b/tests/test_dp_finalizer_ext.py @@ -0,0 +1,340 @@ +"""Tests étendus pour src/medical/dp_finalizer.py. + +Couvre les cas non couverts dans test_dp_finalizer.py : +- R6 — Z-code non whitelisté remplacé par DAS +- Fonctions internes (_family3, _code_in_candidates, _has_strong_evidence) +- finalize_dp avec source Trackare détectée par evidence/reason +- Cas limites : codes None, candidats vides, Z whitelistés en R6 +""" + +from __future__ import annotations + +import pytest + +from src.config import DossierMedical, DPSelection, DPCandidate, Diagnostic, Sejour +from src.medical.dp_finalizer import ( + finalize_dp, + decide_dp_final, + _family3, + _code_in_candidates, + _has_strong_evidence, + _apply_r5, + _make_selection, +) + + +# ── Helpers ──────────────────────────────────────────────────────────── + + +def _sel( + code: str | None = None, + term: str = "", + verdict: str = "REVIEW", + confidence: str = "medium", + evidence: list[str] | None = None, + reason: str = "", + candidates: list[DPCandidate] | None = None, +) -> DPSelection: + return DPSelection( + chosen_code=code, + chosen_term=term or (code or ""), + verdict=verdict, + confidence=confidence, + evidence=evidence or [], + reason=reason, + candidates=candidates or [], + ) + + +def _cand(code: str, term: str = "", score: float = 0.0) -> DPCandidate: + return DPCandidate(index=0, term=term or code, code=code, score=score) + + +def _dossier( + doc_type: str = "crh", + dp_selection: DPSelection | None = None, + das: list[Diagnostic] | None = None, +) -> DossierMedical: + return DossierMedical( + document_type=doc_type, + sejour=Sejour(), + dp_selection=dp_selection, + diagnostics_associes=das or [], + ) + + +# =================================================================== +# Fonctions internes +# =================================================================== + + +class TestFamily3: + @pytest.mark.parametrize("code,expected", [ + ("K81.0", "K81"), + ("K81", "K81"), + ("I26.9", "I26"), + ("Z51.1", "Z51"), + (None, ""), + ("", ""), + ]) + def test_extraction(self, code, expected): + assert _family3(code) == expected + + +class TestCodeInCandidates: + def test_exact_match(self): + sel = _sel(candidates=[_cand("I26.9"), _cand("K85.1")]) + assert _code_in_candidates("I26.9", sel) is True + + def test_family3_match(self): + sel = _sel(candidates=[_cand("I26.9")]) + assert _code_in_candidates("I26.0", sel) is True # same family + + def test_no_match(self): + sel = _sel(candidates=[_cand("I26.9")]) + assert _code_in_candidates("K85.1", sel) is False + + def test_none_code(self): + sel = _sel(candidates=[_cand("I26.9")]) + assert _code_in_candidates(None, sel) is False + + def test_empty_candidates(self): + sel = _sel(candidates=[]) + assert _code_in_candidates("I26.9", sel) is False + + +class TestHasStrongEvidence: + def test_no_evidence(self): + sel = _sel(evidence=[]) + assert _has_strong_evidence(sel) is False + + def test_only_trackare_source(self): + """'Source: Trackare' seul n'est pas une preuve forte.""" + sel = _sel(evidence=["Source: Trackare (codage établissement)"]) + assert _has_strong_evidence(sel) is False + + def test_strong_evidence(self): + sel = _sel(evidence=["Diagnostic de sortie: «Embolie pulmonaire»"]) + assert _has_strong_evidence(sel) is True + + def test_mixed_evidence(self): + sel = _sel(evidence=[ + "Source: Trackare (codage établissement)", + "Conclusion CRH: «Pancréatite biliaire»", + ]) + assert _has_strong_evidence(sel) is True + + +class TestMakeSelection: + def test_preserves_source_candidates(self): + source = _sel(candidates=[_cand("I26.9", "EP", 5.0)]) + result = _make_selection( + code="I26.9", term="EP", verdict="CONFIRMED", + confidence="high", evidence=[], reason="test", + source_sel=source, + ) + assert len(result.candidates) == 1 + assert result.candidates[0].code == "I26.9" + + def test_no_source(self): + result = _make_selection( + code="I26.9", term="EP", verdict="CONFIRMED", + confidence="high", evidence=[], reason="test", + ) + assert result.candidates == [] + assert result.debug_scores is None + + +# =================================================================== +# R5 — _apply_r5 (direct call) +# =================================================================== + + +class TestApplyR5Direct: + def test_z_code_non_whitelisted_forced_review(self): + dp = _sel(code="Z95.5", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, None, allow_symptom_dp=False) + assert dp.verdict == "REVIEW" + assert flags.get("z_code_dp_review") is True + + def test_z_code_whitelisted_untouched(self): + dp = _sel(code="Z51.1", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, None, allow_symptom_dp=False) + assert dp.verdict == "CONFIRMED" + assert "z_code_dp_review" not in flags + + def test_r_code_with_non_r_candidate(self): + crh = _sel(candidates=[_cand("R06.0"), _cand("J18.9")]) + dp = _sel(code="R06.0", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, crh, allow_symptom_dp=False) + assert dp.verdict == "REVIEW" + assert flags.get("r_code_dp_with_non_r_candidate") is True + + def test_r_code_allowed_by_flag(self): + crh = _sel(candidates=[_cand("R06.0"), _cand("J18.9")]) + dp = _sel(code="R06.0", verdict="CONFIRMED", confidence="high", evidence=[]) + dp, flags, alertes = _apply_r5(dp, crh, allow_symptom_dp=True) + assert dp.verdict == "CONFIRMED" + + def test_review_dp_not_changed(self): + """R5 ne touche que les CONFIRMED.""" + dp = _sel(code="Z95.5", verdict="REVIEW", confidence="medium", evidence=[]) + dp, flags, alertes = _apply_r5(dp, None, allow_symptom_dp=False) + assert dp.verdict == "REVIEW" + assert "z_code_dp_review" not in flags # pas de flag car déjà REVIEW + + +# =================================================================== +# R6 — Z-code remplacé par DAS (via finalize_dp) +# =================================================================== + + +class TestR6ZcodeReplacedByDas: + def test_z_code_replaced_by_das(self): + """Z-code non whitelisté en DP → DAS high/medium non-R non-Z promu.""" + sel = _sel( + code="Z96.6", # non whitelisté + verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP Trackare", + ) + das = [ + Diagnostic(texte="Pancréatite", cim10_suggestion="K85.9", cim10_confidence="high"), + ] + d = _dossier(doc_type="trackare", dp_selection=sel, das=das) + finalize_dp(d) + + assert d.dp_final is not None + assert d.dp_final.chosen_code == "K85.9" + assert d.quality_flags.get("z_code_replaced_by_das") is True + assert any("K85.9" in a for a in d.alertes_codage) + + def test_z_code_whitelisted_not_replaced(self): + """Z-code whitelisté en DP → pas de remplacement.""" + sel = _sel( + code="Z51.1", # whitelisté + verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP Trackare", + ) + das = [ + Diagnostic(texte="Pancréatite", cim10_suggestion="K85.9", cim10_confidence="high"), + ] + d = _dossier(doc_type="trackare", dp_selection=sel, das=das) + finalize_dp(d) + + assert d.dp_final.chosen_code == "Z51.1" + assert "z_code_replaced_by_das" not in d.quality_flags + + def test_z_code_no_eligible_das(self): + """Z-code non whitelisté mais aucun DAS éligible → pas de remplacement.""" + sel = _sel( + code="Z96.6", + verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP Trackare", + ) + das = [ + Diagnostic(texte="Symptôme R", cim10_suggestion="R10.4", cim10_confidence="high"), + Diagnostic(texte="Z-code", cim10_suggestion="Z87.1", cim10_confidence="low"), + ] + d = _dossier(doc_type="trackare", dp_selection=sel, das=das) + finalize_dp(d) + + # R10.4 est R-code, Z87.1 est Z-code → aucun éligible + assert "z_code_replaced_by_das" not in d.quality_flags + + +# =================================================================== +# finalize_dp — détection source Trackare +# =================================================================== + + +class TestFinalizeDpSourceDetection: + def test_trackare_detected_by_document_type(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", evidence=[], reason="CRH DP") + d = _dossier(doc_type="trackare", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is not None + + def test_trackare_detected_by_reason(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", evidence=[], + reason="DP Trackare — source d'autorité") + d = _dossier(doc_type="crh", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is not None + + def test_trackare_detected_by_evidence(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", + evidence=["Source: Trackare (codage établissement)"], + reason="DP") + d = _dossier(doc_type="crh", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is not None + + def test_crh_only_when_no_trackare_signal(self): + sel = _sel(code="K81.0", verdict="CONFIRMED", + evidence=["Diagnostic de sortie CRH"], + reason="CRH CONFIRMED") + d = _dossier(doc_type="crh", dp_selection=sel) + finalize_dp(d) + assert d.dp_trackare is None + assert d.dp_crh_only is not None + + +# =================================================================== +# finalize_dp — pas de dp_selection +# =================================================================== + + +class TestFinalizeDpNoSelection: + def test_no_dp_selection(self): + """Dossier sans dp_selection → REVIEW, flag no_dp_source.""" + d = _dossier(doc_type="crh") + finalize_dp(d) + assert d.dp_final is not None + assert d.dp_final.verdict == "REVIEW" + assert d.quality_flags.get("no_dp_source") is True + + +# =================================================================== +# decide_dp_final — edge cases +# =================================================================== + + +class TestDecideDpFinalEdgeCases: + def test_trackare_z_code_whitelisted(self): + """Z03 (whitelisté) en Trackare-only → CONFIRMED.""" + trackare = _sel(code="Z03.8", verdict="CONFIRMED", + evidence=["Source: Trackare"]) + dp, flags, _ = decide_dp_final(trackare, None) + # Z03 is whitelisted + assert dp.verdict == "CONFIRMED" + + def test_trackare_empty_evidence_gets_default(self): + """Trackare-only sans evidence → evidence par défaut ajoutée.""" + trackare = _sel(code="K81.0", verdict="CONFIRMED", evidence=[]) + dp, flags, _ = decide_dp_final(trackare, None) + assert any("Trackare" in e for e in dp.evidence) + + def test_both_sources_trackare_review_from_selector(self): + """Si dp_selector a déjà mis REVIEW, le finalizer le respecte.""" + trackare = _sel(code="K81.0", verdict="REVIEW", + evidence=["Source: Trackare"]) + crh = _sel(code="K81.0", verdict="REVIEW", + evidence=["Diagnostic CRH"], + candidates=[_cand("K81.0")]) + dp, flags, _ = decide_dp_final(trackare, crh) + # R2 corrobore → CONFIRMED + assert dp.verdict == "CONFIRMED" + assert flags.get("trackare_confirmed_by_crh") is True + + def test_code_none_handling(self): + """Trackare avec code None → pas de crash.""" + trackare = _sel(code=None, verdict="REVIEW", evidence=["Source: Trackare"]) + crh = _sel(code="K81.0", verdict="CONFIRMED", + evidence=["Diagnostic de sortie CRH"]) + dp, flags, _ = decide_dp_final(trackare, crh) + # Code None ne commence pas par R, ne commence pas par Z + # Il ne matche pas CRH → R4 ambigu + assert dp is not None From aed5c87bc32e13048c21b39a04cfcbd552399f58 Mon Sep 17 00:00:00 2001 From: dom Date: Sun, 8 Mar 2026 12:01:47 +0100 Subject: [PATCH 4/4] feat: endpoint /health avec tests (8 tests) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ajoute GET /health retournant un JSON avec : - status: "ok" - version: "2.1.0" - ollama: true/false (connectivité testée avec timeout 2s) - timestamp: ISO 8601 UTC Tests couvrent : format JSON, champs requis, Ollama joignable/injoignable, format timestamp ISO, type booléen du champ ollama. Co-Authored-By: Claude Opus 4.6 --- src/viewer/app.py | 17 ++++++++ tests/test_health_endpoint.py | 79 +++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 tests/test_health_endpoint.py diff --git a/src/viewer/app.py b/src/viewer/app.py index a78d99d..3cb97cf 100644 --- a/src/viewer/app.py +++ b/src/viewer/app.py @@ -616,4 +616,21 @@ def create_app() -> Flask: metrics=metrics, total_selection=len(selection), groups=groups) + # --- Health check endpoint --- + @app.route("/health") + def health(): + from datetime import datetime, timezone + ollama_ok = False + try: + r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=2) + ollama_ok = r.status_code == 200 + except Exception: + pass + return jsonify({ + "status": "ok", + "version": "2.1.0", + "ollama": ollama_ok, + "timestamp": datetime.now(timezone.utc).isoformat(), + }) + return app diff --git a/tests/test_health_endpoint.py b/tests/test_health_endpoint.py new file mode 100644 index 0000000..3eb46df --- /dev/null +++ b/tests/test_health_endpoint.py @@ -0,0 +1,79 @@ +"""Tests pour le endpoint /health du viewer Flask.""" + +import json +import os +from unittest.mock import patch, MagicMock + +import pytest +import requests + +from src.viewer.app import create_app + + +@pytest.fixture +def app(): + with patch.dict(os.environ, {"T2A_DEMO_USER": "", "T2A_DEMO_PASS": ""}): + app = create_app() + app.config["TESTING"] = True + yield app + + +@pytest.fixture +def client(app): + return app.test_client() + + +class TestHealthEndpoint: + + def test_health_returns_200(self, client): + response = client.get("/health") + assert response.status_code == 200 + + def test_health_returns_json(self, client): + response = client.get("/health") + data = response.get_json() + assert data is not None + assert data["status"] == "ok" + + def test_health_contains_required_fields(self, client): + response = client.get("/health") + data = response.get_json() + assert "status" in data + assert "version" in data + assert "ollama" in data + assert "timestamp" in data + + def test_health_version_format(self, client): + response = client.get("/health") + data = response.get_json() + assert data["version"] == "2.1.0" + + def test_health_timestamp_is_iso(self, client): + from datetime import datetime + response = client.get("/health") + data = response.get_json() + # Should parse without error + ts = datetime.fromisoformat(data["timestamp"]) + assert ts is not None + + @patch("src.viewer.app.requests.get") + def test_health_ollama_reachable(self, mock_get, client): + """Quand Ollama répond, ollama=True.""" + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_get.return_value = mock_resp + response = client.get("/health") + data = response.get_json() + assert data["ollama"] is True + + @patch("src.viewer.app.requests.get", side_effect=requests.ConnectionError("refused")) + def test_health_ollama_unreachable(self, mock_get, client): + """Quand Ollama est injoignable, ollama=False.""" + response = client.get("/health") + data = response.get_json() + assert data["ollama"] is False + + def test_health_ollama_is_bool(self, client): + response = client.get("/health") + data = response.get_json() + assert isinstance(data["ollama"], bool)