"""Logique spécifique à la page `recueil` (fiche médicale de recueil OGC). Regroupe tout ce qui concerne cette page — la plus riche et la plus difficile à extraire du dossier OGC — séparé de l'orchestration générale : - Résolution des zones configurables (crop Recodage, checkboxes) - Second passage VLM sur le crop de la colonne Recodage - Fusion du résultat crop dans le JSON principal - Classification des codes CIM-10 en DP/DR/DAS par règle métier - Enrichissement post-extraction (checkboxes Accord/Désaccord, ghs_injustifie) Les fonctions sont testables indépendamment de Qwen quand on leur fournit déjà les sorties OCR brutes. """ from __future__ import annotations import re import time from pathlib import Path from typing import Any from PIL import Image from .checkboxes import ( CheckboxZones, RECUEIL_ACCORD_DESACCORD, detect_accord_desaccord, parse_ghs_injustifie, ) from .json_utils import parse_json_output from .ocr_qwen import QwenVLOCR from .prompts import RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE from .zones_config import get_zone, load_config # ============================================================ # Résolution des zones (config JSON + fallback sur les defaults) # ============================================================ def resolve_recueil_zones() -> tuple[tuple[float, float, float, float], CheckboxZones]: """Charge les zones de la page recueil depuis la config utilisateur, avec fallback sur les constantes compilées si la config est absente. Retourne (zone_crop_recodage, zones_accord_desaccord). """ cfg = load_config() reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE acc = get_zone("recueil", "accord_checkbox", cfg) des = get_zone("recueil", "desaccord_checkbox", cfg) if acc and des: cb = CheckboxZones(accord=acc, desaccord=des) else: cb = RECUEIL_ACCORD_DESACCORD return reco, cb # ============================================================ # Classification CIM-10 → DP / DR / DAS (pur, testable sans VLM) # ============================================================ CIM10_RE = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$") def filter_cim10_codes(codes_raw: list[Any]) -> list[dict]: """Filtre une liste de codes OCR bruts pour ne garder que les CIM-10. Les VLM peuvent parfois lire des codes CCAM (actes) dans un crop qui dépasse sur le bloc Actes. On les retire ici pour ne pas polluer les DAS. """ kept = [] for c in codes_raw or []: if not isinstance(c, dict): continue code = (c.get("code") or "").strip() if code and CIM10_RE.match(code): kept.append({ "code": code, "position": str(c.get("position") or "").strip(), }) return kept def classify_codes_dp_dr_das(codes: list[dict]) -> tuple[str, str, list[dict]]: """Classifie une liste de codes {code, position} en DP, DR et liste de DAS. Règle métier : - 1er code sans position → DP - 2e code sans position → DR (ignoré si identique au DP : le VLM peut dupliquer le DP quand la case DR est visuellement vide) - tous les codes avec position → DAS - codes sans position au-delà du 2e → DAS sans position (pour ne rien perdre) """ dp, dr = "", "" das: list[dict] = [] dp_assigned = dr_assigned = False for c in codes: code, position = c["code"], c["position"] if not position: if not dp_assigned: dp, dp_assigned = code, True elif not dr_assigned: if code == dp: dr_assigned = True # doublon DP → DR vide else: dr, dr_assigned = code, True else: das.append({"code": code, "position": ""}) else: das.append(c) return dp, dr, das # ============================================================ # Second passage VLM sur crop Recodage # ============================================================ def run_recodage_crop_pass(image_path: Path, ocr: QwenVLOCR, zone: tuple[float, float, float, float] | None = None ) -> dict | None: """Execute un second passage VLM sur le crop zonal de la colonne Recodage. Sauvegarde le crop à côté de l'image source (suffixe `_recodage.png`) pour audit. Retourne un dict avec `dp/dr/das` + métadonnées, ou None en cas d'échec d'OCR ou de parsing. """ try: img = Image.open(image_path) w, h = img.size z = zone if z is None: z, _ = resolve_recueil_zones() x1, y1, x2, y2 = z crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h))) crop_path = image_path.parent / f"{image_path.stem}_recodage.png" crop.save(crop_path) except Exception: return None t0 = time.time() res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024) parsed = parse_json_output(res["text"]) if not isinstance(parsed, dict) or "_parse_error" in parsed: return None codes = filter_cim10_codes(parsed.get("codes") or []) dp, dr, das = classify_codes_dp_dr_das(codes) return { "dp": dp, "dr": dr, "das": das, "_source": "crop_recodage", "_elapsed_s": round(res["elapsed_s"], 2), "_n_codes_raw": len(parsed.get("codes") or []), "_n_codes_kept": len(codes), } def merge_codage_reco(parsed: dict, reco: dict) -> None: """Fusionne le résultat du crop Recodage dans `parsed["codage_reco"]`. Politique de merge : le crop est plus fiable (contexte isolé) donc il prime sur le passage principal. Exception : si un champ du crop est vide mais que le passage principal l'a rempli, on garde celui du passage principal (on ne dégrade jamais un résultat existant). """ existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {} parsed["codage_reco"] = { "dp": reco.get("dp", "") or existing.get("dp", ""), "dr": reco.get("dr", "") or existing.get("dr", ""), "das": reco.get("das") or existing.get("das") or [], } parsed.setdefault("_crop_recodage", {})["result"] = reco # ============================================================ # Enrichissement post-extraction d'une page recueil # ============================================================ def enrich_recueil(parsed: dict, image_path: Path, ocr: QwenVLOCR, cb_zones: CheckboxZones | None = None) -> dict: """Enrichit un JSON recueil parsé avec : - checkbox accord/désaccord (méthode densité pixels, indépendante du VLM) - normalisation `ghs_injustifie` → 0 / 1 / "" - second passage VLM sur le crop Recodage si besoin, fusionné dans `codage_reco` Modifie `parsed` en place et le renvoie (pratique pour chaînage). """ if not isinstance(parsed, dict): return parsed zones = cb_zones or resolve_recueil_zones()[1] # Checkboxes accord / désaccord cb = detect_accord_desaccord(image_path, zones) parsed["accord_desaccord"] = cb["decision"] parsed["_checkbox_debug"] = cb # Normalisation ghs_injustifie parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", "")) # Second passage Recodage reco = run_recodage_crop_pass(image_path, ocr) if reco: merge_codage_reco(parsed, reco) return parsed