Qwen renvoie typiquement le libellé complet `0 SE 1 2 3 4 ATU FFM FSD` dans le champ ghs_injustifie alors qu'une seule valeur 0/1 est attendue. Ajout de `pipeline.checkboxes.parse_ghs_injustifie` qui extrait le premier chiffre 0/1 via regex, ou "" si illisible. Post-traitement appliqué à chaque extraction recueil et aux 18 JSONs V2 existants (10 fichiers corrigés en place — les 8 autres avaient déjà ghs_injustifie absent ou vide). Note sur les 7 cases SE1-4/ATU/FFM/FSD : zones trop petites pour être calibrées à l'œil et aucun cas positif (`ghs_injustifie=1`) dans l'échantillon 2018 pour valider visuellement. La détection est en placeholder, à recalibrer sur un cas positif réel. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
297 lines
11 KiB
Python
297 lines
11 KiB
Python
"""Orchestration d'extraction pour un dossier OGC."""
|
|
import json
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from PIL import Image
|
|
from .ingest import pdf_to_images
|
|
from .classify import detect_page_type, route_by_index
|
|
from .ocr_qwen import QwenVLOCR
|
|
from .prompts import (
|
|
PAGE_TYPES, PROMPT_HEADER,
|
|
SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
|
|
)
|
|
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie
|
|
from .validation import annotate as validate_annotate
|
|
|
|
|
|
_EMPTY_OBJ_PATTERN = re.compile(
|
|
r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
|
|
re.DOTALL,
|
|
)
|
|
|
|
|
|
def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
|
|
"""Détecte et tronque les boucles d'objets vides.
|
|
|
|
GLM-OCR peut boucler sur `{"code":"", "position":"", "libelle":""}` quand
|
|
un tableau DAS ou actes est vide dans l'image. La sortie est alors
|
|
tronquée à `max_new_tokens` sans fermer le JSON → parse error.
|
|
On garde au plus `max_consecutive` objets vides puis on coupe.
|
|
"""
|
|
matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
|
|
if len(matches) <= max_consecutive:
|
|
return text
|
|
# On coupe après la fin du `max_consecutive`-ième match
|
|
cut_at = matches[max_consecutive - 1].end()
|
|
return text[:cut_at]
|
|
|
|
|
|
def _close_open_json(text: str) -> str:
|
|
"""Ajoute les brackets/braces manquants pour tenter de fermer un JSON tronqué."""
|
|
# Compte les brackets non balancés en ignorant ceux entre guillemets simples/doubles
|
|
depth_brace = 0
|
|
depth_bracket = 0
|
|
in_string = False
|
|
escape = False
|
|
for c in text:
|
|
if escape:
|
|
escape = False
|
|
continue
|
|
if c == "\\":
|
|
escape = True
|
|
continue
|
|
if c == '"':
|
|
in_string = not in_string
|
|
continue
|
|
if in_string:
|
|
continue
|
|
if c == "{": depth_brace += 1
|
|
elif c == "}": depth_brace -= 1
|
|
elif c == "[": depth_bracket += 1
|
|
elif c == "]": depth_bracket -= 1
|
|
# Retirer les virgules traînantes
|
|
closed = text.rstrip().rstrip(",")
|
|
# Fermer en priorité les crochets ouverts (tableaux), puis les accolades
|
|
closed += "]" * max(0, depth_bracket)
|
|
closed += "}" * max(0, depth_brace)
|
|
return closed
|
|
|
|
|
|
def parse_json_output(raw: str) -> dict | None:
|
|
"""Tente d'extraire un JSON depuis la sortie GLM-OCR.
|
|
|
|
Stratégies successives :
|
|
1. parse direct après retrait des fences ```json
|
|
2. patch des virgules manquantes entre objets / tableaux
|
|
3. détection et troncature des boucles d'objets vides (cas fréquent sur
|
|
tableaux DAS/actes vides → boucle jusqu'à max_new_tokens)
|
|
4. fermeture des structures JSON ouvertes après troncature
|
|
"""
|
|
if not raw:
|
|
return None
|
|
text = raw.strip()
|
|
# 1) fences markdown
|
|
text = re.sub(r"^```(?:json)?\s*", "", text)
|
|
text = re.sub(r"\s*```$", "", text)
|
|
try:
|
|
return json.loads(text)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# 2) virgules manquantes entre `} {` et `] [`
|
|
patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text)
|
|
patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched)
|
|
try:
|
|
return json.loads(patched)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# 3) troncature des boucles d'objets vides puis 4) fermeture
|
|
trimmed = _truncate_empty_loop(patched)
|
|
closed = _close_open_json(trimmed)
|
|
try:
|
|
result = json.loads(closed)
|
|
result["_truncated_loop"] = True # trace de l'intervention
|
|
return result
|
|
except json.JSONDecodeError as e:
|
|
return {"_raw": raw, "_parse_error": str(e)}
|
|
|
|
|
|
def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
|
|
"""Second passage VLM sur le crop zonal de la colonne Recodage.
|
|
|
|
Qwen nous renvoie la liste brute de tous les codes visibles (avec position
|
|
si présente). On classifie DP/DR/DAS en Python par règles :
|
|
- les 1ᵉʳ et 2ᵉ codes SANS position → DP puis DR (DR peut être vide si
|
|
le 2ᵉ code a déjà une position).
|
|
- tous les codes AVEC position → DAS.
|
|
|
|
Retourne un dict {dp, dr, das[]} ou None en cas d'échec.
|
|
"""
|
|
try:
|
|
img = Image.open(image_path)
|
|
w, h = img.size
|
|
x1, y1, x2, y2 = RECUEIL_RECODAGE_ZONE
|
|
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
|
|
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
|
|
crop.save(crop_path)
|
|
except Exception:
|
|
return None
|
|
|
|
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
|
|
parsed = parse_json_output(res["text"])
|
|
if not isinstance(parsed, dict) or "_parse_error" in parsed:
|
|
return None
|
|
|
|
# Filtrer : ne garder que les codes au format CIM-10. Si le crop dépasse
|
|
# malgré tout dans la zone Actes, les CCAM (4 lettres + 3 chiffres) seront
|
|
# exclus ici.
|
|
cim10_re = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
|
|
codes_raw = parsed.get("codes") or []
|
|
codes = []
|
|
for c in codes_raw:
|
|
if not isinstance(c, dict): continue
|
|
code = (c.get("code") or "").strip()
|
|
if code and cim10_re.match(code):
|
|
codes.append({
|
|
"code": code,
|
|
"position": str(c.get("position") or "").strip(),
|
|
})
|
|
|
|
# Classifier par règle métier :
|
|
# - 1er code sans position → DP
|
|
# - 2e code sans position → DR (sauf s'il est identique au DP : Qwen tend
|
|
# à dupliquer le DP quand DR est vide — on préfère DR="")
|
|
# - codes avec position → DAS
|
|
dp, dr = "", ""
|
|
das = []
|
|
dp_assigned = dr_assigned = False
|
|
for c in codes:
|
|
code, position = c["code"], c["position"]
|
|
if not position:
|
|
if not dp_assigned:
|
|
dp, dp_assigned = code, True
|
|
elif not dr_assigned:
|
|
if code == dp:
|
|
# doublon du DP → on considère que DR est vide
|
|
dr_assigned = True
|
|
else:
|
|
dr, dr_assigned = code, True
|
|
else:
|
|
das.append({"code": code, "position": ""})
|
|
else:
|
|
das.append(c)
|
|
return {
|
|
"dp": dp, "dr": dr, "das": das,
|
|
"_source": "crop_recodage",
|
|
"_elapsed_s": round(res["elapsed_s"], 2),
|
|
"_n_codes_raw": len(codes_raw),
|
|
"_n_codes_kept": len(codes),
|
|
}
|
|
|
|
|
|
def _merge_codage_reco(parsed: dict, reco: dict) -> None:
|
|
"""Fusionne le résultat du crop Recodage dans parsed["codage_reco"].
|
|
|
|
Politique : le crop est plus fiable (contexte isolé), il prime sur le
|
|
passage principal SAUF si le crop laisse vide un champ que le principal
|
|
avait bien lu.
|
|
"""
|
|
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
|
|
merged = {
|
|
"dp": reco.get("dp", "") or existing.get("dp", ""),
|
|
"dr": reco.get("dr", "") or existing.get("dr", ""),
|
|
"das": reco.get("das") or existing.get("das") or [],
|
|
}
|
|
parsed["codage_reco"] = merged
|
|
parsed.setdefault("_crop_recodage", {})["result"] = reco
|
|
|
|
|
|
def extract_dossier(pdf_path: str | Path, verbose: bool = True,
|
|
use_standard_routing: bool = True) -> dict:
|
|
"""Pipeline complet d'un dossier : PDF → JSON structuré.
|
|
|
|
use_standard_routing=True (défaut) : route les pages par index selon
|
|
l'ordre standard OGC (6 pages), sans OCR de classification. -50% du temps.
|
|
Vérifie uniquement la page 1 pour s'assurer qu'on commence bien par
|
|
"recueil" — si non, bascule en classification complète (fallback).
|
|
"""
|
|
pdf_path = Path(pdf_path)
|
|
ocr = QwenVLOCR()
|
|
if verbose:
|
|
print(f"[{pdf_path.name}] modèle prêt, VRAM={ocr.vram_gb:.2f} Go")
|
|
|
|
images = pdf_to_images(str(pdf_path))
|
|
if verbose:
|
|
print(f"[{pdf_path.name}] {len(images)} pages converties")
|
|
|
|
# Choix de stratégie de routing
|
|
page_types = [None] * len(images)
|
|
headers = [""] * len(images)
|
|
if use_standard_routing:
|
|
# Vérif rapide sur la page 1 (seul OCR de classification)
|
|
ptype1, header1 = detect_page_type(images[0], ocr)
|
|
if ptype1 == "recueil":
|
|
page_types = route_by_index(len(images))
|
|
headers[0] = header1
|
|
if verbose:
|
|
print(f" routing standard (page 1 = recueil OK)")
|
|
else:
|
|
if verbose:
|
|
print(f" page 1 = {ptype1} → fallback classification")
|
|
use_standard_routing = False
|
|
|
|
result = {
|
|
"fichier": pdf_path.stem,
|
|
"pdf_hash": images[0].parent.name,
|
|
"pages": [],
|
|
"extraction": {},
|
|
}
|
|
|
|
for idx, img_path in enumerate(images, 1):
|
|
t0 = time.time()
|
|
if use_standard_routing:
|
|
ptype = page_types[idx - 1]
|
|
header_text = headers[idx - 1]
|
|
else:
|
|
ptype, header_text = detect_page_type(img_path, ocr)
|
|
page_info = {
|
|
"page": idx,
|
|
"type": ptype,
|
|
"header": header_text.strip(),
|
|
"elapsed_s": None,
|
|
}
|
|
if verbose:
|
|
print(f" p{idx}: {ptype}")
|
|
|
|
prompt_conf = PAGE_TYPES.get(ptype)
|
|
if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER:
|
|
res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096)
|
|
parsed = parse_json_output(res["text"])
|
|
page_info["ocr_raw"] = res["text"]
|
|
page_info["parsed"] = parsed
|
|
page_info["elapsed_s"] = round(res["elapsed_s"], 2)
|
|
|
|
# Enrichissement : checkboxes + normalisation champs booléens
|
|
# sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases
|
|
# à cocher (cf. scratch/test_prompt_crop_v2.py).
|
|
if ptype == "recueil" and isinstance(parsed, dict):
|
|
cb = detect_accord_desaccord(img_path, RECUEIL_ACCORD_DESACCORD)
|
|
parsed["accord_desaccord"] = cb["decision"]
|
|
parsed["_checkbox_debug"] = cb # ratios + diff pour audit
|
|
# ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD"
|
|
# → ne garder que le chiffre 0/1 de tête
|
|
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
|
|
|
|
# Second passage : crop de la colonne Recodage pour compenser
|
|
# la sous-extraction observée sur codage_reco.* en passage principal.
|
|
reco = _extract_recodage_crop(img_path, ocr)
|
|
if reco:
|
|
_merge_codage_reco(parsed, reco)
|
|
|
|
page_info["parsed"] = parsed
|
|
|
|
# Indexer par type pour accès direct dans result["extraction"]
|
|
result["extraction"][ptype] = parsed
|
|
else:
|
|
# Pages non structurées : juste l'en-tête déjà OCR
|
|
page_info["elapsed_s"] = round(time.time() - t0, 2)
|
|
|
|
result["pages"].append(page_info)
|
|
|
|
# Post-traitement : validation ATIH de tous les codes extraits
|
|
result = validate_annotate(result)
|
|
|
|
return result
|