"""Tests gold debug reports — sans mocks, données synthétiques.

Vérifie :
- Case report : structure JSON + top_candidates + match_eval
- Top-errors : tri correct (CONFIRMED/high en tête)
- CSV : headers présents + fichiers créés
"""
from __future__ import annotations
import csv
import json

from src.eval.gold_debug import (
    build_case_report,
    render_case_markdown,
    write_case_report,
    build_error_entry,
    sort_error_entries,
    write_top_errors_csv,
    write_top_errors_md,
    write_top_errors_jsonl,
    select_dim_pack_cases,
    write_dim_pack,
    TOP_ERRORS_CSV_COLS,
)

# ---------------------------------------------------------------------------
# Fixtures synthétiques
# ---------------------------------------------------------------------------

def _make_data(dp_code="D50", das_codes=None):
|
|
"""Construit un JSON pipeline minimal."""
|
|
das = []
|
|
for code in (das_codes or []):
|
|
das.append({"texte": f"diag {code}", "cim10_suggestion": code, "source": "edsnlp"})
|
|
return {
|
|
"document_type": "crh",
|
|
"diagnostic_principal": {"texte": "Anémie", "cim10_suggestion": dp_code, "source": "regex"},
|
|
"diagnostics_associes": das,
|
|
}
|
|
|
|
|
|
def _make_dp_selection(chosen_code="D50", verdict="REVIEW", confidence="medium",
|
|
candidates=None, delta=0.0):
|
|
"""Construit un dp_selection dict."""
|
|
cands = candidates or [
|
|
{"index": 0, "code": "D50", "term": "Anémie", "score": 4.0,
|
|
"section_strength": 3, "source": "regex", "score_details": {"section": 3, "confidence": 1},
|
|
"is_symptom_like": False, "is_comorbidity_like": False, "is_act_only": False},
|
|
{"index": 1, "code": "I25.1", "term": "SCA", "score": 4.0,
|
|
"section_strength": 1, "source": "llm_das", "score_details": {"section": 1, "confidence": 3},
|
|
"is_symptom_like": False, "is_comorbidity_like": False, "is_act_only": False},
|
|
]
|
|
return {
|
|
"chosen_code": chosen_code,
|
|
"chosen_term": "Anémie",
|
|
"verdict": verdict,
|
|
"confidence": confidence,
|
|
"reason": f"Écart {delta} < seuil 3.0, LLM désactivé",
|
|
"evidence": ["Scores proches : 4.0 vs 4.0"],
|
|
"candidates": cands,
|
|
"debug_scores": {"top1": 4.0, "top2": 4.0, "delta": delta},
|
|
}
|
|
|
|
|
|
def _make_gold_dict(code="I25.1", label="SCA", acceptable=None, family3=None):
|
|
return {
|
|
"dp_expected": {"code": code, "label": label},
|
|
"dp_acceptable_codes": acceptable or ["I25.1", "I25.5"],
|
|
"dp_acceptable_family3": family3 or ["I25"],
|
|
"allow_symptom_dp": False,
|
|
"confidence": "probable",
|
|
}
|
|
|
|
|
|
def _make_eval(strict=False, acceptable=False, family3=False, symptom=False):
|
|
return {
|
|
"case_id": "test_case",
|
|
"dp_expected_code": "I25.1",
|
|
"dp_expected_label": "SCA",
|
|
"chosen_code": "D50",
|
|
"confidence_gold": "probable",
|
|
"allow_symptom_dp": False,
|
|
"exact_match_strict": strict,
|
|
"exact_match_tolerant_codes": acceptable,
|
|
"family3_match_tolerant": family3,
|
|
"acceptable_match": acceptable or family3,
|
|
"symptom_not_allowed": symptom,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Case report
# ---------------------------------------------------------------------------

class TestBuildCaseReport:
    """Structure, rendering and file-output checks for build_case_report."""

    def test_structure(self):
        report = build_case_report(
            "74_test",
            _make_data("D50", ["I25.1", "Z95.5"]),
            _make_dp_selection(),
            _make_gold_dict(),
            _make_eval(),
        )

        assert report["case_id"] == "74_test"
        assert report["document_type"] == "crh"
        prediction = report["prediction"]
        assert prediction["chosen_code"] == "D50"
        assert prediction["verdict"] == "REVIEW"
        top = report["top_candidates"]
        assert len(top) == 2
        assert top[0]["rank"] == 1
        assert top[0]["code"] == "D50"
        assert report["match_eval"]["strict_match"] is False
        assert report["match_eval"]["acceptable_match"] is False
        assert report["gold"]["dp_expected"]["code"] == "I25.1"

    def test_no_gold(self):
        report = build_case_report("test_no_gold", _make_data(), _make_dp_selection(), None, None)

        assert report["gold"] is None
        assert report["match_eval"] is None

    def test_render_markdown(self):
        report = build_case_report(
            "74_test",
            _make_data("D50", ["I25.1"]),
            _make_dp_selection(),
            _make_gold_dict(),
            _make_eval(),
        )

        md = render_case_markdown(report)
        for fragment in ("# Case Debug — 74_test", "D50", "I25.1",
                         "Gold vs Prediction", "Top candidats", "Hypothèse bug"):
            assert fragment in md

    def test_write_files(self, tmp_path):
        report = build_case_report("test_write", _make_data(), _make_dp_selection(), None, None)

        json_path, md_path = write_case_report(report, tmp_path)
        assert json_path.exists()
        assert md_path.exists()
        reloaded = json.loads(json_path.read_text())
        assert reloaded["case_id"] == "test_write"

    def test_pool_stats(self):
        report = build_case_report(
            "test_pool",
            _make_data("D50", ["I25.1", "Z95.5"]),
            _make_dp_selection(),
            None,
            None,
        )

        stats = report["pool_stats"]
        assert stats["raw_pool_size"] == 3  # 1 DP + 2 DAS
        assert stats["filtered_pool_size"] == 2  # 2 candidates

    def test_review_reason_tag(self):
        selection = _make_dp_selection()
        selection["reason"] = "Aucun candidat DP identifié"
        report = build_case_report("test_tag", _make_data(), selection, None, None)
        assert report["prediction"]["review_reason_tag"] == "no_candidates"

# ---------------------------------------------------------------------------
# Top-errors sort
# ---------------------------------------------------------------------------

class TestTopErrors:
    """Ordering and serialization checks for the top-errors outputs."""

    def _entry(self, case_id, acceptable, strict, verdict, confidence):
        """Build a case report, then derive its error entry."""
        report = build_case_report(
            case_id,
            _make_data(),
            _make_dp_selection(verdict=verdict, confidence=confidence),
            _make_gold_dict(),
            _make_eval(strict=strict, acceptable=acceptable),
        )
        return build_error_entry(report)

    def test_sort_acceptable_fail_first(self):
        """An acceptable FAIL must sort ahead of an acceptable OK."""
        failing = self._entry("fail", False, False, "REVIEW", "medium")
        passing = self._entry("ok", True, True, "REVIEW", "medium")

        ordered = sort_error_entries([passing, failing])
        assert [entry["case_id"] for entry in ordered] == ["fail", "ok"]

    def test_sort_confirmed_before_review(self):
        """CONFIRMED (dangerous) sorts ahead of REVIEW among acceptable FAILs."""
        confirmed = self._entry("confirmed", False, False, "CONFIRMED", "medium")
        review = self._entry("review", False, False, "REVIEW", "medium")

        ordered = sort_error_entries([review, confirmed])
        assert ordered[0]["case_id"] == "confirmed"

    def test_sort_high_before_medium(self):
        """High confidence sorts ahead of medium among CONFIRMED acceptable FAILs."""
        high = self._entry("high", False, False, "CONFIRMED", "high")
        medium = self._entry("med", False, False, "CONFIRMED", "medium")

        ordered = sort_error_entries([medium, high])
        assert ordered[0]["case_id"] == "high"

    def test_csv_headers(self, tmp_path):
        """The CSV must contain every required header."""
        entry = self._entry("test", False, False, "REVIEW", "medium")
        target = tmp_path / "errors.csv"
        write_top_errors_csv([entry], target)

        with open(target) as handle:
            fieldnames = csv.DictReader(handle).fieldnames or []
        assert set(TOP_ERRORS_CSV_COLS) <= set(fieldnames)

    def test_md_created(self, tmp_path):
        entry = self._entry("test", False, False, "REVIEW", "medium")
        target = tmp_path / "errors.md"
        write_top_errors_md([entry], target)
        assert target.exists()
        text = target.read_text()
        assert "Top erreurs" in text
        assert "test" in text

    def test_jsonl_created(self, tmp_path):
        entry = self._entry("test", False, False, "REVIEW", "medium")
        target = tmp_path / "errors.jsonl"
        write_top_errors_jsonl([entry], target)
        assert target.exists()
        rows = target.read_text().strip().splitlines()
        assert len(rows) == 1
        record = json.loads(rows[0])
        assert record["case_id"] == "test"
        assert "_sort_key" not in record  # internals not leaked

# ---------------------------------------------------------------------------
# DIM Pack
# ---------------------------------------------------------------------------

class TestDimPack:
    """Selection and file-output checks for the DIM pack."""

    def _report(self, case_id, verdict="REVIEW", acceptable_fail=False, has_symptom=False):
        """Build a case report with two candidates and a controllable acceptable_match.

        Only ``acceptable_match`` in the eval dict drives DIM-pack selection here.
        A fuller ``match_eval`` dict previously built in this helper was never
        passed anywhere (dead code with stale key names) and has been removed.
        """
        data = _make_data()
        cands = [
            {"index": 0, "code": "D50", "term": "Anémie", "score": 4.0,
             "section_strength": 3, "score_details": {},
             "is_symptom_like": has_symptom, "is_comorbidity_like": False, "is_act_only": False},
            {"index": 1, "code": "I25.1", "term": "SCA", "score": 3.0,
             "section_strength": 1, "score_details": {},
             "is_symptom_like": False, "is_comorbidity_like": False, "is_act_only": False},
        ]
        dp_sel = _make_dp_selection(verdict=verdict, candidates=cands)
        return build_case_report(case_id, data, dp_sel, _make_gold_dict(),
                                 {"acceptable_match": not acceptable_fail})

    def test_select_errors_first(self):
        """An acceptable-FAIL report is selected ahead of an acceptable-OK one."""
        r_error = self._report("error", acceptable_fail=True)
        r_ok = self._report("ok", acceptable_fail=False)

        selected = select_dim_pack_cases([r_ok, r_error], 1)
        assert len(selected) == 1
        assert selected[0]["case_id"] == "error"

    def test_write_pack(self, tmp_path):
        """write_dim_pack creates the CSV plus one JSON file per case."""
        r1 = self._report("case_1")
        r2 = self._report("case_2")

        csv_p, cases_dir = write_dim_pack([r1, r2], tmp_path)
        assert csv_p.exists()
        assert cases_dir.exists()
        assert (cases_dir / "case_1.json").exists()
        assert (cases_dir / "case_2.json").exists()

        with open(csv_p) as f:
            reader = csv.DictReader(f)
            rows = list(reader)
        assert len(rows) == 2
        assert rows[0]["case_id"] == "case_1"