Nouveau module pipeline/zones_config.py : charge les zones d'extraction depuis un fichier zones_config.json (coordonnées relatives 0-1), avec fallback sur les constantes Python. Config partagée entre : - pipeline/extract.py (crop colonne Recodage) - pipeline/checkboxes.py (cases Accord/Désaccord) Zones configurables aujourd'hui (page recueil) : - codage_reco (crop zonal pour le second passage VLM) - accord_checkbox / desaccord_checkbox (densité de pixels) Mode "🔧 Calibration zones" ajouté dans pipeline/ui_overlay.py : - Sélection d'un PDF de référence (idéalement bien cadré) - Canvas interactif (streamlit-drawable-canvas) avec les zones existantes pré-dessinées en rouge - Dessin/déplacement/redimensionnement à la souris - Saisie d'un nom et description par zone - Sauvegarde en JSON (ou OGC_ZONES_CONFIG si défini) Permet au métier (Khalid) de recalibrer les zones sans toucher au code, par exemple si le formulaire ATIH évolue ou si les scans sont d'un autre établissement. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
492 lines · 17 KiB · Python
"""Interface Streamlit de review / annotation des extractions OGC (V2).
|
|
|
|
Usages :
|
|
1. **Visualisation** : image + champs structurés + JSON brut côte-à-côte,
|
|
pour chaque page du dossier (recueil, concertations, preuves…).
|
|
2. **Correction & gold set** : éditer chaque champ, sauvegarder dans gold/<nom>.json.
|
|
3. **Badges de validation ATIH** : chaque code médical est marqué ✓/✗ avec la
|
|
suggestion de correction s'il existe une correction Levenshtein ≤ 1.
|
|
|
|
Lancement (depuis la racine du projet) :
|
|
streamlit run pipeline/ui_overlay.py
|
|
|
|
Ou indirectement via run_overlay.sh.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from copy import deepcopy
|
|
from pathlib import Path
|
|
|
|
# Make `pipeline.*` importable when Streamlit executes this file directly
# (Streamlit runs the script from its own location, not the repo root).
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))

import streamlit as st
from PIL import Image

from pipeline.ingest import pdf_to_images
from pipeline.zones_config import load_config, save_config, DEFAULT_CONFIG_PATH

# streamlit-drawable-canvas is only needed by the "Calibration zones" mode;
# the review UI must keep working without it, so the failure is recorded in
# _HAS_CANVAS and surfaced to the user inside render_calibration_page().
try:
    from streamlit_drawable_canvas import st_canvas
    _HAS_CANVAS = True
except ImportError:
    _HAS_CANVAS = False
|
|
|
|
|
|
# ============================================================
# Configuration
# ============================================================

PDF_DIR = Path("2018 CARC")      # directory holding the source OGC scan PDFs
EXTRACT_DIR = Path("output/v2")  # pipeline extraction JSONs (one per dossier)
GOLD_DIR = Path("gold")          # human-validated gold JSONs (created on demand)
GOLD_DIR.mkdir(exist_ok=True)

# Canonical display order of page types within a dossier
PAGE_ORDER = ["recueil", "concertation_med", "hospitalisation",
              "preuves", "concertation_2", "concertation_1"]
# Human-readable tab label for each page type
PAGE_LABEL = {
    "recueil": "p1 — Recueil",
    "concertation_med": "p2 — Concertation médicale",
    "hospitalisation": "p3 — Hospitalisation (manuscrit)",
    "preuves": "p4 — Éléments de preuve",
    "concertation_2": "p5 — Concertation 2/2 (décision)",
    "concertation_1": "p6 — Concertation 1/2 (argumentaire)",
}

# Editable fields per page type, as (section title, list of dotted field
# paths) pairs; paths are resolved by get_field()/set_field().
PAGE_FIELDS = {
    "recueil": [
        ("En-tête", [
            "etablissement", "finess", "date_debut_controle",
            "n_ogc", "n_champ", "dates_sejour",
        ]),
        ("Codage Établissement", [
            "codage_etab.dp", "codage_etab.dp_libelle", "codage_etab.dr",
        ]),
        ("Codage Recodage", [
            "codage_reco.dp", "codage_reco.dr",
        ]),
        ("GHM / GHS", [
            "ghm_etab", "ghs_etab", "ghm_reco", "ghs_reco",
        ]),
        ("Décisions", [
            "recodage_impactant", "ghs_injustifie",
            "accord_desaccord", "praticien_conseil",
        ]),
    ],
    "concertation_2": [
        ("Décision finale", [
            "ghs_initial", "ghs_avant_concertation", "ghs_final",
            "decision", "date_concertation",
        ]),
        ("Signatures", [
            "praticien_controleur", "medecin_dim",
        ]),
    ],
    "concertation_1": [
        ("Argumentaire", [
            "date_concertation", "argumentaire",
        ]),
    ],
    "preuves": [
        ("Entête preuves", [
            "date", "praticien_controleur", "medecin_dim",
        ]),
    ],
    # concertation_med and hospitalisation: no structured editing
    # (pages nearly empty, or dense handwriting).
}

# Which fields of a page can be validated against the ATIH referentials,
# as (dotted field path, referential name) pairs.
VALIDATION_PATHS = {
    "recueil": [
        ("codage_etab.dp", "cim10"),
        ("codage_etab.dr", "cim10"),
        ("codage_reco.dp", "cim10"),
        ("codage_reco.dr", "cim10"),
        ("ghm_etab", "ghm"),
        ("ghs_etab", "ghs"),
        ("ghm_reco", "ghm"),
        ("ghs_reco", "ghs"),
    ],
}
|
|
|
|
|
|
# ============================================================
|
|
# Helpers I/O
|
|
# ============================================================
|
|
|
|
def list_pdfs() -> list[Path]:
|
|
return sorted(PDF_DIR.glob("OGC *.pdf"))
|
|
|
|
|
|
def load_extract(name: str) -> dict | None:
|
|
path = EXTRACT_DIR / f"{name}.json"
|
|
if not path.exists(): return None
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def load_gold(name: str) -> dict | None:
|
|
path = GOLD_DIR / f"{name}.json"
|
|
if not path.exists(): return None
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def save_gold(name: str, data: dict) -> Path:
|
|
path = GOLD_DIR / f"{name}.json"
|
|
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
return path
|
|
|
|
|
|
def get_field(d: dict | None, path: str) -> str:
|
|
if d is None: return ""
|
|
for k in path.split("."):
|
|
d = d.get(k, "") if isinstance(d, dict) else ""
|
|
return str(d) if d else ""
|
|
|
|
|
|
def set_field(d: dict, path: str, value: str) -> None:
    """Write *value* at dotted *path* inside *d*, creating missing levels.

    Any intermediate value that is not a dict is replaced by a fresh dict.
    """
    *parents, leaf = path.split(".")
    node = d
    for key in parents:
        child = node.get(key)
        if not isinstance(child, dict):
            child = {}
            node[key] = child
        node = child
    node[leaf] = value
|
|
|
|
|
|
def compare_value(pipe_val: str, gold_val: str) -> str:
    """Marker comparing a pipeline value against its gold reference.

    "∅" = both empty, "—" = no gold reference, "✓" = match (ignoring
    surrounding whitespace), "✗" = mismatch.
    """
    if not gold_val:
        return "∅" if not pipe_val else "—"
    return "✓" if pipe_val.strip() == gold_val.strip() else "✗"
|
|
|
|
|
|
def validation_marker(page_data: dict, field_path: str) -> tuple[str, str]:
    """Return (emoji, tooltip) reflecting the ATIH validation of a field.

    Walks the page's "_validation" sub-tree along the same dotted path as
    the field; yields ("", "") when no validation entry is present.
    """
    node = page_data.get("_validation") or {}
    for part in field_path.split("."):
        if not isinstance(node, dict):
            return "", ""
        node = node.get(part)
        if node is None:
            return "", ""
    if not (isinstance(node, dict) and "valid" in node):
        return "", ""

    status = node.get("valid")
    if status is True:
        lib = node.get("libelle_ref", "")
        info = f"ATIH ok — {lib}" if lib else "ATIH ok"
        return "🟢", info
    if status is False:
        sug = node.get("suggestion", "")
        if sug:
            return "🟡", f"invalide ATIH — suggestion : {sug}"
        return "🔴", "invalide ATIH"
    # "valid" present but neither True nor False: no marker.
    return "", ""
|
|
|
|
|
|
# ============================================================
|
|
# Vue principale
|
|
# ============================================================
|
|
|
|
def render_page_editor(name: str, ptype: str, extract: dict, gold: dict | None):
    """Render one dossier page: scan image, editable fields and raw JSON.

    Args:
        name: dossier identifier (PDF stem), used to locate PDF/JSON files.
        ptype: page type key (e.g. "recueil"); selects PAGE_FIELDS and the
            physical page to display.
        extract: pipeline extraction JSON for the dossier.
        gold: gold JSON for the dossier, or None when not yet created.
    """
    pipe_data = (extract.get("extraction") or {}).get(ptype) or {}
    gold_data = (gold or {}).get("extraction", {}).get(ptype) or {} if gold else {}

    # Find the physical page number carrying this page type.
    pages_meta = extract.get("pages") or []
    page_num = next((p["page"] for p in pages_meta if p.get("type") == ptype), None)
    pdf_path = next(p for p in list_pdfs() if p.stem == name)
    if page_num is None:
        st.warning(f"Aucune page trouvée de type '{ptype}' pour {name}")
        return

    col_img, col_fields = st.columns([5, 4])

    with col_img:
        st.caption(f"{name} — page {page_num} ({ptype})")
        images = pdf_to_images(str(pdf_path))
        if page_num <= len(images):
            st.image(Image.open(images[page_num - 1]), use_container_width=True)

    with col_fields:
        fields_def = PAGE_FIELDS.get(ptype)
        if fields_def:
            st.markdown("**Champs extraits**")
            # Edit form: one widget per configured field, grouped by section.
            with st.form(f"form_{ptype}_{name}"):
                # Start editing from gold when it exists, else from pipeline.
                edited = deepcopy(gold_data) if gold_data else deepcopy(pipe_data)
                for section, fields in fields_def:
                    st.markdown(f"*{section}*")
                    for f in fields:
                        pipe_val = get_field(pipe_data, f)
                        gold_val = get_field(gold_data, f)
                        cur_val = get_field(edited, f) or pipe_val
                        if not gold_data:
                            cur_val = pipe_val
                        cmp = compare_value(pipe_val, gold_val)
                        emoji, help_txt = validation_marker(pipe_data, f)
                        label = f"{cmp}{emoji} `{f}`"
                        # "argumentaire" is free text: use a tall textarea.
                        if f == "argumentaire":
                            new = st.text_area(label, value=cur_val, height=220,
                                               key=f"fld_{name}_{ptype}_{f}",
                                               help=help_txt or f"pipeline : {pipe_val!r}")
                        else:
                            new = st.text_input(label, value=cur_val,
                                                key=f"fld_{name}_{ptype}_{f}",
                                                help=help_txt or f"pipeline : {pipe_val!r}")
                        set_field(edited, f, new.strip())

                col_a, col_b = st.columns(2)
                with col_a:
                    save = st.form_submit_button("💾 Sauver (gold)")
                with col_b:
                    reset = st.form_submit_button("📋 Reset depuis pipeline")

            if save:
                # Merge the edited page into the (possibly brand-new) gold doc.
                g = gold or {"fichier": name, "extraction": {}}
                g.setdefault("extraction", {})[ptype] = edited
                save_gold(name, g)
                st.success(f"Gold {ptype} sauvegardé")
            elif reset:
                # Replace the gold page by a fresh copy of the pipeline output.
                g = gold or {"fichier": name, "extraction": {}}
                g.setdefault("extraction", {})[ptype] = deepcopy(pipe_data)
                save_gold(name, g)
                st.info("Gold réinitialisé depuis pipeline")
        else:
            st.info(f"Pas d'édition structurée pour type `{ptype}`. JSON brut seulement.")

    # Raw JSON views, full width, collapsed by default.
    with st.expander("📄 JSON brut extrait (pipeline)"):
        st.json(pipe_data)
    with st.expander("🥇 JSON gold actuel"):
        st.json(gold_data if gold_data else {})
    # Raw OCR text (useful when the model output failed to parse).
    page_meta = next((p for p in pages_meta if p.get("page") == page_num), None)
    if page_meta and page_meta.get("ocr_raw"):
        with st.expander("📝 OCR raw (texte brut renvoyé par le modèle)"):
            st.code(page_meta.get("ocr_raw", ""), language="json")
|
|
|
|
|
|
def render_calibration_page():
    """'Zone calibration' mode: draw rectangles with the mouse on a reference
    page image and save them (relative 0-1 coordinates) via save_config()."""
    st.header("🔧 Calibration des zones")

    # The canvas widget is an optional dependency; bail out with install help.
    if not _HAS_CANVAS:
        st.error(
            "Le package `streamlit-drawable-canvas` n'est pas installé.\n"
            "Installe-le avec : `pip install streamlit-drawable-canvas`"
        )
        return

    pdfs = list_pdfs()
    if not pdfs:
        st.error("Aucun PDF disponible pour la calibration")
        return

    col_ctrl, _ = st.columns([1, 3])
    with col_ctrl:
        ref_name = st.selectbox(
            "PDF de référence (bien cadré)",
            [p.stem for p in pdfs], key="calib_pdf",
        )
        page_type = st.selectbox(
            "Type de page", ["recueil"],
            help="Aujourd'hui seule la page recueil a des zones configurables",
        )
        # Physical page number per page type (recueil = page 1).
        page_num = {"recueil": 1}.get(page_type, 1)

    ref_pdf = next(p for p in pdfs if p.stem == ref_name)
    img_path = pdf_to_images(str(ref_pdf))[page_num - 1]
    img = Image.open(img_path)
    img_w, img_h = img.size

    # Load the existing zone configuration for this page type.
    cfg = load_config()
    existing_zones = cfg.get(page_type, {})

    # Scale the image down to fit the canvas (~900 px wide max).
    canvas_w = 900
    scale = canvas_w / img_w
    canvas_h = int(img_h * scale)

    # Pre-draw one red rectangle per zone already present in the config,
    # converting relative (0-1) coordinates to canvas pixels.
    initial_rects = []
    for zone_name, z in existing_zones.items():
        if not isinstance(z, dict): continue
        initial_rects.append({
            "type": "rect",
            "left": z["x1"] * canvas_w,
            "top": z["y1"] * canvas_h,
            "width": (z["x2"] - z["x1"]) * canvas_w,
            "height": (z["y2"] - z["y1"]) * canvas_h,
            "fill": "rgba(255, 100, 100, 0.15)",
            "stroke": "red",
            "strokeWidth": 2,
            # Carried along so the zone keeps its name after editing.
            "label_name": zone_name,
        })

    st.caption(
        "💡 Dessine un rectangle par zone à la souris. Les zones existantes "
        "apparaissent déjà pré-dessinées. Tu peux les modifier (drag), "
        "en ajouter, ou en supprimer (touche Suppr) puis cliquer sur "
        "**Sauvegarder**."
    )

    drawing_mode = st.radio(
        "Mode", ["rect", "transform"], horizontal=True,
        format_func=lambda x: {"rect": "✏️ Dessiner", "transform": "🖱 Sélectionner / Déplacer"}[x],
        key="calib_drawing_mode",
    )

    canvas_result = st_canvas(
        fill_color="rgba(255, 100, 100, 0.15)",
        stroke_width=2,
        stroke_color="red",
        background_image=img,
        update_streamlit=True,
        width=canvas_w,
        height=canvas_h,
        drawing_mode=drawing_mode,
        initial_drawing={"objects": initial_rects, "version": "5.2.1"},
        key="calib_canvas",
    )

    # Rebuild the zone config from the rectangles currently on the canvas.
    rects = (canvas_result.json_data or {}).get("objects", []) if canvas_result.json_data else []

    st.markdown("### Zones détectées")
    if not rects:
        st.info("Aucun rectangle dessiné.")
        return

    new_zones = {}
    for i, r in enumerate(rects):
        if r.get("type") != "rect":
            continue
        # Keep the zone's existing name when present, otherwise generate one.
        default_name = r.get("label_name") or f"zone_{i+1}"
        name = st.text_input(
            f"Nom de la zone {i+1}",
            value=default_name, key=f"calib_name_{i}",
        )
        # Convert canvas pixels back to relative 0-1 coordinates.
        x1 = r["left"] / canvas_w
        y1 = r["top"] / canvas_h
        x2 = x1 + r["width"] / canvas_w
        y2 = y1 + r["height"] / canvas_h
        # NOTE(review): assumes existing_zones[name] is a dict when present
        # (the rect loop above tolerates non-dict values) — confirm schema.
        desc = existing_zones.get(name, {}).get("description", "")
        desc = st.text_input(
            f"Description (optionnel)", value=desc, key=f"calib_desc_{i}",
        )
        st.caption(f"Coords relatives : ({x1:.3f}, {y1:.3f}) → ({x2:.3f}, {y2:.3f})")
        new_zones[name] = {"x1": round(x1, 4), "y1": round(y1, 4),
                           "x2": round(x2, 4), "y2": round(y2, 4),
                           "description": desc}

    if st.button("💾 Sauvegarder la configuration", type="primary"):
        cfg[page_type] = new_zones
        path = save_config(cfg)
        st.success(f"Configuration sauvegardée : {path}")
        st.json(new_zones)
|
|
|
|
|
|
def main():
    """Streamlit entry point: mode selection, dossier sidebar, page tabs."""
    st.set_page_config(page_title="OGC Overlay", layout="wide")

    # Widen the content area and tighten Streamlit's default margins.
    st.markdown("""
        <style>
        .block-container { padding-top: 1rem; padding-bottom: 1rem; max-width: 95%; }
        </style>
    """, unsafe_allow_html=True)

    st.title("🩺 Extraction OGC — review & gold set")

    # Mode selector at the top of the sidebar; calibration mode short-circuits
    # the whole review UI.
    with st.sidebar:
        mode = st.radio("Mode", ["📋 Review dossier", "🔧 Calibration zones"])
    if mode == "🔧 Calibration zones":
        render_calibration_page()
        return

    pdfs = list_pdfs()
    if not pdfs:
        st.error(f"Aucun PDF trouvé dans {PDF_DIR}")
        return

    with st.sidebar:
        st.header("Dossier")
        names = [p.stem for p in pdfs]
        name = st.selectbox("Choisir un OGC", names)

        extract = load_extract(name)
        gold = load_gold(name)

        if extract is None:
            st.error(f"Pas de JSON extrait pour {name}")
            st.caption(f"Attendu : output/v2/{name}.json")
        else:
            st.caption(f"✓ Extraction pipeline chargée")
            if gold:
                st.success("Gold set défini")
            else:
                st.caption("Pas encore de gold")

        # ATIH validation summary (only the "recueil" page carries one).
        if extract:
            rec_v = (extract.get("extraction") or {}).get("recueil", {}).get("_validation", {})
            summary = rec_v.get("summary", {})
            if summary:
                st.markdown("---")
                st.markdown("**Validation ATIH (page recueil)**")
                st.metric("Codes valides", f"{summary.get('valid',0)}/{summary.get('total_codes','?')}")
                cc = rec_v.get("cross_checks", {})
                for side in ("etab", "reco"):
                    c = cc.get(side, {})
                    if c.get("checked"):
                        icon = "✓" if c.get("coherent") else "✗"
                        st.caption(f"{icon} GHM↔GHS {side}")

        st.markdown("---")
        st.caption("💡 Chaque code médical est suivi d'un marqueur :")
        st.caption("🟢 valide ATIH — 🟡 invalide (suggestion dispo) — 🔴 invalide")
        st.caption("✓/✗/∅ = accord avec gold set")

    if extract is None:
        return

    # One tab per page type actually present in the dossier (deduplicated,
    # keeping first-seen order).
    pages_meta = extract.get("pages") or []
    available_types = []
    seen = set()
    for p in pages_meta:
        t = p.get("type")
        if t and t not in seen:
            seen.add(t)
            available_types.append(t)
    # Canonical order first, then any unknown types as a fallback.
    ordered = [t for t in PAGE_ORDER if t in available_types] + \
              [t for t in available_types if t not in PAGE_ORDER]

    if not ordered:
        st.warning("Aucune page classifiée dans ce dossier.")
        return

    tabs = st.tabs([PAGE_LABEL.get(t, t) for t in ordered])
    for tab, ptype in zip(tabs, ordered):
        with tab:
            render_page_editor(name, ptype, extract, gold)
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution entry point; normally launched via `streamlit run`.
    main()
|