extract.py contenait 4 responsabilités mélangées (320 lignes) : parsing JSON tolérant, résolution de zones, crop Recodage avec classification métier, orchestration. Séparation en modules cohérents : - pipeline/json_utils.py : parsing tolérant réutilisable (strip fences, virgules manquantes, troncature des boucles d'objets vides, fermeture des structures JSON ouvertes). N'a aucune connaissance métier OGC. - pipeline/recueil.py : toute la logique spécifique à la page recueil — résolution de zones configurables, filter_cim10_codes, classification DP/DR/DAS par règle métier, run_recodage_crop_pass, merge_codage_reco, enrich_recueil (orchestration des trois : checkboxes + ghs_injustifie + crop Recodage). Chaque fonction est testable indépendamment du VLM. - pipeline/extract.py : réduit à l'orchestration pure — ingest, routing, boucle page par page, délégation à recueil.enrich_recueil, validation ATIH finale. Plus aucune logique métier enfouie. La fonction extract_dossier garde exactement la même signature et produit le même JSON en sortie : aucun breaking change externe. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
204 lines
7.4 KiB
Python
204 lines
7.4 KiB
Python
"""Logique spécifique à la page `recueil` (fiche médicale de recueil OGC).
|
|
|
|
Regroupe tout ce qui concerne cette page — la plus riche et la plus difficile
|
|
à extraire du dossier OGC — séparé de l'orchestration générale :
|
|
|
|
- Résolution des zones configurables (crop Recodage, checkboxes)
|
|
- Second passage VLM sur le crop de la colonne Recodage
|
|
- Fusion du résultat crop dans le JSON principal
|
|
- Classification des codes CIM-10 en DP/DR/DAS par règle métier
|
|
- Enrichissement post-extraction (checkboxes Accord/Désaccord, ghs_injustifie)
|
|
|
|
Les fonctions sont testables indépendamment de Qwen quand on leur fournit
|
|
déjà les sorties OCR brutes.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from PIL import Image
|
|
|
|
from .checkboxes import (
|
|
CheckboxZones,
|
|
RECUEIL_ACCORD_DESACCORD,
|
|
detect_accord_desaccord,
|
|
parse_ghs_injustifie,
|
|
)
|
|
from .json_utils import parse_json_output
|
|
from .ocr_qwen import QwenVLOCR
|
|
from .prompts import RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE
|
|
from .zones_config import get_zone, load_config
|
|
|
|
|
|
# ============================================================
|
|
# Résolution des zones (config JSON + fallback sur les defaults)
|
|
# ============================================================
|
|
|
|
def resolve_recueil_zones() -> tuple[tuple[float, float, float, float], CheckboxZones]:
|
|
"""Charge les zones de la page recueil depuis la config utilisateur,
|
|
avec fallback sur les constantes compilées si la config est absente.
|
|
|
|
Retourne (zone_crop_recodage, zones_accord_desaccord).
|
|
"""
|
|
cfg = load_config()
|
|
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
|
|
acc = get_zone("recueil", "accord_checkbox", cfg)
|
|
des = get_zone("recueil", "desaccord_checkbox", cfg)
|
|
if acc and des:
|
|
cb = CheckboxZones(accord=acc, desaccord=des)
|
|
else:
|
|
cb = RECUEIL_ACCORD_DESACCORD
|
|
return reco, cb
|
|
|
|
|
|
# ============================================================
|
|
# Classification CIM-10 → DP / DR / DAS (pur, testable sans VLM)
|
|
# ============================================================
|
|
|
|
CIM10_RE = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
|
|
|
|
|
|
def filter_cim10_codes(codes_raw: list[Any]) -> list[dict]:
|
|
"""Filtre une liste de codes OCR bruts pour ne garder que les CIM-10.
|
|
|
|
Les VLM peuvent parfois lire des codes CCAM (actes) dans un crop qui
|
|
dépasse sur le bloc Actes. On les retire ici pour ne pas polluer les DAS.
|
|
"""
|
|
kept = []
|
|
for c in codes_raw or []:
|
|
if not isinstance(c, dict):
|
|
continue
|
|
code = (c.get("code") or "").strip()
|
|
if code and CIM10_RE.match(code):
|
|
kept.append({
|
|
"code": code,
|
|
"position": str(c.get("position") or "").strip(),
|
|
})
|
|
return kept
|
|
|
|
|
|
def classify_codes_dp_dr_das(codes: list[dict]) -> tuple[str, str, list[dict]]:
|
|
"""Classifie une liste de codes {code, position} en DP, DR et liste de DAS.
|
|
|
|
Règle métier :
|
|
- 1er code sans position → DP
|
|
- 2e code sans position → DR (ignoré si identique au DP : le VLM peut
|
|
dupliquer le DP quand la case DR est visuellement vide)
|
|
- tous les codes avec position → DAS
|
|
- codes sans position au-delà du 2e → DAS sans position (pour ne rien perdre)
|
|
"""
|
|
dp, dr = "", ""
|
|
das: list[dict] = []
|
|
dp_assigned = dr_assigned = False
|
|
for c in codes:
|
|
code, position = c["code"], c["position"]
|
|
if not position:
|
|
if not dp_assigned:
|
|
dp, dp_assigned = code, True
|
|
elif not dr_assigned:
|
|
if code == dp:
|
|
dr_assigned = True # doublon DP → DR vide
|
|
else:
|
|
dr, dr_assigned = code, True
|
|
else:
|
|
das.append({"code": code, "position": ""})
|
|
else:
|
|
das.append(c)
|
|
return dp, dr, das
|
|
|
|
|
|
# ============================================================
|
|
# Second passage VLM sur crop Recodage
|
|
# ============================================================
|
|
|
|
def run_recodage_crop_pass(image_path: Path, ocr: QwenVLOCR,
|
|
zone: tuple[float, float, float, float] | None = None
|
|
) -> dict | None:
|
|
"""Execute un second passage VLM sur le crop zonal de la colonne Recodage.
|
|
|
|
Sauvegarde le crop à côté de l'image source (suffixe `_recodage.png`)
|
|
pour audit. Retourne un dict avec `dp/dr/das` + métadonnées, ou None
|
|
en cas d'échec d'OCR ou de parsing.
|
|
"""
|
|
try:
|
|
img = Image.open(image_path)
|
|
w, h = img.size
|
|
z = zone
|
|
if z is None:
|
|
z, _ = resolve_recueil_zones()
|
|
x1, y1, x2, y2 = z
|
|
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
|
|
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
|
|
crop.save(crop_path)
|
|
except Exception:
|
|
return None
|
|
|
|
t0 = time.time()
|
|
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
|
|
parsed = parse_json_output(res["text"])
|
|
if not isinstance(parsed, dict) or "_parse_error" in parsed:
|
|
return None
|
|
|
|
codes = filter_cim10_codes(parsed.get("codes") or [])
|
|
dp, dr, das = classify_codes_dp_dr_das(codes)
|
|
return {
|
|
"dp": dp, "dr": dr, "das": das,
|
|
"_source": "crop_recodage",
|
|
"_elapsed_s": round(res["elapsed_s"], 2),
|
|
"_n_codes_raw": len(parsed.get("codes") or []),
|
|
"_n_codes_kept": len(codes),
|
|
}
|
|
|
|
|
|
def merge_codage_reco(parsed: dict, reco: dict) -> None:
|
|
"""Fusionne le résultat du crop Recodage dans `parsed["codage_reco"]`.
|
|
|
|
Politique de merge : le crop est plus fiable (contexte isolé) donc il
|
|
prime sur le passage principal. Exception : si un champ du crop est vide
|
|
mais que le passage principal l'a rempli, on garde celui du passage
|
|
principal (on ne dégrade jamais un résultat existant).
|
|
"""
|
|
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
|
|
parsed["codage_reco"] = {
|
|
"dp": reco.get("dp", "") or existing.get("dp", ""),
|
|
"dr": reco.get("dr", "") or existing.get("dr", ""),
|
|
"das": reco.get("das") or existing.get("das") or [],
|
|
}
|
|
parsed.setdefault("_crop_recodage", {})["result"] = reco
|
|
|
|
|
|
# ============================================================
|
|
# Enrichissement post-extraction d'une page recueil
|
|
# ============================================================
|
|
|
|
def enrich_recueil(parsed: dict, image_path: Path, ocr: QwenVLOCR,
|
|
cb_zones: CheckboxZones | None = None) -> dict:
|
|
"""Enrichit un JSON recueil parsé avec :
|
|
- checkbox accord/désaccord (méthode densité pixels, indépendante du VLM)
|
|
- normalisation `ghs_injustifie` → 0 / 1 / ""
|
|
- second passage VLM sur le crop Recodage si besoin, fusionné dans `codage_reco`
|
|
|
|
Modifie `parsed` en place et le renvoie (pratique pour chaînage).
|
|
"""
|
|
if not isinstance(parsed, dict):
|
|
return parsed
|
|
zones = cb_zones or resolve_recueil_zones()[1]
|
|
|
|
# Checkboxes accord / désaccord
|
|
cb = detect_accord_desaccord(image_path, zones)
|
|
parsed["accord_desaccord"] = cb["decision"]
|
|
parsed["_checkbox_debug"] = cb
|
|
|
|
# Normalisation ghs_injustifie
|
|
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
|
|
|
|
# Second passage Recodage
|
|
reco = run_recodage_crop_pass(image_path, ocr)
|
|
if reco:
|
|
merge_codage_reco(parsed, reco)
|
|
|
|
return parsed
|