Files
Aivanov_scan_ogc/pipeline/extract.py
Dom 1255468676 feat(ui): calibration visuelle des zones via dessin à la souris
Nouveau module pipeline/zones_config.py : charge les zones d'extraction
depuis un fichier zones_config.json (coordonnées relatives 0-1), avec
fallback sur les constantes Python. Config partagée entre :
- pipeline/extract.py (crop colonne Recodage)
- pipeline/checkboxes.py (cases Accord/Désaccord)

Zones configurables aujourd'hui (page recueil) :
- codage_reco (crop zonal pour le second passage VLM)
- accord_checkbox / desaccord_checkbox (densité de pixels)

Mode "🔧 Calibration zones" ajouté dans pipeline/ui_overlay.py :
- Sélection d'un PDF de référence (idéalement bien cadré)
- Canvas interactif (streamlit-drawable-canvas) avec les zones
  existantes pré-dessinées en rouge
- Dessin/déplacement/redimensionnement à la souris
- Saisie d'un nom et description par zone
- Sauvegarde en JSON (ou OGC_ZONES_CONFIG si défini)

Permet au métier (Khalid) de recalibrer les zones sans toucher au code,
par exemple si le formulaire ATIH évolue ou si les scans sont d'un autre
établissement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:59 +02:00

317 lines
12 KiB
Python

"""Orchestration d'extraction pour un dossier OGC."""
import json
import re
import time
from pathlib import Path
from PIL import Image
from .ingest import pdf_to_images
from .classify import detect_page_type, route_by_index
from .ocr_qwen import QwenVLOCR
from .prompts import (
PAGE_TYPES, PROMPT_HEADER,
SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
)
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie, CheckboxZones
from .zones_config import load_config, get_zone
from .validation import annotate as validate_annotate
_EMPTY_OBJ_PATTERN = re.compile(
r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
re.DOTALL,
)
def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
"""Détecte et tronque les boucles d'objets vides.
GLM-OCR peut boucler sur `{"code":"", "position":"", "libelle":""}` quand
un tableau DAS ou actes est vide dans l'image. La sortie est alors
tronquée à `max_new_tokens` sans fermer le JSON → parse error.
On garde au plus `max_consecutive` objets vides puis on coupe.
"""
matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
if len(matches) <= max_consecutive:
return text
# On coupe après la fin du `max_consecutive`-ième match
cut_at = matches[max_consecutive - 1].end()
return text[:cut_at]
def _close_open_json(text: str) -> str:
"""Ajoute les brackets/braces manquants pour tenter de fermer un JSON tronqué."""
# Compte les brackets non balancés en ignorant ceux entre guillemets simples/doubles
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for c in text:
if escape:
escape = False
continue
if c == "\\":
escape = True
continue
if c == '"':
in_string = not in_string
continue
if in_string:
continue
if c == "{": depth_brace += 1
elif c == "}": depth_brace -= 1
elif c == "[": depth_bracket += 1
elif c == "]": depth_bracket -= 1
# Retirer les virgules traînantes
closed = text.rstrip().rstrip(",")
# Fermer en priorité les crochets ouverts (tableaux), puis les accolades
closed += "]" * max(0, depth_bracket)
closed += "}" * max(0, depth_brace)
return closed
def parse_json_output(raw: str) -> dict | None:
"""Tente d'extraire un JSON depuis la sortie GLM-OCR.
Stratégies successives :
1. parse direct après retrait des fences ```json
2. patch des virgules manquantes entre objets / tableaux
3. détection et troncature des boucles d'objets vides (cas fréquent sur
tableaux DAS/actes vides → boucle jusqu'à max_new_tokens)
4. fermeture des structures JSON ouvertes après troncature
"""
if not raw:
return None
text = raw.strip()
# 1) fences markdown
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```$", "", text)
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 2) virgules manquantes entre `} {` et `] [`
patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text)
patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched)
try:
return json.loads(patched)
except json.JSONDecodeError:
pass
# 3) troncature des boucles d'objets vides puis 4) fermeture
trimmed = _truncate_empty_loop(patched)
closed = _close_open_json(trimmed)
try:
result = json.loads(closed)
result["_truncated_loop"] = True # trace de l'intervention
return result
except json.JSONDecodeError as e:
return {"_raw": raw, "_parse_error": str(e)}
def _recueil_zones() -> tuple[tuple, CheckboxZones]:
"""Charge les zones configurables pour la page recueil.
Retourne (recodage_zone, accord_desaccord_zones). Si la config n'a pas
d'entrée, on retombe sur les constantes compilées.
"""
cfg = load_config()
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
acc = get_zone("recueil", "accord_checkbox", cfg)
des = get_zone("recueil", "desaccord_checkbox", cfg)
if acc and des:
cb = CheckboxZones(accord=acc, desaccord=des)
else:
cb = RECUEIL_ACCORD_DESACCORD
return reco, cb
def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
"""Second passage VLM sur le crop zonal de la colonne Recodage.
Qwen nous renvoie la liste brute de tous les codes visibles (avec position
si présente). On classifie DP/DR/DAS en Python par règles :
- les 1ᵉʳ et 2ᵉ codes SANS position → DP puis DR (DR peut être vide si
le 2ᵉ code a déjà une position).
- tous les codes AVEC position → DAS.
Retourne un dict {dp, dr, das[]} ou None en cas d'échec.
"""
try:
img = Image.open(image_path)
w, h = img.size
reco_zone, _ = _recueil_zones()
x1, y1, x2, y2 = reco_zone
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
crop.save(crop_path)
except Exception:
return None
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
parsed = parse_json_output(res["text"])
if not isinstance(parsed, dict) or "_parse_error" in parsed:
return None
# Filtrer : ne garder que les codes au format CIM-10. Si le crop dépasse
# malgré tout dans la zone Actes, les CCAM (4 lettres + 3 chiffres) seront
# exclus ici.
cim10_re = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
codes_raw = parsed.get("codes") or []
codes = []
for c in codes_raw:
if not isinstance(c, dict): continue
code = (c.get("code") or "").strip()
if code and cim10_re.match(code):
codes.append({
"code": code,
"position": str(c.get("position") or "").strip(),
})
# Classifier par règle métier :
# - 1er code sans position → DP
# - 2e code sans position → DR (sauf s'il est identique au DP : Qwen tend
# à dupliquer le DP quand DR est vide — on préfère DR="")
# - codes avec position → DAS
dp, dr = "", ""
das = []
dp_assigned = dr_assigned = False
for c in codes:
code, position = c["code"], c["position"]
if not position:
if not dp_assigned:
dp, dp_assigned = code, True
elif not dr_assigned:
if code == dp:
# doublon du DP → on considère que DR est vide
dr_assigned = True
else:
dr, dr_assigned = code, True
else:
das.append({"code": code, "position": ""})
else:
das.append(c)
return {
"dp": dp, "dr": dr, "das": das,
"_source": "crop_recodage",
"_elapsed_s": round(res["elapsed_s"], 2),
"_n_codes_raw": len(codes_raw),
"_n_codes_kept": len(codes),
}
def _merge_codage_reco(parsed: dict, reco: dict) -> None:
"""Fusionne le résultat du crop Recodage dans parsed["codage_reco"].
Politique : le crop est plus fiable (contexte isolé), il prime sur le
passage principal SAUF si le crop laisse vide un champ que le principal
avait bien lu.
"""
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
merged = {
"dp": reco.get("dp", "") or existing.get("dp", ""),
"dr": reco.get("dr", "") or existing.get("dr", ""),
"das": reco.get("das") or existing.get("das") or [],
}
parsed["codage_reco"] = merged
parsed.setdefault("_crop_recodage", {})["result"] = reco
def extract_dossier(pdf_path: str | Path, verbose: bool = True,
use_standard_routing: bool = True) -> dict:
"""Pipeline complet d'un dossier : PDF → JSON structuré.
use_standard_routing=True (défaut) : route les pages par index selon
l'ordre standard OGC (6 pages), sans OCR de classification. -50% du temps.
Vérifie uniquement la page 1 pour s'assurer qu'on commence bien par
"recueil" — si non, bascule en classification complète (fallback).
"""
pdf_path = Path(pdf_path)
ocr = QwenVLOCR()
if verbose:
print(f"[{pdf_path.name}] modèle prêt, VRAM={ocr.vram_gb:.2f} Go")
images = pdf_to_images(str(pdf_path))
if verbose:
print(f"[{pdf_path.name}] {len(images)} pages converties")
# Choix de stratégie de routing
page_types = [None] * len(images)
headers = [""] * len(images)
if use_standard_routing:
# Vérif rapide sur la page 1 (seul OCR de classification)
ptype1, header1 = detect_page_type(images[0], ocr)
if ptype1 == "recueil":
page_types = route_by_index(len(images))
headers[0] = header1
if verbose:
print(f" routing standard (page 1 = recueil OK)")
else:
if verbose:
print(f" page 1 = {ptype1} → fallback classification")
use_standard_routing = False
result = {
"fichier": pdf_path.stem,
"pdf_hash": images[0].parent.name,
"pages": [],
"extraction": {},
}
for idx, img_path in enumerate(images, 1):
t0 = time.time()
if use_standard_routing:
ptype = page_types[idx - 1]
header_text = headers[idx - 1]
else:
ptype, header_text = detect_page_type(img_path, ocr)
page_info = {
"page": idx,
"type": ptype,
"header": header_text.strip(),
"elapsed_s": None,
}
if verbose:
print(f" p{idx}: {ptype}")
prompt_conf = PAGE_TYPES.get(ptype)
if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER:
res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096)
parsed = parse_json_output(res["text"])
page_info["ocr_raw"] = res["text"]
page_info["parsed"] = parsed
page_info["elapsed_s"] = round(res["elapsed_s"], 2)
# Enrichissement : checkboxes + normalisation champs booléens
# sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases
# à cocher (cf. scratch/test_prompt_crop_v2.py).
if ptype == "recueil" and isinstance(parsed, dict):
_, cb_zones = _recueil_zones()
cb = detect_accord_desaccord(img_path, cb_zones)
parsed["accord_desaccord"] = cb["decision"]
parsed["_checkbox_debug"] = cb # ratios + diff pour audit
# ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD"
# → ne garder que le chiffre 0/1 de tête
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
# Second passage : crop de la colonne Recodage pour compenser
# la sous-extraction observée sur codage_reco.* en passage principal.
reco = _extract_recodage_crop(img_path, ocr)
if reco:
_merge_codage_reco(parsed, reco)
page_info["parsed"] = parsed
# Indexer par type pour accès direct dans result["extraction"]
result["extraction"][ptype] = parsed
else:
# Pages non structurées : juste l'en-tête déjà OCR
page_info["elapsed_s"] = round(time.time() - t0, 2)
result["pages"].append(page_info)
# Post-traitement : validation ATIH de tous les codes extraits
result = validate_annotate(result)
return result