From d326524e495d4c3f51b3719c9a4808ddc906b527 Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 24 Apr 2026 23:29:03 +0200 Subject: [PATCH] =?UTF-8?q?refactor(extract):=20d=C3=A9composer=20en=20?= =?UTF-8?q?=C3=A9tages=20testables=20(json=5Futils=20+=20recueil)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit extract.py contenait 4 responsabilités mélangées (320 lignes) : parsing JSON tolérant, résolution de zones, crop Recodage avec classification métier, orchestration. Séparation en modules cohérents : - pipeline/json_utils.py : parsing tolérant réutilisable (strip fences, virgules manquantes, troncature des boucles d'objets vides, fermeture des structures JSON ouvertes). N'a aucune connaissance métier OGC. - pipeline/recueil.py : toute la logique spécifique à la page recueil — résolution de zones configurables, filter_cim10_codes, classification DP/DR/DAS par règle métier, run_recodage_crop_pass, merge_codage_reco, enrich_recueil (orchestration des trois : checkboxes + ghs_injustifie + crop Recodage). Chaque fonction est testable indépendamment du VLM. - pipeline/extract.py : réduit à l'orchestration pure — ingest, routing, boucle page par page, délégation à recueil.enrich_recueil, validation ATIH finale. Plus aucune logique métier enfouie. La fonction extract_dossier garde exactement la même signature et produit le même JSON en sortie : aucun breaking change externe. Co-Authored-By: Claude Opus 4.7 (1M context) --- pipeline/extract.py | 336 ++++++++++------------------------------- pipeline/json_utils.py | 136 +++++++++++++++++ pipeline/recueil.py | 203 +++++++++++++++++++++++++ 3 files changed, 415 insertions(+), 260 deletions(-) create mode 100644 pipeline/json_utils.py create mode 100644 pipeline/recueil.py diff --git a/pipeline/extract.py b/pipeline/extract.py index 8bbde55..47ac1d9 100644 --- a/pipeline/extract.py +++ b/pipeline/extract.py @@ -1,230 +1,84 @@ -"""Orchestration d'extraction pour un dossier OGC.""" -import json -import re +"""Orchestration d'extraction pour un dossier OGC. + +Chaîne les étages du pipeline sans connaître leur implémentation interne : + + ingest → routing → OCR page par page → enrichissement page-spécifique → validation ATIH + +L'orchestration elle-même ne contient aucune logique métier : elle délègue à +`pipeline.recueil`, `pipeline.validation`, `pipeline.classify`, `pipeline.ocr_qwen` +et `pipeline.prompts`. Cela permet de tester indépendamment chaque étage. +""" +from __future__ import annotations + import time from pathlib import Path -from PIL import Image -from .ingest import pdf_to_images + from .classify import detect_page_type, route_by_index +from .ingest import pdf_to_images +from .json_utils import parse_json_output from .ocr_qwen import QwenVLOCR -from .prompts import ( - PAGE_TYPES, PROMPT_HEADER, - SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE, -) -from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie, CheckboxZones -from .zones_config import load_config, get_zone +from .prompts import PAGE_TYPES, PROMPT_HEADER +from .recueil import enrich_recueil, resolve_recueil_zones from .validation import annotate as validate_annotate -_EMPTY_OBJ_PATTERN = re.compile( - r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}', - re.DOTALL, -) +def _run_page_ocr(ocr: QwenVLOCR, image_path: Path, ptype: str) -> tuple[dict | None, str, float]: + """Exécute le prompt principal associé à un type de page et parse le JSON. - -def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str: - """Détecte et tronque les boucles d'objets vides. - - GLM-OCR peut boucler sur `{"code":"", "position":"", "libelle":""}` quand - un tableau DAS ou actes est vide dans l'image. La sortie est alors - tronquée à `max_new_tokens` sans fermer le JSON → parse error. - On garde au plus `max_consecutive` objets vides puis on coupe. + Retourne (parsed_dict_ou_None, ocr_raw, elapsed_s). `parsed=None` quand + la page n'a pas de prompt structuré associé (concertation_med, hospit.). """ - matches = list(_EMPTY_OBJ_PATTERN.finditer(text)) - if len(matches) <= max_consecutive: - return text - # On coupe après la fin du `max_consecutive`-ième match - cut_at = matches[max_consecutive - 1].end() - return text[:cut_at] - - -def _close_open_json(text: str) -> str: - """Ajoute les brackets/braces manquants pour tenter de fermer un JSON tronqué.""" - # Compte les brackets non balancés en ignorant ceux entre guillemets simples/doubles - depth_brace = 0 - depth_bracket = 0 - in_string = False - escape = False - for c in text: - if escape: - escape = False - continue - if c == "\\": - escape = True - continue - if c == '"': - in_string = not in_string - continue - if in_string: - continue - if c == "{": depth_brace += 1 - elif c == "}": depth_brace -= 1 - elif c == "[": depth_bracket += 1 - elif c == "]": depth_bracket -= 1 - # Retirer les virgules traînantes - closed = text.rstrip().rstrip(",") - # Fermer en priorité les crochets ouverts (tableaux), puis les accolades - closed += "]" * max(0, depth_bracket) - closed += "}" * max(0, depth_brace) - return closed - - -def parse_json_output(raw: str) -> dict | None: - """Tente d'extraire un JSON depuis la sortie GLM-OCR. - - Stratégies successives : - 1. parse direct après retrait des fences ```json - 2. patch des virgules manquantes entre objets / tableaux - 3. détection et troncature des boucles d'objets vides (cas fréquent sur - tableaux DAS/actes vides → boucle jusqu'à max_new_tokens) - 4. fermeture des structures JSON ouvertes après troncature - """ - if not raw: - return None - text = raw.strip() - # 1) fences markdown - text = re.sub(r"^```(?:json)?\s*", "", text) - text = re.sub(r"\s*```$", "", text) - try: - return json.loads(text) - except json.JSONDecodeError: - pass - - # 2) virgules manquantes entre `} {` et `] [` - patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text) - patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched) - try: - return json.loads(patched) - except json.JSONDecodeError: - pass - - # 3) troncature des boucles d'objets vides puis 4) fermeture - trimmed = _truncate_empty_loop(patched) - closed = _close_open_json(trimmed) - try: - result = json.loads(closed) - result["_truncated_loop"] = True # trace de l'intervention - return result - except json.JSONDecodeError as e: - return {"_raw": raw, "_parse_error": str(e)} - - -def _recueil_zones() -> tuple[tuple, CheckboxZones]: - """Charge les zones configurables pour la page recueil. - - Retourne (recodage_zone, accord_desaccord_zones). Si la config n'a pas - d'entrée, on retombe sur les constantes compilées. - """ - cfg = load_config() - reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE - acc = get_zone("recueil", "accord_checkbox", cfg) - des = get_zone("recueil", "desaccord_checkbox", cfg) - if acc and des: - cb = CheckboxZones(accord=acc, desaccord=des) - else: - cb = RECUEIL_ACCORD_DESACCORD - return reco, cb - - -def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None: - """Second passage VLM sur le crop zonal de la colonne Recodage. - - Qwen nous renvoie la liste brute de tous les codes visibles (avec position - si présente). On classifie DP/DR/DAS en Python par règles : - - les 1ᵉʳ et 2ᵉ codes SANS position → DP puis DR (DR peut être vide si - le 2ᵉ code a déjà une position). - - tous les codes AVEC position → DAS. - - Retourne un dict {dp, dr, das[]} ou None en cas d'échec. - """ - try: - img = Image.open(image_path) - w, h = img.size - reco_zone, _ = _recueil_zones() - x1, y1, x2, y2 = reco_zone - crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h))) - crop_path = image_path.parent / f"{image_path.stem}_recodage.png" - crop.save(crop_path) - except Exception: - return None - - res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024) + conf = PAGE_TYPES.get(ptype) + if not conf or conf["prompt"] == PROMPT_HEADER: + return None, "", 0.0 + res = ocr.run(image_path, conf["prompt"], max_new_tokens=4096) parsed = parse_json_output(res["text"]) - if not isinstance(parsed, dict) or "_parse_error" in parsed: - return None - - # Filtrer : ne garder que les codes au format CIM-10. Si le crop dépasse - # malgré tout dans la zone Actes, les CCAM (4 lettres + 3 chiffres) seront - # exclus ici. - cim10_re = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$") - codes_raw = parsed.get("codes") or [] - codes = [] - for c in codes_raw: - if not isinstance(c, dict): continue - code = (c.get("code") or "").strip() - if code and cim10_re.match(code): - codes.append({ - "code": code, - "position": str(c.get("position") or "").strip(), - }) - - # Classifier par règle métier : - # - 1er code sans position → DP - # - 2e code sans position → DR (sauf s'il est identique au DP : Qwen tend - # à dupliquer le DP quand DR est vide — on préfère DR="") - # - codes avec position → DAS - dp, dr = "", "" - das = [] - dp_assigned = dr_assigned = False - for c in codes: - code, position = c["code"], c["position"] - if not position: - if not dp_assigned: - dp, dp_assigned = code, True - elif not dr_assigned: - if code == dp: - # doublon du DP → on considère que DR est vide - dr_assigned = True - else: - dr, dr_assigned = code, True - else: - das.append({"code": code, "position": ""}) - else: - das.append(c) - return { - "dp": dp, "dr": dr, "das": das, - "_source": "crop_recodage", - "_elapsed_s": round(res["elapsed_s"], 2), - "_n_codes_raw": len(codes_raw), - "_n_codes_kept": len(codes), - } + return parsed, res["text"], round(res["elapsed_s"], 2) -def _merge_codage_reco(parsed: dict, reco: dict) -> None: - """Fusionne le résultat du crop Recodage dans parsed["codage_reco"]. +def _resolve_routing(images: list[Path], ocr: QwenVLOCR, + use_standard_routing: bool, + verbose: bool) -> tuple[list[str | None], list[str]]: + """Détermine le type de chaque page, soit par ordre standard (une seule + vérification sur la page 1), soit par classification OCR page par page. - Politique : le crop est plus fiable (contexte isolé), il prime sur le - passage principal SAUF si le crop laisse vide un champ que le principal - avait bien lu. + Retourne (page_types, headers). `headers[i]` est vide si pas de classify + effectuée sur la page i. """ - existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {} - merged = { - "dp": reco.get("dp", "") or existing.get("dp", ""), - "dr": reco.get("dr", "") or existing.get("dr", ""), - "das": reco.get("das") or existing.get("das") or [], - } - parsed["codage_reco"] = merged - parsed.setdefault("_crop_recodage", {})["result"] = reco + page_types: list[str | None] = [None] * len(images) + headers: list[str] = [""] * len(images) + + if use_standard_routing and images: + ptype1, header1 = detect_page_type(images[0], ocr) + if ptype1 == "recueil": + page_types = list(route_by_index(len(images))) + headers[0] = header1 + if verbose: + print(" routing standard (page 1 = recueil OK)") + return page_types, headers + if verbose: + print(f" page 1 = {ptype1} → fallback classification") + + # Fallback : classify page par page + for i, img in enumerate(images): + page_types[i], headers[i] = detect_page_type(img, ocr) + return page_types, headers def extract_dossier(pdf_path: str | Path, verbose: bool = True, use_standard_routing: bool = True) -> dict: - """Pipeline complet d'un dossier : PDF → JSON structuré. + """Pipeline complet d'un dossier : PDF → JSON structuré + annoté ATIH. - use_standard_routing=True (défaut) : route les pages par index selon - l'ordre standard OGC (6 pages), sans OCR de classification. -50% du temps. - Vérifie uniquement la page 1 pour s'assurer qu'on commence bien par - "recueil" — si non, bascule en classification complète (fallback). + Étages : + 1. `ingest.pdf_to_images` : PDF → PNG 300 dpi (avec deskew auto, cache) + 2. `_resolve_routing` : type de chaque page + 3. `_run_page_ocr` : OCR du schéma structuré par type de page + 4. `recueil.enrich_recueil` : checkboxes + crop Recodage pour la page recueil + 5. `validation.annotate` : validation ATIH de tous les codes extraits + + Paramètre `use_standard_routing=True` exploite l'ordre standard des 6 pages + OGC et économise 5 appels OCR par dossier. Bascule automatique sur la + classification page-à-page si la page 1 n'est pas le recueil attendu. """ pdf_path = Path(pdf_path) ocr = QwenVLOCR() @@ -235,37 +89,22 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True, if verbose: print(f"[{pdf_path.name}] {len(images)} pages converties") - # Choix de stratégie de routing - page_types = [None] * len(images) - headers = [""] * len(images) - if use_standard_routing: - # Vérif rapide sur la page 1 (seul OCR de classification) - ptype1, header1 = detect_page_type(images[0], ocr) - if ptype1 == "recueil": - page_types = route_by_index(len(images)) - headers[0] = header1 - if verbose: - print(f" routing standard (page 1 = recueil OK)") - else: - if verbose: - print(f" page 1 = {ptype1} → fallback classification") - use_standard_routing = False + page_types, headers = _resolve_routing(images, ocr, use_standard_routing, verbose) - result = { + _, cb_zones = resolve_recueil_zones() + + result: dict = { "fichier": pdf_path.stem, - "pdf_hash": images[0].parent.name, + "pdf_hash": images[0].parent.name if images else "", "pages": [], "extraction": {}, } for idx, img_path in enumerate(images, 1): t0 = time.time() - if use_standard_routing: - ptype = page_types[idx - 1] - header_text = headers[idx - 1] - else: - ptype, header_text = detect_page_type(img_path, ocr) - page_info = { + ptype = page_types[idx - 1] + header_text = headers[idx - 1] + page_info: dict = { "page": idx, "type": ptype, "header": header_text.strip(), @@ -274,43 +113,20 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True, if verbose: print(f" p{idx}: {ptype}") - prompt_conf = PAGE_TYPES.get(ptype) - if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER: - res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096) - parsed = parse_json_output(res["text"]) - page_info["ocr_raw"] = res["text"] + parsed, ocr_raw, elapsed = _run_page_ocr(ocr, img_path, ptype) if ptype else (None, "", 0.0) + if parsed is not None: + page_info["ocr_raw"] = ocr_raw page_info["parsed"] = parsed - page_info["elapsed_s"] = round(res["elapsed_s"], 2) + page_info["elapsed_s"] = elapsed - # Enrichissement : checkboxes + normalisation champs booléens - # sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases - # à cocher (cf. scratch/test_prompt_crop_v2.py). if ptype == "recueil" and isinstance(parsed, dict): - _, cb_zones = _recueil_zones() - cb = detect_accord_desaccord(img_path, cb_zones) - parsed["accord_desaccord"] = cb["decision"] - parsed["_checkbox_debug"] = cb # ratios + diff pour audit - # ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD" - # → ne garder que le chiffre 0/1 de tête - parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", "")) - - # Second passage : crop de la colonne Recodage pour compenser - # la sous-extraction observée sur codage_reco.* en passage principal. - reco = _extract_recodage_crop(img_path, ocr) - if reco: - _merge_codage_reco(parsed, reco) - + enrich_recueil(parsed, img_path, ocr, cb_zones) page_info["parsed"] = parsed - # Indexer par type pour accès direct dans result["extraction"] result["extraction"][ptype] = parsed else: - # Pages non structurées : juste l'en-tête déjà OCR page_info["elapsed_s"] = round(time.time() - t0, 2) result["pages"].append(page_info) - # Post-traitement : validation ATIH de tous les codes extraits - result = validate_annotate(result) - - return result + return validate_annotate(result) diff --git a/pipeline/json_utils.py b/pipeline/json_utils.py new file mode 100644 index 0000000..b2e2a77 --- /dev/null +++ b/pipeline/json_utils.py @@ -0,0 +1,136 @@ +"""Parsing JSON tolérant pour les sorties des VLM. + +Les VLM (Qwen, GLM-OCR, GOT-OCR…) produisent du JSON avec des anomalies +fréquentes : + +- Encadrement par des fences markdown ```json ... ``` +- Virgules manquantes entre objets ou éléments de tableau +- Boucles pathologiques d'objets vides `{"code":"","position":""}` répétés + jusqu'à `max_new_tokens`, ce qui tronque le JSON sans le fermer proprement + +Ce module expose `parse_json_output()` qui applique plusieurs stratégies de +récupération avant d'abandonner. En dernier recours, il renvoie un dict avec +`_raw` + `_parse_error` pour audit, jamais `None` (ce qui casserait le pipeline). +""" +from __future__ import annotations + +import json +import re + + +# Pattern qui matche un objet vide générique {"code":"","position":"",...} +_EMPTY_OBJ_PATTERN = re.compile( + r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}', + re.DOTALL, +) + +_FENCE_OPEN_RE = re.compile(r"^```(?:json)?\s*") +_FENCE_CLOSE_RE = re.compile(r"\s*```$") +_MISSING_COMMA_OBJ_RE = re.compile(r"\}\s*\n(\s*\{)") +_MISSING_COMMA_ARR_RE = re.compile(r"\]\s*\n(\s*\[)") + + +def strip_fences(text: str) -> str: + """Retire un éventuel encadrement ```json ... ```.""" + text = _FENCE_OPEN_RE.sub("", text.strip()) + text = _FENCE_CLOSE_RE.sub("", text) + return text + + +def patch_missing_commas(text: str) -> str: + """Ajoute les virgules manquantes entre `}\\n{` et `]\\n[`. + + Les VLM omettent fréquemment ces virgules dans leurs sorties JSON. + """ + text = _MISSING_COMMA_OBJ_RE.sub(r"},\n\1", text) + text = _MISSING_COMMA_ARR_RE.sub(r"],\n\1", text) + return text + + +def truncate_empty_loop(text: str, max_consecutive: int = 2) -> str: + """Tronque les boucles d'objets vides (`{"code":"","position":""}` répété). + + Cas d'usage : quand un tableau DAS ou Actes est vide dans l'image, le + VLM a parfois tendance à générer le même objet vide en boucle jusqu'à + saturer `max_new_tokens`. La sortie est alors tronquée sans fermer le + JSON → parse error. Garder au maximum `max_consecutive` occurrences. + """ + matches = list(_EMPTY_OBJ_PATTERN.finditer(text)) + if len(matches) <= max_consecutive: + return text + cut_at = matches[max_consecutive - 1].end() + return text[:cut_at] + + +def close_open_json(text: str) -> str: + """Ajoute les brackets/braces manquants pour fermer un JSON tronqué. + + Compte les accolades et crochets non-balancés en ignorant ceux à + l'intérieur d'une chaîne, puis ferme dans le bon ordre (tableaux + ouverts d'abord, puis objets). + """ + depth_brace = 0 + depth_bracket = 0 + in_string = False + escape = False + for c in text: + if escape: + escape = False + continue + if c == "\\": + escape = True + continue + if c == '"': + in_string = not in_string + continue + if in_string: + continue + if c == "{": + depth_brace += 1 + elif c == "}": + depth_brace -= 1 + elif c == "[": + depth_bracket += 1 + elif c == "]": + depth_bracket -= 1 + closed = text.rstrip().rstrip(",") + closed += "]" * max(0, depth_bracket) + closed += "}" * max(0, depth_brace) + return closed + + +def parse_json_output(raw: str) -> dict | None: + """Parse une sortie VLM en dict. Applique plusieurs stratégies : + + 1. strip des fences markdown ```json + 2. parse direct + 3. patch des virgules manquantes + 4. troncature des boucles d'objets vides + fermeture du JSON + + En cas d'échec de toutes les stratégies, retourne + `{"_raw": raw, "_parse_error": str}` pour permettre l'audit manuel + plutôt que de casser le pipeline. + """ + if not raw: + return None + text = strip_fences(raw) + try: + return json.loads(text) + except json.JSONDecodeError: + pass + + patched = patch_missing_commas(text) + try: + return json.loads(patched) + except json.JSONDecodeError: + pass + + trimmed = truncate_empty_loop(patched) + closed = close_open_json(trimmed) + try: + result = json.loads(closed) + if isinstance(result, dict): + result["_truncated_loop"] = True + return result + except json.JSONDecodeError as e: + return {"_raw": raw, "_parse_error": str(e)} diff --git a/pipeline/recueil.py b/pipeline/recueil.py new file mode 100644 index 0000000..c669d77 --- /dev/null +++ b/pipeline/recueil.py @@ -0,0 +1,203 @@ +"""Logique spécifique à la page `recueil` (fiche médicale de recueil OGC). + +Regroupe tout ce qui concerne cette page — la plus riche et la plus difficile +à extraire du dossier OGC — séparé de l'orchestration générale : + +- Résolution des zones configurables (crop Recodage, checkboxes) +- Second passage VLM sur le crop de la colonne Recodage +- Fusion du résultat crop dans le JSON principal +- Classification des codes CIM-10 en DP/DR/DAS par règle métier +- Enrichissement post-extraction (checkboxes Accord/Désaccord, ghs_injustifie) + +Les fonctions sont testables indépendamment de Qwen quand on leur fournit +déjà les sorties OCR brutes. +""" +from __future__ import annotations + +import re +import time +from pathlib import Path +from typing import Any + +from PIL import Image + +from .checkboxes import ( + CheckboxZones, + RECUEIL_ACCORD_DESACCORD, + detect_accord_desaccord, + parse_ghs_injustifie, +) +from .json_utils import parse_json_output +from .ocr_qwen import QwenVLOCR +from .prompts import RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE +from .zones_config import get_zone, load_config + + +# ============================================================ +# Résolution des zones (config JSON + fallback sur les defaults) +# ============================================================ + +def resolve_recueil_zones() -> tuple[tuple[float, float, float, float], CheckboxZones]: + """Charge les zones de la page recueil depuis la config utilisateur, + avec fallback sur les constantes compilées si la config est absente. + + Retourne (zone_crop_recodage, zones_accord_desaccord). + """ + cfg = load_config() + reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE + acc = get_zone("recueil", "accord_checkbox", cfg) + des = get_zone("recueil", "desaccord_checkbox", cfg) + if acc and des: + cb = CheckboxZones(accord=acc, desaccord=des) + else: + cb = RECUEIL_ACCORD_DESACCORD + return reco, cb + + +# ============================================================ +# Classification CIM-10 → DP / DR / DAS (pur, testable sans VLM) +# ============================================================ + +CIM10_RE = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$") + + +def filter_cim10_codes(codes_raw: list[Any]) -> list[dict]: + """Filtre une liste de codes OCR bruts pour ne garder que les CIM-10. + + Les VLM peuvent parfois lire des codes CCAM (actes) dans un crop qui + dépasse sur le bloc Actes. On les retire ici pour ne pas polluer les DAS. + """ + kept = [] + for c in codes_raw or []: + if not isinstance(c, dict): + continue + code = (c.get("code") or "").strip() + if code and CIM10_RE.match(code): + kept.append({ + "code": code, + "position": str(c.get("position") or "").strip(), + }) + return kept + + +def classify_codes_dp_dr_das(codes: list[dict]) -> tuple[str, str, list[dict]]: + """Classifie une liste de codes {code, position} en DP, DR et liste de DAS. + + Règle métier : + - 1er code sans position → DP + - 2e code sans position → DR (ignoré si identique au DP : le VLM peut + dupliquer le DP quand la case DR est visuellement vide) + - tous les codes avec position → DAS + - codes sans position au-delà du 2e → DAS sans position (pour ne rien perdre) + """ + dp, dr = "", "" + das: list[dict] = [] + dp_assigned = dr_assigned = False + for c in codes: + code, position = c["code"], c["position"] + if not position: + if not dp_assigned: + dp, dp_assigned = code, True + elif not dr_assigned: + if code == dp: + dr_assigned = True # doublon DP → DR vide + else: + dr, dr_assigned = code, True + else: + das.append({"code": code, "position": ""}) + else: + das.append(c) + return dp, dr, das + + +# ============================================================ +# Second passage VLM sur crop Recodage +# ============================================================ + +def run_recodage_crop_pass(image_path: Path, ocr: QwenVLOCR, + zone: tuple[float, float, float, float] | None = None + ) -> dict | None: + """Execute un second passage VLM sur le crop zonal de la colonne Recodage. + + Sauvegarde le crop à côté de l'image source (suffixe `_recodage.png`) + pour audit. Retourne un dict avec `dp/dr/das` + métadonnées, ou None + en cas d'échec d'OCR ou de parsing. + """ + try: + img = Image.open(image_path) + w, h = img.size + z = zone + if z is None: + z, _ = resolve_recueil_zones() + x1, y1, x2, y2 = z + crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h))) + crop_path = image_path.parent / f"{image_path.stem}_recodage.png" + crop.save(crop_path) + except Exception: + return None + + t0 = time.time() + res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024) + parsed = parse_json_output(res["text"]) + if not isinstance(parsed, dict) or "_parse_error" in parsed: + return None + + codes = filter_cim10_codes(parsed.get("codes") or []) + dp, dr, das = classify_codes_dp_dr_das(codes) + return { + "dp": dp, "dr": dr, "das": das, + "_source": "crop_recodage", + "_elapsed_s": round(res["elapsed_s"], 2), + "_n_codes_raw": len(parsed.get("codes") or []), + "_n_codes_kept": len(codes), + } + + +def merge_codage_reco(parsed: dict, reco: dict) -> None: + """Fusionne le résultat du crop Recodage dans `parsed["codage_reco"]`. + + Politique de merge : le crop est plus fiable (contexte isolé) donc il + prime sur le passage principal. Exception : si un champ du crop est vide + mais que le passage principal l'a rempli, on garde celui du passage + principal (on ne dégrade jamais un résultat existant). + """ + existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {} + parsed["codage_reco"] = { + "dp": reco.get("dp", "") or existing.get("dp", ""), + "dr": reco.get("dr", "") or existing.get("dr", ""), + "das": reco.get("das") or existing.get("das") or [], + } + parsed.setdefault("_crop_recodage", {})["result"] = reco + + +# ============================================================ +# Enrichissement post-extraction d'une page recueil +# ============================================================ + +def enrich_recueil(parsed: dict, image_path: Path, ocr: QwenVLOCR, + cb_zones: CheckboxZones | None = None) -> dict: + """Enrichit un JSON recueil parsé avec : + - checkbox accord/désaccord (méthode densité pixels, indépendante du VLM) + - normalisation `ghs_injustifie` → 0 / 1 / "" + - second passage VLM sur le crop Recodage si besoin, fusionné dans `codage_reco` + + Modifie `parsed` en place et le renvoie (pratique pour chaînage). + """ + if not isinstance(parsed, dict): + return parsed + zones = cb_zones or resolve_recueil_zones()[1] + + # Checkboxes accord / désaccord + cb = detect_accord_desaccord(image_path, zones) + parsed["accord_desaccord"] = cb["decision"] + parsed["_checkbox_debug"] = cb + + # Normalisation ghs_injustifie + parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", "")) + + # Second passage Recodage + reco = run_recodage_crop_pass(image_path, ocr) + if reco: + merge_codage_reco(parsed, reco) + + return parsed