""" Extraction structurée des fiches OGC (contrôle T2A) Pipeline : PDF scan → images → docTR OCR par zones → VLM correction → données structurées """ import os import re import json import glob import base64 import io import fitz # PyMuPDF import numpy as np import requests from PIL import Image from doctr.models import ocr_predictor # ============================================================ # Configuration Ollama VLM # ============================================================ OLLAMA_URL = "http://localhost:11434/api/generate" VLM_MODEL = "gemma3:27b-cloud" # ============================================================ # Zones d'extraction — coordonnées relatives (0-1) sur la page # Format: (x_min, y_min, x_max, y_max) # ============================================================ ZONES_PAGE1 = { # --- En-tête --- "etablissement": (0.020, 0.060, 0.520, 0.085), "finess": (0.520, 0.060, 0.720, 0.085), "date_debut_controle": (0.720, 0.060, 0.960, 0.085), "n_ogc": (0.780, 0.088, 0.960, 0.115), "n_champ": (0.020, 0.088, 0.120, 0.110), "dates_sejour": (0.250, 0.110, 0.600, 0.135), # --- Données du séjour — ligne Établissement --- "age_etab": (0.120, 0.185, 0.195, 0.208), "sexe_etab": (0.218, 0.185, 0.260, 0.208), "duree_sejour_etab": (0.435, 0.185, 0.485, 0.208), "mode_entree_etab": (0.495, 0.185, 0.540, 0.208), "provenance_etab": (0.545, 0.185, 0.595, 0.208), "mode_sortie_etab": (0.600, 0.185, 0.650, 0.208), "destination_etab": (0.655, 0.185, 0.710, 0.208), "nb_seances_etab": (0.710, 0.185, 0.760, 0.208), "nb_rum_etab": (0.760, 0.185, 0.810, 0.208), # --- Données du séjour — ligne Recodage --- "age_reco": (0.120, 0.210, 0.195, 0.233), "sexe_reco": (0.218, 0.210, 0.260, 0.233), "duree_sejour_reco": (0.435, 0.210, 0.485, 0.233), "mode_entree_reco": (0.495, 0.210, 0.540, 0.233), "provenance_reco": (0.545, 0.210, 0.595, 0.233), "mode_sortie_reco": (0.600, 0.210, 0.650, 0.233), "destination_reco": (0.655, 0.210, 0.710, 0.233), # --- Données du RUM --- "um_etab": 
(0.370, 0.270, 0.435, 0.293), "igs_etab": (0.460, 0.270, 0.520, 0.293), "duree_rum_etab": (0.630, 0.258, 0.690, 0.280), "dates_rum_etab": (0.545, 0.282, 0.770, 0.302), "um_reco": (0.370, 0.312, 0.435, 0.335), "igs_reco": (0.460, 0.312, 0.520, 0.335), "duree_rum_reco": (0.630, 0.300, 0.690, 0.322), # --- Codage Établissement (DP, DR, DAS) --- "dp_code_etab": (0.065, 0.355, 0.170, 0.375), "dp_libelle": (0.200, 0.355, 0.770, 0.375), "dr_code_etab": (0.065, 0.375, 0.170, 0.392), "das_bloc_etab": (0.065, 0.385, 0.770, 0.475), # --- Recodage (colonne droite) --- "dp_code_reco": (0.785, 0.355, 0.920, 0.375), "dr_code_reco": (0.785, 0.375, 0.920, 0.392), "das_bloc_reco": (0.785, 0.385, 0.960, 0.475), # --- Actes --- "actes_bloc_etab": (0.065, 0.680, 0.770, 0.790), "actes_bloc_reco": (0.785, 0.680, 0.960, 0.790), # --- GHM / GHS (ligne unique en bas) --- "ghm_ghs_ligne": (0.010, 0.808, 0.960, 0.830), # --- Décisions --- "recodage_impactant": (0.010, 0.838, 0.400, 0.858), "ghs_injustifie": (0.010, 0.857, 0.180, 0.878), "praticien_conseil": (0.010, 0.898, 0.500, 0.920), } # Zones spéciales pour détection de checkboxes (accord/désaccord page recueil) # Zone englobant les 2 checkboxes + labels ZONE_ACCORD_DESACCORD = (0.580, 0.838, 0.850, 0.878) # Sous-zones des checkboxes individuelles (carrés ~15x15px à 300dpi) ZONE_CHECKBOX_ACCORD = (0.588, 0.840, 0.610, 0.858) ZONE_CHECKBOX_DESACCORD = (0.588, 0.860, 0.610, 0.878) # Page "Concertation 2/2" — Décision finale # Corrigé : les zones GHS sont dans le bloc decision_bloc (plus bas que prévu) ZONES_CONCERTATION_2 = { "ghs_ligne": (0.030, 0.190, 0.960, 0.230), "accord_concertation": (0.030, 0.270, 0.960, 0.320), "date_concertation": (0.100, 0.690, 0.350, 0.730), } # Page "Concertation 1/2" — Argumentaire ZONES_CONCERTATION_1 = { "date_concertation_arg": (0.140, 0.125, 0.400, 0.160), "argumentaire": (0.030, 0.220, 0.960, 0.560), } # ============================================================ # Post-traitement — nettoyage des valeurs 
# ============================================================

def clean_value(text, field_type="text"):
    """Clean a raw OCR value according to its field type.

    Args:
        text: raw OCR text from a zone (may be empty or None).
        field_type: "text", "number", "code_cim", "date" or "dates_range".

    Returns:
        The cleaned string, or "" when nothing usable remains.
    """
    if not text:
        return ""
    # Strip form labels that commonly bleed into the OCR zones.
    labels_to_strip = [
        r"Etablissement\s*:?\s*",
        r"FINESS\s*:?\s*",
        r"Date?\s*début\s*contrôle\s*:?\s*",
        r"[Nn][o°]\s*OGC\s*:?\s*",
        r"[Nn][o°]\s*[Cc]hamp\s*:?\s*",
        r"Dates?\s*du\s*séjour\s*:?\s*",
        r"^[Dd][Pp]\s+",
        r"^[Dd][Rr]\s*$",
        r"GHM\s*[ée]tablissement\s*:?\s*",
        r"GHS\s*[ée]tablissement\s*:?\s*",
        r"GHM\s*apr[eè]s\s*recodage\s*:?\s*",
        r"GHS\s*apr[eè]s\s*recodage\s*:?\s*",
        r"GHS\s*initial\s*:?\s*",
        r"GHS\s*avant\s*concertation\s*:?\s*",
        r"GHS\s*final\s*apr[eè]s\s*concertation\s*:?\s*",
        r"Recodage\s*impactant\s*la\s*facturation\s*:?\s*",
        r"GHS\s*injustifi[ée]\s*:?\s*",
        r"^Recoaage\s*",  # frequent OCR misread of "Recodage"
        r"^Recodage\s*",
    ]
    cleaned = text
    for pattern in labels_to_strip:
        cleaned = re.sub(pattern, "", cleaned, flags=re.IGNORECASE).strip()
    # Remove common OCR artifacts.
    cleaned = re.sub(r"^\s*[\-/|.]+\s*$", "", cleaned)  # pure-noise lines
    cleaned = re.sub(r"\s*\|\s*", " | ", cleaned)  # normalize separators
    cleaned = cleaned.strip(" |-./")

    if field_type == "number":
        nums = re.findall(r"\d+", cleaned)
        if not nums:
            return ""
        # Keep the longest digit run: zone bleed often adds stray single digits.
        return max(nums, key=len)
    if field_type == "code_cim":
        match = re.search(r"[A-Z]\d{2,4}\.?\d*\s*\*?", cleaned)
        return match.group(0).strip() if match else cleaned
    if field_type == "date":
        match = re.search(r"\d{2}/\d{2}/\d{4}", cleaned)
        return match.group(0) if match else cleaned
    if field_type == "dates_range":
        match = re.search(r"(\d{2}/\d{2}/\d{4})\s*au\s*(\d{2}/\d{2}/\d{4})", cleaned)
        if match:
            return f"{match.group(1)} au {match.group(2)}"
        return cleaned
    return cleaned


def clean_dp_libelle(text):
    """Clean the DP label by dropping noise captured from adjacent zones.

    Picks the longest '|'-separated fragment that looks like a real label
    (length > 3 with an uppercase run) while skipping table headers.
    """
    if not text:
        return ""
    best = ""
    for part in text.split("|"):
        cleaned = part.strip(" :-.|/")
        # Ignore short fragments, noise, and captured table headers.
        if len(cleaned) > 3 and re.search(r"[A-Z]{3,}", cleaned):
            if re.search(r"(?i)codage|etablissement|recodage", cleaned):
                continue  # table header captured by mistake
            if len(cleaned) > len(best):
                best = cleaned
    return best if best else text.strip(" :-.|/")


def parse_ghm_ghs_line(text):
    """Parse the single GHM/GHS line that holds the 4 values.

    Returns a dict with keys ghm_etab, ghs_etab, ghm_reco, ghs_reco
    ("" when a value is not found).
    """
    # One regex per field — table-driven instead of four copy-pasted blocks.
    specs = {
        "ghm_etab": r"GHM\s*[ée]tablissement\s*:?\s*(\w+)",
        "ghs_etab": r"GHS\s*[ée]tablissement\s*:?\s*(\d+)",
        "ghm_reco": r"GHM\s*apr[eè]s\s*recodage\s*:?\s*(\w+)",
        "ghs_reco": r"GHS\s*apr[eè]s\s*recodage\s*:?\s*(\d+)",
    }
    result = {}
    for key, pattern in specs.items():
        m = re.search(pattern, text, re.IGNORECASE)
        result[key] = m.group(1) if m else ""
    return result


def parse_ghs_concertation_line(text):
    """Parse the GHS line of the 'Concertation 2/2' page.

    Returns a dict with keys ghs_initial, ghs_avant_concertation, ghs_final
    ("" when a value is not found).
    """
    specs = {
        "ghs_initial": r"GHS\s*initial\s*:?\s*(\d+)",
        "ghs_avant_concertation": r"GHS\s*avant\s*concertation\s*:?\s*(\d+)",
        "ghs_final": r"GHS\s*final\s*(?:apr[eè]s\s*concertation)?\s*:?\s*(\d+)",
    }
    result = {}
    for key, pattern in specs.items():
        m = re.search(pattern, text, re.IGNORECASE)
        result[key] = m.group(1) if m else ""
    return result
# Compiled once: code patterns for DAS (CIM-10) and CCAM act blocks.
_DAS_CODE_RE = re.compile(r"([A-Z]\d{2,4}\.?\d*)\s*(\*?)")
_DAS_LOOKAHEAD_RE = re.compile(r"[A-Z]\d{2,4}")
_ACTE_CODE_RE = re.compile(r"([A-Z]{4}\d{3})")
_ACTE_LOOKAHEAD_RE = re.compile(r"[A-Z]{4}\d{3}")


def _parse_code_entries(text, code_re, lookahead_re, make_code):
    """Shared scanner for '|'-separated OCR blocks of (code, position, label) runs.

    Args:
        text: raw OCR block text, fragments separated by '|'.
        code_re: compiled regex matching a code at the start of a fragment.
        lookahead_re: compiled regex telling whether the NEXT fragment is
            another code (and therefore not a label).
        make_code: callable(match) -> final code string.

    Returns:
        List of dicts {"code", "position", "libelle"}.
    """
    entries = []
    parts = text.split("|")
    i = 0
    while i < len(parts):
        m = code_re.match(parts[i].strip())
        if m:
            position = ""
            libelle = ""
            # Optional position: a purely numeric fragment right after the code.
            if i + 1 < len(parts):
                pos_m = re.match(r"\s*(\d+)\s*$", parts[i + 1].strip())
                if pos_m:
                    position = pos_m.group(1)
                    i += 1
            # Optional label: next fragment, unless it is itself a code.
            if i + 1 < len(parts):
                nxt = parts[i + 1].strip()
                if not lookahead_re.match(nxt):
                    libelle = nxt
                    i += 1
            entries.append({"code": make_code(m), "position": position, "libelle": libelle})
        i += 1
    return entries


def parse_das_bloc(text):
    """Parse a DAS block into a list of {"code", "position", "libelle"} dicts.

    A trailing '*' on the CIM code is kept as a ' *' suffix (e.g. "T814 *").
    """
    return _parse_code_entries(
        text,
        _DAS_CODE_RE,
        _DAS_LOOKAHEAD_RE,
        lambda m: (m.group(1) + (" *" if m.group(2) else "")).strip(),
    )


def parse_actes_bloc(text):
    """Parse a CCAM acts block into a list of {"code", "position", "libelle"} dicts."""
    return _parse_code_entries(
        text,
        _ACTE_CODE_RE,
        _ACTE_LOOKAHEAD_RE,
        lambda m: m.group(1),
    )


# ============================================================
# Checkbox detection by dark-pixel density analysis
# ============================================================

def detect_checkbox_state(image, zone):
    """Return the dark-pixel ratio of a checkbox zone (NOT a boolean).

    A ticked checkbox has roughly 20-40% dark pixels; an empty one
    (outline only) roughly 5-15%. The caller compares the two ratios.
    """
    crop = crop_zone(image, zone)
    gray = np.array(crop.convert("L"))
    # Dark pixels = value < 128 (on a 0-255 scale).
    return np.mean(gray < 128)


def detect_accord_checkbox(image, debug_dir=None):
    """Detect Accord/Désaccord by visually comparing the two checkboxes.

    Returns "accord", "désaccord", or "ambigu" when the difference in
    dark-pixel ratios is below the 0.03 decision threshold.
    """
    ratio_accord = detect_checkbox_state(image, ZONE_CHECKBOX_ACCORD)
    ratio_desaccord = detect_checkbox_state(image, ZONE_CHECKBOX_DESACCORD)
    if debug_dir:
        crop_a = crop_zone(image, ZONE_CHECKBOX_ACCORD)
        crop_d = crop_zone(image, ZONE_CHECKBOX_DESACCORD)
        crop_a.save(os.path.join(debug_dir, "checkbox_accord.png"))
        crop_d.save(os.path.join(debug_dir, "checkbox_desaccord.png"))
    # The ticked checkbox has noticeably more dark pixels.
    diff = ratio_accord - ratio_desaccord
    if abs(diff) < 0.03:
        return "ambigu"
    elif diff > 0:
        return "accord"
    else:
        return "désaccord"


def detect_accord_concertation(text):
    """Map the OCR text of the concertation decision to a canonical tag.

    Falls back to returning the raw text when no known decision matches.
    """
    text_lower = text.lower()
    if re.search(r"maintien.*(?:avis|initial).*(?:médecin|controleur)", text_lower):
        return "maintien_avis_controleur"
    if "retour groupage" in text_lower:
        return "retour_groupage_dim"
    if "autre groupage" in text_lower:
        return "autre_groupage"
    return text


# ============================================================
# VLM correction (Ollama gemma3)
# ============================================================

def image_to_base64(image, quality=85):
    """Encode a PIL image as a base64 JPEG string."""
    buf = io.BytesIO()
    image.save(buf, format="JPEG", quality=quality)
    return base64.b64encode(buf.getvalue()).decode()
PROMPT_CORRECTION_RECUEIL = """Tu es un expert en codage PMSI/T2A. Voici l'extraction OCR brute d'une fiche de recueil OGC. L'OCR a fait des erreurs. Corrige les erreurs en comparant avec l'image de la fiche.

Extraction OCR brute :
{doctr_json}

RÈGLES DE CORRECTION :
1. Corrige les textes garblés (noms, libellés) en lisant l'image
2. Remplis les champs vides si tu peux les lire sur l'image
3. NE MODIFIE PAS les codes GHM/GHS s'ils ont le bon format (2 chiffres + lettre + 3 chiffres, ex: 06C043). NE MODIFIE PAS les codes GHS numériques s'ils sont déjà remplis.
4. Les codes CIM-10 : lettre majuscule + 2-4 chiffres (ex: K650, T814, E8758). Corrige si le format est invalide.
5. Pour accord_desaccord : regarde les cases cochées ☒/☐ sur l'image. Réponds "accord" ou "désaccord".
6. Supprime tout bruit OCR (tirets parasites, caractères aléatoires, séparateurs |)
7. Pour les champs numériques (age, sexe, duree, mode_entree, provenance, mode_sortie, destination) : ne garde que le nombre, sans texte parasite
8. Le praticien_conseil est un nom de médecin (Dr/DR + nom)

Réponds UNIQUEMENT avec le JSON corrigé, même structure, sans commentaire ni markdown."""

PROMPT_CORRECTION_CONCERTATION = """Tu es un expert en codage PMSI/T2A. Voici l'extraction OCR brute d'une page de concertation OGC. Corrige les erreurs en comparant avec l'image.

Extraction OCR brute :
{doctr_json}

RÈGLES :
1. Les GHS sont des nombres à 4 chiffres. Corrige s'ils sont tronqués ou bruités.
2. Pour la décision : "maintien_avis_controleur", "retour_groupage_dim" ou "autre_groupage" selon la case cochée
3. Extrais la date de concertation si visible (format dd/mm/yyyy)
4. Pour l'argumentaire : corrige les erreurs OCR évidentes mais garde le texte complet
5. Supprime tout bruit OCR

Réponds UNIQUEMENT avec le JSON corrigé, même structure, sans commentaire ni markdown."""


def vlm_correct(image, doctr_parsed, prompt_template, page_type="recueil"):
    """Send the page image + docTR JSON to the VLM for correction.

    Returns the corrected dict, or the unmodified docTR dict on ANY
    failure (network, HTTP error, unparsable model output) — the
    pipeline must keep going with raw OCR data.
    """
    b64 = image_to_base64(image)
    prompt = prompt_template.format(doctr_json=json.dumps(doctr_parsed, ensure_ascii=False, indent=2))
    try:
        resp = requests.post(OLLAMA_URL, json={
            "model": VLM_MODEL,
            "prompt": prompt,
            "images": [b64],
            "stream": False,
            "options": {"temperature": 0.1, "num_predict": 3000}
        }, timeout=120)
        # Fail fast on HTTP errors instead of trying to parse an error payload;
        # the exception is caught below and triggers the docTR fallback.
        resp.raise_for_status()
        text = resp.json().get("response", "")
        # Strip any ``` / ```json fencing around the model's JSON answer.
        text = re.sub(r"^```(?:json)?\s*", "", text.strip())
        text = re.sub(r"\s*```$", "", text.strip())
        return json.loads(text)
    except Exception as e:
        print(f"    ⚠ VLM correction échouée ({e}), utilisation des données docTR")
        return doctr_parsed


def merge_recueil(doctr_parsed, vlm_corrected):
    """Merge docTR and VLM results, preferring each for its strengths.

    docTR wins for structured codes (GHM/GHS, CIM-10 DP/DR) when its value
    already matches the expected format — the VLM sometimes "corrects"
    codes that were right.
    """
    merged = json.loads(json.dumps(vlm_corrected))  # deep copy
    # Prefer docTR for structured codes when they are format-valid.
    code_fields_to_protect = [
        ("ghm_etab", r"^\d{2}[A-Z]\d{3}$"),
        ("ghm_reco", r"^\d{2}[A-Z]\d{3}$"),
        ("ghs_etab", r"^\d{3,5}$"),
        ("ghs_reco", r"^\d{3,5}$"),
    ]
    for field, pattern in code_fields_to_protect:
        doctr_val = doctr_parsed.get(field, "")
        vlm_val = merged.get(field, "")
        # If docTR has a valid value and the VLM changed it, keep docTR.
        if doctr_val and re.match(pattern, doctr_val):
            merged[field] = doctr_val
    # Protect valid docTR CIM codes inside codage_etab/codage_reco.
    for section in ["codage_etab", "codage_reco"]:
        if section in doctr_parsed and section in merged:
            for code_field in ["dp", "dr"]:
                doctr_code = doctr_parsed[section].get(code_field, "")
                if doctr_code and re.match(r"^[A-Z]\d{2,4}\.?\d*", doctr_code):
                    # Keep docTR if valid, unless the VLM also has a valid code.
                    vlm_code = merged[section].get(code_field, "")
                    if not vlm_code or not re.match(r"^[A-Z]\d{2,4}\.?\d*", vlm_code):
                        merged[section][code_field] = doctr_code
    return merged


# ============================================================
# OCR helpers
# ============================================================

def pdf_to_images(pdf_path, dpi=300):
    """Render every page of a PDF as a PIL RGB image.

    The fitz document is always closed, even if rendering raises
    (the original leaked the handle on error).
    """
    doc = fitz.open(pdf_path)
    try:
        mat = fitz.Matrix(dpi / 72, dpi / 72)
        images = []
        for page in doc:
            pix = page.get_pixmap(matrix=mat)
            images.append(Image.frombytes("RGB", (pix.width, pix.height), pix.samples))
        return images
    finally:
        doc.close()


def crop_zone(image, zone):
    """Crop a zone out of a PIL image; zone is (x1, y1, x2, y2) in 0-1 relative coords."""
    w, h = image.size
    x1, y1, x2, y2 = zone
    return image.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
def ocr_image(model, image):
    """Run docTR on a PIL image; return one {"text", "confidence"} dict per line."""
    result = model([np.array(image)])
    lines = []
    for page in result.pages:
        for block in page.blocks:
            for line in block.lines:
                words = line.words
                text = " ".join(word.value for word in words)
                if words:
                    conf = sum(word.confidence for word in words) / len(words)
                else:
                    conf = 0
                lines.append({"text": text, "confidence": conf})
    return lines


def ocr_zone(model, image, zone, debug_dir=None, zone_name=None):
    """OCR a specific zone of the page; optionally save the crop for debugging."""
    crop = crop_zone(image, zone)
    if debug_dir and zone_name:
        crop.save(os.path.join(debug_dir, f"zone_{zone_name}.png"))
    return ocr_image(model, crop)


def extract_text(lines):
    """Join the non-blank OCR lines into one ' | '-separated string."""
    fragments = [entry["text"] for entry in lines if entry["text"].strip()]
    return " | ".join(fragments)


def detect_page_type(model, image):
    """Classify a page by looking for textual markers in its header band."""
    header_zone = (0.020, 0.005, 0.950, 0.085)
    header_text = extract_text(ocr_zone(model, image, header_zone)).upper()
    # Order matters: "CONCERTATION 2/2" / "1/2" must be tested before the
    # generic "CONCERTATION" fallback.
    markers = (
        (("RECUEIL DU PRATICIEN",), "recueil"),
        (("CONCERTATION 2/2",), "concertation_2"),
        (("CONCERTATION 1/2",), "concertation_1"),
        (("HOSPITALISATION",), "commentaires"),
        (("PREUVE", "ELEMENT"), "elements_preuve"),
        (("CONCERTATION",), "concertation_medicale"),
    )
    for needles, page_type in markers:
        if any(needle in header_text for needle in needles):
            return page_type
    return "inconnu"
def extract_page_recueil(model, image, debug_dir=None):
    """Extract the data of the 'recueil' sheet (page 1).

    OCRs every zone of ZONES_PAGE1, then post-processes the raw text into
    a structured dict. Returns {"raw": <zone -> text>, "parsed": <fields>}.
    """
    raw = {}
    for zone_name, zone_coords in ZONES_PAGE1.items():
        lines = ocr_zone(model, image, zone_coords, debug_dir, zone_name)
        raw[zone_name] = extract_text(lines)
    # Structured post-processing of the composite zones.
    ghm_ghs = parse_ghm_ghs_line(raw.get("ghm_ghs_ligne", ""))
    das_etab = parse_das_bloc(raw.get("das_bloc_etab", ""))
    das_reco = parse_das_bloc(raw.get("das_bloc_reco", ""))
    actes_etab = parse_actes_bloc(raw.get("actes_bloc_etab", ""))
    actes_reco = parse_actes_bloc(raw.get("actes_bloc_reco", ""))
    # Accord/désaccord via visual checkbox analysis (not OCR).
    accord = detect_accord_checkbox(image, debug_dir)
    return {
        "raw": raw,
        "parsed": {
            "etablissement": clean_value(raw["etablissement"]),
            # Keep only the first '|'-fragment: the FINESS zone bleeds into neighbors.
            "finess": clean_value(raw["finess"]).split("|")[0].strip(),
            "date_debut_controle": clean_value(raw["date_debut_controle"], "date"),
            "n_ogc": clean_value(raw["n_ogc"], "number"),
            "n_champ": clean_value(raw["n_champ"], "number"),
            "dates_sejour": clean_value(raw["dates_sejour"], "dates_range"),
            "sejour_etab": {
                "age": clean_value(raw["age_etab"], "number"),
                "sexe": clean_value(raw["sexe_etab"], "number"),
                "duree_sejour": clean_value(raw["duree_sejour_etab"], "number"),
                "mode_entree": clean_value(raw["mode_entree_etab"], "number"),
                "provenance": clean_value(raw["provenance_etab"], "number"),
                "mode_sortie": clean_value(raw["mode_sortie_etab"], "number"),
                "destination": clean_value(raw["destination_etab"], "number"),
            },
            "sejour_reco": {
                "age": clean_value(raw["age_reco"], "number"),
                "sexe": clean_value(raw["sexe_reco"], "number"),
                "duree_sejour": clean_value(raw["duree_sejour_reco"], "number"),
                "mode_entree": clean_value(raw["mode_entree_reco"], "number"),
                "provenance": clean_value(raw["provenance_reco"], "number"),
                "mode_sortie": clean_value(raw["mode_sortie_reco"], "number"),
                "destination": clean_value(raw["destination_reco"], "number"),
            },
            "rum_etab": {
                "um": clean_value(raw["um_etab"]),
                "igs": clean_value(raw["igs_etab"], "number"),
                "duree": clean_value(raw["duree_rum_etab"], "number"),
                "dates": clean_value(raw["dates_rum_etab"]),
            },
            "codage_etab": {
                "dp": clean_value(raw["dp_code_etab"], "code_cim"),
                "dp_libelle": clean_dp_libelle(raw["dp_libelle"]),
                "dr": clean_value(raw["dr_code_etab"], "code_cim"),
                "das": das_etab,
            },
            "codage_reco": {
                "dp": clean_value(raw["dp_code_reco"], "code_cim"),
                "dr": clean_value(raw["dr_code_reco"], "code_cim"),
                "das": das_reco,
            },
            "actes_etab": actes_etab,
            "actes_reco": actes_reco,
            "ghm_etab": ghm_ghs["ghm_etab"],
            "ghs_etab": ghm_ghs["ghs_etab"],
            "ghm_reco": ghm_ghs["ghm_reco"],
            "ghs_reco": ghm_ghs["ghs_reco"],
            "recodage_impactant": clean_value(raw["recodage_impactant"], "number"),
            "ghs_injustifie": clean_value(raw["ghs_injustifie"], "number"),
            "accord_desaccord": accord,
            "praticien_conseil": clean_value(raw["praticien_conseil"]),
        },
    }


def extract_page_concertation_1(model, image, debug_dir=None):
    """Extract the reviewing physician's argument (page 'Concertation 1/2')."""
    raw = {}
    for zone_name, zone_coords in ZONES_CONCERTATION_1.items():
        lines = ocr_zone(model, image, zone_coords, debug_dir, f"conc1_{zone_name}")
        raw[zone_name] = extract_text(lines)
    return {
        "raw": raw,
        "parsed": {
            "date_concertation": clean_value(raw.get("date_concertation_arg", ""), "date"),
            "argumentaire": raw.get("argumentaire", ""),
        },
    }


def extract_page_concertation_2(model, image, debug_dir=None):
    """Extract the final decision after concertation (page 'Concertation 2/2')."""
    raw = {}
    for zone_name, zone_coords in ZONES_CONCERTATION_2.items():
        lines = ocr_zone(model, image, zone_coords, debug_dir, f"conc2_{zone_name}")
        raw[zone_name] = extract_text(lines)
    # Parse the GHS line from the corrected zone.
    ghs_data = parse_ghs_concertation_line(raw.get("ghs_ligne", ""))
    # Detect the concertation decision from OCR text.
    accord_text = raw.get("accord_concertation", "")
    decision = detect_accord_concertation(accord_text)
    return {
        "raw": raw,
        "parsed": {
            "ghs_initial": ghs_data["ghs_initial"],
            "ghs_avant_concertation": ghs_data["ghs_avant_concertation"],
            "ghs_final": ghs_data["ghs_final"],
            "decision": decision,
            "accord_concertation_raw": accord_text,
            "date_concertation": clean_value(raw.get("date_concertation", ""), "date"),
        },
    }
def extract_ogc(pdf_path, model, debug=False, use_vlm=True):
    """Run the full extraction of one OGC dossier (one PDF).

    Renders the PDF, auto-detects each page's type, extracts the pages we
    know how to read, and optionally runs the VLM correction pass.
    Returns {"fichier", "recueil", "concertation_1", "concertation_2"}
    (unrecognized page types are detected but ignored).
    """
    basename = os.path.splitext(os.path.basename(pdf_path))[0]
    debug_dir = None
    if debug:
        # One debug folder per dossier for the per-zone crops.
        debug_dir = os.path.join(os.path.dirname(pdf_path), "..", "debug_zones", basename)
        os.makedirs(debug_dir, exist_ok=True)
    print(f"\n{'='*60}")
    print(f"  Extraction: {basename}")
    print(f"{'='*60}")
    images = pdf_to_images(pdf_path)
    # Auto-detect page types from their header text.
    print(f"  Détection des pages...")
    page_types = {}
    for i, img in enumerate(images):
        ptype = detect_page_type(model, img)
        page_types[i] = ptype
        print(f"    Page {i+1}: {ptype}")
    result = {"fichier": basename, "recueil": None, "concertation_1": None, "concertation_2": None}
    for page_idx, ptype in page_types.items():
        if ptype == "recueil":
            print(f"  Extraction page {page_idx+1} (recueil)...")
            recueil = extract_page_recueil(model, images[page_idx], debug_dir)
            if use_vlm:
                print(f"  Correction VLM (recueil)...")
                vlm_corrected = vlm_correct(
                    images[page_idx], recueil["parsed"], PROMPT_CORRECTION_RECUEIL, "recueil"
                )
                # Merge: docTR keeps precedence on format-valid codes.
                recueil["parsed"] = merge_recueil(recueil["parsed"], vlm_corrected)
            result["recueil"] = recueil
        elif ptype == "concertation_1":
            print(f"  Extraction page {page_idx+1} (argumentaire)...")
            conc1 = extract_page_concertation_1(model, images[page_idx], debug_dir)
            if use_vlm:
                print(f"  Correction VLM (argumentaire)...")
                vlm_corrected = vlm_correct(
                    images[page_idx], conc1["parsed"], PROMPT_CORRECTION_CONCERTATION, "concertation_1"
                )
                # NOTE: the VLM output replaces "parsed" wholesale here.
                conc1["parsed"] = vlm_corrected
            result["concertation_1"] = conc1
        elif ptype == "concertation_2":
            print(f"  Extraction page {page_idx+1} (décision finale)...")
            conc2 = extract_page_concertation_2(model, images[page_idx], debug_dir)
            if use_vlm:
                print(f"  Correction VLM (décision finale)...")
                vlm_corrected = vlm_correct(
                    images[page_idx], conc2["parsed"], PROMPT_CORRECTION_CONCERTATION, "concertation_2"
                )
                conc2["parsed"] = vlm_corrected
            result["concertation_2"] = conc2
    return result
print(f" Date contrôle : {p['date_debut_controle']}") print(f" N° OGC : {p['n_ogc']}") print(f" N° Champ : {p['n_champ']}") print(f" Dates séjour : {p['dates_sejour']}") se = p["sejour_etab"] print(f"\n [SÉJOUR ÉTABLISSEMENT]") print(f" Âge: {se['age']} Sexe: {se['sexe']} Durée: {se['duree_sejour']}") print(f" Mode entrée: {se['mode_entree']} Prov: {se['provenance']} Mode sortie: {se['mode_sortie']} Dest: {se['destination']}") sr = p["sejour_reco"] print(f"\n [SÉJOUR RECODAGE]") print(f" Âge: {sr['age']} Sexe: {sr['sexe']} Durée: {sr['duree_sejour']}") print(f" Mode entrée: {sr['mode_entree']} Prov: {sr['provenance']} Mode sortie: {sr['mode_sortie']} Dest: {sr['destination']}") rum = p["rum_etab"] print(f"\n [RUM]") print(f" UM: {rum['um']} IGS: {rum['igs']} Durée: {rum['duree']} Dates: {rum['dates']}") ce = p.get("codage_etab", {}) print(f"\n [CODAGE ÉTABLISSEMENT]") print(f" DP: {ce.get('dp','')} ({ce.get('dp_libelle','')})") print(f" DR: {ce.get('dr','')}") for das in ce.get("das", []): print(f" DAS: {das.get('code','')} pos={das.get('position','')} {das.get('libelle','')}") cr = p.get("codage_reco", {}) print(f"\n [RECODAGE]") print(f" DP: {cr.get('dp','')}") print(f" DR: {cr['dr']}") for das in cr.get("das", []): print(f" DAS: {das.get('code','')} pos={das.get('position','')} {das.get('libelle','')}") if p.get("actes_etab"): print(f"\n [ACTES ÉTABLISSEMENT]") for a in p["actes_etab"]: if isinstance(a, dict): print(f" {a.get('code','')} pos={a.get('position','')} {a.get('libelle','')}") else: print(f" {a}") if p.get("actes_reco"): print(f"\n [ACTES RECODAGE]") for a in p["actes_reco"]: if isinstance(a, dict): print(f" {a.get('code','')} pos={a.get('position','')} {a.get('libelle','')}") else: print(f" {a}") print(f"\n [GHM/GHS]") print(f" Établissement : GHM={p['ghm_etab']} GHS={p['ghs_etab']}") print(f" Recodage : GHM={p['ghm_reco']} GHS={p['ghs_reco']}") print(f"\n [DÉCISIONS]") print(f" Recodage impactant : {p['recodage_impactant']}") print(f" GHS injustifié 
: {p['ghs_injustifie']}") print(f" Accord/Désaccord : {p['accord_desaccord']}") print(f" Praticien : {p['praticien_conseil']}") c2 = result.get("concertation_2", {}) if c2: p2 = c2["parsed"] print(f"\n [CONCERTATION — Décision finale]") print(f" GHS initial : {p2['ghs_initial']}") print(f" GHS avant concertation : {p2['ghs_avant_concertation']}") print(f" GHS final : {p2['ghs_final']}") print(f" Décision : {p2['decision']}") print(f" Date : {p2['date_concertation']}") c1 = result.get("concertation_1", {}) if c1: p1 = c1["parsed"] arg = p1.get("argumentaire", "") print(f"\n [ARGUMENTAIRE]") print(f" Date : {p1['date_concertation']}") if arg: for line in arg.split("|")[:5]: print(f" {line.strip()}") # ============================================================ # Export Excel consolidé # ============================================================ def export_excel(all_results, output_path): """Exporte tous les résultats en un fichier Excel consolidé""" try: import openpyxl from openpyxl.styles import Font, Alignment, PatternFill, Border, Side except ImportError: print(" ERREUR: openpyxl requis pour l'export Excel. 
def export_excel(all_results, output_path):
    """Export all dossier results into one consolidated Excel file.

    One row per dossier, one column per field (header/stay/RUM/coding/
    acts/GHM-GHS/decisions/concertation). Requires openpyxl; prints an
    error and returns silently when it is not installed.
    """
    try:
        import openpyxl
        from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
    except ImportError:
        print("  ERREUR: openpyxl requis pour l'export Excel. pip install openpyxl")
        return
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "OGC Résultats"
    # Styles
    header_font = Font(bold=True, size=10, color="FFFFFF")
    header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
    header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
    cell_align = Alignment(vertical="center", wrap_text=True)
    thin_border = Border(
        left=Side(style="thin"), right=Side(style="thin"),
        top=Side(style="thin"), bottom=Side(style="thin"),
    )
    # Column headers — order must match the `values` list built below.
    headers = [
        "N° OGC", "N° Champ", "Établissement", "FINESS", "Date contrôle", "Dates séjour",
        # Facility stay
        "Âge étab", "Sexe étab", "Durée séjour étab", "Mode entrée étab",
        "Provenance étab", "Mode sortie étab", "Destination étab",
        # Recoded stay
        "Âge reco", "Sexe reco", "Durée séjour reco", "Mode entrée reco",
        "Provenance reco", "Mode sortie reco", "Destination reco",
        # RUM
        "UM étab", "IGS étab", "Durée RUM étab", "Dates RUM",
        # Coding
        "DP étab", "DP libellé", "DR étab", "DAS étab",
        "DP reco", "DR reco", "DAS reco",
        # Acts
        "Actes étab", "Actes reco",
        # GHM/GHS
        "GHM étab", "GHS étab", "GHM reco", "GHS reco",
        # Decisions
        "Recodage impactant", "GHS injustifié", "Accord/Désaccord", "Praticien",
        # Concertation
        "GHS initial conc.", "GHS avant conc.", "GHS final conc.",
        "Décision conc.", "Date concertation", "Argumentaire (extrait)",
    ]
    for col, header in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_align
        cell.border = thin_border
    # Data rows
    for row_idx, result in enumerate(all_results, 2):
        rec = result.get("recueil", {})
        p = rec.get("parsed", {}) if rec else {}
        se = p.get("sejour_etab", {})
        sr = p.get("sejour_reco", {})
        rum = p.get("rum_etab", {})
        ce = p.get("codage_etab", {})
        cr = p.get("codage_reco", {})

        # Format DAS/acts as text (robust to the variable formats the VLM returns).
        def fmt_entry(d, with_libelle=False):
            if isinstance(d, str):
                return d
            code = d.get("code", "")
            pos = d.get("position", "")
            lib = d.get("libelle", "") if with_libelle else ""
            s = code
            if pos:
                s += f" (pos {pos})"
            if lib:
                s += f" {lib}"
            return s

        das_etab_str = "; ".join(fmt_entry(d, True) for d in ce.get("das", []))
        das_reco_str = "; ".join(fmt_entry(d) for d in cr.get("das", []))
        actes_etab_str = "; ".join(fmt_entry(a) for a in p.get("actes_etab", []))
        actes_reco_str = "; ".join(fmt_entry(a) for a in p.get("actes_reco", []))
        # Concertation
        c2 = result.get("concertation_2", {})
        p2 = c2.get("parsed", {}) if c2 else {}
        c1 = result.get("concertation_1", {})
        p1 = c1.get("parsed", {}) if c1 else {}
        arg = p1.get("argumentaire", "")
        # Truncate the argument for Excel (500 chars, '|' fragments as lines).
        arg_short = arg.replace(" | ", "\n")[:500] if arg else ""
        values = [
            p.get("n_ogc", ""), p.get("n_champ", ""), p.get("etablissement", ""),
            p.get("finess", ""), p.get("date_debut_controle", ""), p.get("dates_sejour", ""),
            se.get("age", ""), se.get("sexe", ""), se.get("duree_sejour", ""),
            se.get("mode_entree", ""), se.get("provenance", ""), se.get("mode_sortie", ""),
            se.get("destination", ""),
            sr.get("age", ""), sr.get("sexe", ""), sr.get("duree_sejour", ""),
            sr.get("mode_entree", ""), sr.get("provenance", ""), sr.get("mode_sortie", ""),
            sr.get("destination", ""),
            rum.get("um", ""), rum.get("igs", ""), rum.get("duree", ""), rum.get("dates", ""),
            ce.get("dp", ""), ce.get("dp_libelle", ""), ce.get("dr", ""), das_etab_str,
            cr.get("dp", ""), cr.get("dr", ""), das_reco_str,
            actes_etab_str, actes_reco_str,
            p.get("ghm_etab", ""), p.get("ghs_etab", ""), p.get("ghm_reco", ""), p.get("ghs_reco", ""),
            p.get("recodage_impactant", ""), p.get("ghs_injustifie", ""),
            p.get("accord_desaccord", ""), p.get("praticien_conseil", ""),
            p2.get("ghs_initial", ""), p2.get("ghs_avant_concertation", ""), p2.get("ghs_final", ""),
            p2.get("decision", ""), p2.get("date_concertation", ""),
            arg_short,
        ]
        for col, value in enumerate(values, 1):
            cell = ws.cell(row=row_idx, column=col, value=value)
            cell.alignment = cell_align
            cell.border = thin_border
    # Automatic column widths (capped at 40 chars per cell).
    for col in range(1, len(headers) + 1):
        max_len = len(str(ws.cell(row=1, column=col).value))
        for row in range(2, ws.max_row + 1):
            val = ws.cell(row=row, column=col).value
            if val:
                max_len = max(max_len, min(len(str(val)), 40))
        ws.column_dimensions[openpyxl.utils.get_column_letter(col)].width = max_len + 2
    # Freeze the header row.
    ws.freeze_panes = "A2"
    wb.save(output_path)
    print(f"  Excel exporté: {output_path}")


# ============================================================
# Main — process every PDF of the input directory
# ============================================================
if __name__ == "__main__":
    pdf_dir = "/home/dom/ai/Aivanov_scan_ogc/2018 CARC"
    output_dir = "/home/dom/ai/Aivanov_scan_ogc/output"
    os.makedirs(output_dir, exist_ok=True)
    print("Chargement du modèle docTR...")
    model = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True)
    # All "OGC *.pdf" files of the input directory.
    pdf_files = sorted(glob.glob(os.path.join(pdf_dir, "OGC *.pdf")))
    print(f"  {len(pdf_files)} fichiers PDF trouvés")
    all_results = []
    for pdf_path in pdf_files:
        try:
            result = extract_ogc(pdf_path, model, debug=True)
            print_results(result)
            all_results.append(result)
            # One JSON file per dossier.
            json_path = os.path.join(output_dir, f"{result['fichier']}.json")
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
        except Exception as e:
            # Best-effort batch: log the failure and continue with the next PDF.
            print(f"  ERREUR sur {os.path.basename(pdf_path)}: {e}")
            import traceback
            traceback.print_exc()
    # Consolidated Excel export.
    if all_results:
        excel_path = os.path.join(output_dir, "OGC_2018_CARC_resultats.xlsx")
        export_excel(all_results, excel_path)
    print(f"\n{'='*60}")
    print(f"  {len(all_results)} dossiers traités sur {len(pdf_files)}")
    print(f"  JSON individuels: {output_dir}/")
    print(f"  Excel consolidé: {output_dir}/OGC_2018_CARC_resultats.xlsx")
    print(f"{'='*60}")