"""Orchestration d'extraction pour un dossier OGC.""" import json import re import time from pathlib import Path from .ingest import pdf_to_images from .classify import detect_page_type, route_by_index from .ocr_qwen import QwenVLOCR from .prompts import PAGE_TYPES, PROMPT_HEADER from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD from .validation import annotate as validate_annotate _EMPTY_OBJ_PATTERN = re.compile( r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}', re.DOTALL, ) def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str: """Détecte et tronque les boucles d'objets vides. GLM-OCR peut boucler sur `{"code":"", "position":"", "libelle":""}` quand un tableau DAS ou actes est vide dans l'image. La sortie est alors tronquée à `max_new_tokens` sans fermer le JSON → parse error. On garde au plus `max_consecutive` objets vides puis on coupe. """ matches = list(_EMPTY_OBJ_PATTERN.finditer(text)) if len(matches) <= max_consecutive: return text # On coupe après la fin du `max_consecutive`-ième match cut_at = matches[max_consecutive - 1].end() return text[:cut_at] def _close_open_json(text: str) -> str: """Ajoute les brackets/braces manquants pour tenter de fermer un JSON tronqué.""" # Compte les brackets non balancés en ignorant ceux entre guillemets simples/doubles depth_brace = 0 depth_bracket = 0 in_string = False escape = False for c in text: if escape: escape = False continue if c == "\\": escape = True continue if c == '"': in_string = not in_string continue if in_string: continue if c == "{": depth_brace += 1 elif c == "}": depth_brace -= 1 elif c == "[": depth_bracket += 1 elif c == "]": depth_bracket -= 1 # Retirer les virgules traînantes closed = text.rstrip().rstrip(",") # Fermer en priorité les crochets ouverts (tableaux), puis les accolades closed += "]" * max(0, depth_bracket) closed += "}" * max(0, depth_brace) return closed def parse_json_output(raw: str) -> dict | None: """Tente d'extraire un JSON depuis la sortie GLM-OCR. Stratégies successives : 1. parse direct après retrait des fences ```json 2. patch des virgules manquantes entre objets / tableaux 3. détection et troncature des boucles d'objets vides (cas fréquent sur tableaux DAS/actes vides → boucle jusqu'à max_new_tokens) 4. fermeture des structures JSON ouvertes après troncature """ if not raw: return None text = raw.strip() # 1) fences markdown text = re.sub(r"^```(?:json)?\s*", "", text) text = re.sub(r"\s*```$", "", text) try: return json.loads(text) except json.JSONDecodeError: pass # 2) virgules manquantes entre `} {` et `] [` patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text) patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched) try: return json.loads(patched) except json.JSONDecodeError: pass # 3) troncature des boucles d'objets vides puis 4) fermeture trimmed = _truncate_empty_loop(patched) closed = _close_open_json(trimmed) try: result = json.loads(closed) result["_truncated_loop"] = True # trace de l'intervention return result except json.JSONDecodeError as e: return {"_raw": raw, "_parse_error": str(e)} def extract_dossier(pdf_path: str | Path, verbose: bool = True, use_standard_routing: bool = True) -> dict: """Pipeline complet d'un dossier : PDF → JSON structuré. use_standard_routing=True (défaut) : route les pages par index selon l'ordre standard OGC (6 pages), sans OCR de classification. -50% du temps. Vérifie uniquement la page 1 pour s'assurer qu'on commence bien par "recueil" — si non, bascule en classification complète (fallback). """ pdf_path = Path(pdf_path) ocr = QwenVLOCR() if verbose: print(f"[{pdf_path.name}] modèle prêt, VRAM={ocr.vram_gb:.2f} Go") images = pdf_to_images(str(pdf_path)) if verbose: print(f"[{pdf_path.name}] {len(images)} pages converties") # Choix de stratégie de routing page_types = [None] * len(images) headers = [""] * len(images) if use_standard_routing: # Vérif rapide sur la page 1 (seul OCR de classification) ptype1, header1 = detect_page_type(images[0], ocr) if ptype1 == "recueil": page_types = route_by_index(len(images)) headers[0] = header1 if verbose: print(f" routing standard (page 1 = recueil OK)") else: if verbose: print(f" page 1 = {ptype1} → fallback classification") use_standard_routing = False result = { "fichier": pdf_path.stem, "pdf_hash": images[0].parent.name, "pages": [], "extraction": {}, } for idx, img_path in enumerate(images, 1): t0 = time.time() if use_standard_routing: ptype = page_types[idx - 1] header_text = headers[idx - 1] else: ptype, header_text = detect_page_type(img_path, ocr) page_info = { "page": idx, "type": ptype, "header": header_text.strip(), "elapsed_s": None, } if verbose: print(f" p{idx}: {ptype}") prompt_conf = PAGE_TYPES.get(ptype) if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER: res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096) parsed = parse_json_output(res["text"]) page_info["ocr_raw"] = res["text"] page_info["parsed"] = parsed page_info["elapsed_s"] = round(res["elapsed_s"], 2) # Enrichissement : checkboxes accord/désaccord sur la fiche recueil # (GLM-OCR ne sait pas lire les checkboxes — voir test_prompt_crop_v2.py) if ptype == "recueil" and isinstance(parsed, dict): cb = detect_accord_desaccord(img_path, RECUEIL_ACCORD_DESACCORD) parsed["accord_desaccord"] = cb["decision"] parsed["_checkbox_debug"] = cb # ratios + diff pour audit page_info["parsed"] = parsed # Indexer par type pour accès direct dans result["extraction"] result["extraction"][ptype] = parsed else: # Pages non structurées : juste l'en-tête déjà OCR page_info["elapsed_s"] = round(time.time() - t0, 2) result["pages"].append(page_info) # Post-traitement : validation ATIH de tous les codes extraits result = validate_annotate(result) return result