""" Extraction OGC → Excel Modèle : qwen3-vl:235b-cloud (vision multimodal) via Ollama """ import base64 import io import json import re import sys import time from datetime import datetime from pathlib import Path import pandas as pd import requests from pdf2image import convert_from_path from PIL import Image from reportlab.lib import colors from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import cm from reportlab.platypus import ( SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable ) # ─── Config ─────────────────────────────────────────────────────────────────── SCAN_DIR = Path(__file__).parent / "scanOgc" OUTPUT_DIR = Path(__file__).parent / "output" OUTPUT_DIR.mkdir(exist_ok=True) OLLAMA_URL = "http://localhost:11434/api/generate" MODEL = "qwen3-vl:235b-cloud" PDF_DPI = 250 # Rate-limit : pause entre chaque appel et retry sur 429 INTER_REQUEST_DELAY = 2 # secondes RETRY_MAX = 6 RETRY_DELAY_429 = 60 # secondes — plafond à 120s dans ask_vision # ─── Utilitaires image ──────────────────────────────────────────────────────── def image_to_b64(img: Image.Image) -> str: buf = io.BytesIO() img.save(buf, format="JPEG", quality=90) return base64.b64encode(buf.getvalue()).decode() # ─── Appel Ollama ───────────────────────────────────────────────────────────── def ask_vision(prompt: str, img: Image.Image, timeout: int = 240, num_predict: int = 8192, timing_record: dict = None) -> str: """ Envoie une image + prompt à Ollama en mode streaming. - qwen3-vl utilise ~4000 tokens de "thinking" avant la réponse : num_predict=8192 est nécessaire pour avoir assez de budget. - Retry automatique sur 429 (rate limit cloud). - timing_record : dict optionnel pour enregistrer retries/blocages. """ payload = { "model": MODEL, "prompt": prompt, "images": [image_to_b64(img)], "stream": True, "options": {"temperature": 0, "num_predict": num_predict}, } for attempt in range(1, RETRY_MAX + 1): try: resp = requests.post(OLLAMA_URL, json=payload, timeout=timeout, stream=True) if resp.status_code == 429: wait = min(RETRY_DELAY_429 * attempt, 120) print(f" ⏳ Rate limit — attente {wait}s " f"(tentative {attempt}/{RETRY_MAX})...") if timing_record is not None: timing_record.setdefault("blocages_429", []).append({ "tentative": attempt, "attente_s": wait, "ts": datetime.now().isoformat(), }) time.sleep(wait) continue resp.raise_for_status() tokens = [] for line in resp.iter_lines(): if not line: continue try: chunk = json.loads(line) except json.JSONDecodeError: continue if chunk.get("response"): tokens.append(chunk["response"]) if chunk.get("done"): break if timing_record is not None and attempt > 1: timing_record["retries_total"] = \ timing_record.get("retries_total", 0) + (attempt - 1) time.sleep(INTER_REQUEST_DELAY) return "".join(tokens) except requests.exceptions.HTTPError as e: if e.response is not None and e.response.status_code == 429: wait = min(RETRY_DELAY_429 * attempt, 120) print(f" ⏳ Rate limit — attente {wait}s " f"(tentative {attempt}/{RETRY_MAX})...") if timing_record is not None: timing_record.setdefault("blocages_429", []).append({ "tentative": attempt, "attente_s": wait, "ts": datetime.now().isoformat(), }) time.sleep(wait) continue raise raise RuntimeError(f"Echec après {RETRY_MAX} tentatives (rate limit persistant)") # ─── Extraction JSON depuis la réponse ─────────────────────────────────────── def _try_parse(text: str): for candidate in ( text, text.replace("\n", " ").replace("\r", " "), re.sub(r",\s*([}\]])", r"\1", text), # trailing commas re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text), # control chars ): try: return json.loads(candidate) except json.JSONDecodeError: pass return None def _extract_balanced(text: str, open_c: str, close_c: str): """Extrait la première structure équilibrée open_c…close_c du texte.""" start = text.find(open_c) if start == -1: return None depth = 0 in_str = False escape = False for i, ch in enumerate(text[start:], start): if escape: escape = False continue if ch == "\\" and in_str: escape = True continue if ch == '"' and not escape: in_str = not in_str continue if in_str: continue if ch == open_c: depth += 1 elif ch == close_c: depth -= 1 if depth == 0: return text[start:i+1] return None def extract_json(text: str): # 1. Bloc ```json … ``` m = re.search(r"```json\s*([\s\S]*?)```", text) if m: result = _try_parse(m.group(1).strip()) if result is not None: return result # 2. Extraction par accolades équilibrées (plus robuste que greedy regex) for open_c, close_c in (('{', '}'), ('[', ']')): candidate = _extract_balanced(text, open_c, close_c) if candidate: result = _try_parse(candidate) if result is not None: return result # 3. Fallback greedy regex (comportement original) for pattern in (r"(\{[\s\S]*\})", r"(\[[\s\S]*\])"): m = re.search(pattern, text) if m: result = _try_parse(m.group(1)) if result is not None: return result return None # ─── Prompts ────────────────────────────────────────────────────────────────── PROMPT_IDENTIFY = """\ Tu es un assistant d'analyse de documents médicaux français. Regarde cette image et identifie son type parmi : - FICHE_RECUEIL : "FICHE MEDICALE DE RECUEIL DU PRATICIEN CONSEIL" - FICHE_CONCERTATION_VIDE: "FICHE MEDICALE DE CONCERTATION" (page quasi vide) - SEJOUR_MANUSCRIT : "Séjour d'hospitalisation complète" (colonnes manuscrites) - ELEMENTS_PREUVE : "Eléments de preuve tracés au dossier du patient" - FICHE_ADMIN_2_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 2/2" - FICHE_ADMIN_1_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 1/2" - AUTRE : autre type Réponds UNIQUEMENT avec le code du type, sans aucune explication.\ """ PROMPT_FICHE_RECUEIL = """\ Tu es un assistant d'extraction de données médicales. Extrait toutes les informations imprimées de cette fiche médicale de recueil du praticien conseil. RÈGLES STRICTES : - Si un champ n'a pas de valeur clairement visible et imprimée, retourner une chaîne vide "". - Ne jamais deviner, inférer ou compléter un champ absent. - Le champ "provenance" est souvent vide : ne pas le remplir sauf si une valeur est explicitement imprimée. - Le champ "se_coche" correspond aux cases 1/2/3/4 : retourner "SE1", "SE2", "SE3" ou "SE4" si une case est explicitement cochée, sinon "". Ce champ est TRÈS SOUVENT vide — ne rien mettre par défaut. NE PAS confondre avec "accord_desaccord" qui est un champ séparé. - Le champ "accord_desaccord" est distinct de "se_coche" : il indique accord/désaccord du praticien conseil, pas les cases SE. - Le champ "dr_etab" (Diagnostic Relié) est distinct des DAS : ne mettre un code que s'il y a une ligne DR EXPLICITEMENT RENSEIGNÉE sur la fiche. Si la ligne DR est vide ou absente sur le document, retourner "" obligatoirement. NE JAMAIS copier le premier DAS dans DR — ce sont deux lignes séparées sur la grille. - Le tableau "Données du séjour" contient ces colonnes DANS CET ORDRE EXACT, de gauche à droite : Age(ans) | Age(jours) | Sexe | Délai dern. règles | Age gestation | Poids d'entrée | Durée de séjour | Mode d'entrée | Provenance | Mode de sortie | Destination | Nb séances | Nb RUM | Nb j EXH | Type EXB | Nb j EXB RÈGLE ABSOLUE : lire chaque valeur dans sa colonne uniquement. Si une colonne est vide, retourner "" pour ce champ. Ne jamais décaler les valeurs vers la gauche pour compenser une cellule vide. Exemple : si "Provenance" est vide, "Mode de sortie" reste dans "mode_sortie", pas dans "provenance". Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après. - IMPORTANT : extraire TOUTES les lignes non vides de das_etab, actes_etab, das_reco et actes_reco sans limite de nombre. Ne jamais tronquer ces listes. - Les actes (CCAM, codes à 7+ caractères commençant par des lettres ex: JDPE002, NJFA008) vont dans "actes_etab", pas dans "das_etab". Les diagnostics (CIM-10, codes courts ex: N320, R33) vont dans "das_etab". {"n_ogc":"","etablissement":"","finess":"","date_debut_controle":"","n_champ":"","libelle_champ":"","dossier_manquant":"","date_debut_sejour":"","date_fin_sejour":"", "sejour_etab":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""}, "sejour_reco":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""}, "rum_etab":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""}, "rum_reco":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""}, "dp_etab":{"code":"","libelle":""},"dr_etab":{"code":"","libelle":""}, "das_etab": [] ou [{"code":"","niveau":"","libelle":""}] ou plus, "actes_etab":[{"code":"","niveau":"","libelle":""}], "dp_reco":{"code":""},"dr_reco":{"code":""}, "das_reco":[{"code":"","niveau":""}],"actes_reco":[{"code":"","niveau":""}], "ghm_etab":"","ghs_etab":"","ghm_reco":"","ghs_reco":"", "recodage_impactant_facturation":"","ghs_injustifie":"", "se_coche":"","atu":"","ffm":"","fsd":"","accord_desaccord":"","nom_praticien_conseil":""}\ """ PROMPT_ELEMENTS_PREUVE = """\ Tu es un assistant d'extraction de données médicales. Extrait les informations de cette page "Eléments de preuve tracés au dossier du patient". Pour chaque ligne : "present"=oui/non, "photocopie"=nombre écrit, dates si présentes. Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après. {"date":"","medecin_controleur_signataire":"","medecin_dim_signataire":"", "elements":{"compte_rendu_acte":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "compte_rendu_operatoire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "compte_rendu_accouchement":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "compte_rendu_examen_complementaire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "compte_rendu_imagerie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "compte_rendu_anatomopathologie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "observations_medicales":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "dossier_transfusion":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "dossier_anesthesie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "administration_therapeutique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "compte_rendu_hospitalisation":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "lettre_sortie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "surveillance_dossier_infirmier":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "prise_en_charge_psychologue":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "prise_en_charge_kinesitherapeute":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "prise_en_charge_dietetique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}, "autre":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}}}\ """ PROMPT_FICHE_ADMIN_2_2 = """\ Tu es un assistant d'extraction de données médicales. Extrait les informations de cette fiche administrative de concertation 2/2. RÈGLES STRICTES : - Pour "maintien_avis_controleur", "retour_groupage_dim", "autre_groupage" : retourner "oui" si la case est cochée (X, ✓ ou toute marque), "non" si la case est décochée, "" si absent. - Pour les champs GHS (nombres) : retourner uniquement les chiffres sans point ni espace (ex: "6173" et non "6.173"). - Si un champ est absent ou illisible, retourner "". Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après. {"n_ogc":"","ghs_initial":"","ghs_avant_concertation":"","ghs_final_apres_concertation":"", "maintien_avis_controleur":"","retour_groupage_dim":"","autre_groupage":"", "avis_dim_final":"","date_concertation":"", "nom_medecin_responsable_controle":"","nom_medecin_dim":""}\ """ PROMPT_FICHE_ADMIN_1_2 = """\ Tu es un assistant d'extraction de données médicales. Extrait les informations de cette fiche administrative de concertation 1/2. L'argumentaire est un texte long imprimé (pas manuscrit). Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après. {"n_ogc":"","date_concertation":"","argumentaire_medecin_controleur":""}\ """ PROMPTS = { "FICHE_RECUEIL": PROMPT_FICHE_RECUEIL, "ELEMENTS_PREUVE": PROMPT_ELEMENTS_PREUVE, "FICHE_ADMIN_2_2": PROMPT_FICHE_ADMIN_2_2, "FICHE_ADMIN_1_2": PROMPT_FICHE_ADMIN_1_2, } SKIP_TYPES = {"SEJOUR_MANUSCRIT", "FICHE_CONCERTATION_VIDE", "AUTRE"} # ─── Traitement d'un PDF ────────────────────────────────────────────────────── # ─── Normalisation post-extraction ─────────────────────────────────────────── _CHECKBOX_OUI = {"x", "oui", "✓", "✗", "coché", "v", "yes"} def _norm_checkbox(val: str) -> str: """Convertit toute marque de case cochée en 'oui', conserve 'non', vide sinon.""" v = str(val).strip().lower() if v in _CHECKBOX_OUI: return "oui" if v == "non": return "non" return "" def _calc_duree_rum(debut: str, fin: str): """Calcule la durée en jours entre deux dates (DD/MM/YYYY ou YYYY-MM-DD). Retourne None si non parsable.""" for fmt in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y"): try: d1 = datetime.strptime(str(debut).strip(), fmt) d2 = datetime.strptime(str(fin).strip(), fmt) return (d2 - d1).days except (ValueError, AttributeError): pass return None def _strip_dot_number(val: str) -> str: """Supprime le point parasite dans un nombre (ex: '6.173' → '6173', '.0' → '0').""" v = str(val).strip() # Nombre entier avec un point au milieu (pas une vraie décimale) if re.match(r"^\d+\.\d+$", v): cleaned = v.replace(".", "") return cleaned # Point en début de chaîne if v.startswith(".") and v[1:].isdigit(): return v[1:] return v def _fix_year(val: str) -> str: """ Corrige les erreurs de lecture d'année manuscrite. Les dates OGC sont dans la plage 2015-2019. Si une année hors plage est détectée (ex: 2021, 2022), la remplace par l'année valide la plus proche. """ if not val: return val m = re.search(r"(20\d\d)", val) if m: year = m.group(1) valid_years = ("2015", "2016", "2017", "2018", "2019") if year not in valid_years: best = min(valid_years, key=lambda y: abs(int(y) - int(year))) val = val.replace(year, best) return val def _normalize_result(result: dict) -> None: """Normalise les données extraites en place (checkboxes, chiffres mal lus).""" for pt in result.get("pages_traitees", []): d = pt.get("data", {}) if not isinstance(d, dict): continue ptype = pt.get("type") if ptype == "FICHE_ADMIN_2_2": for field in ("maintien_avis_controleur", "retour_groupage_dim", "autre_groupage"): if field in d: d[field] = _norm_checkbox(d[field]) # GHS : supprimer points parasites for field in ("ghs_initial", "ghs_avant_concertation", "ghs_final_apres_concertation"): if d.get(field): d[field] = _strip_dot_number(d[field]) if ptype == "FICHE_RECUEIL": # Guard anti-confusion DR/DAS for dr_k, das_k in (("dr_etab", "das_etab"), ("dr_reco", "das_reco")): dr_code = (d.get(dr_k) or {}).get("code", "").strip() if not dr_code: continue das = [x for x in (d.get(das_k) or []) if isinstance(x, dict) and x.get("code")] das_codes = {x.get("code", "").strip() for x in das} if dr_code in das_codes: # Cas 1 : DR duplique un DAS existant → vider DR d[dr_k] = {"code": "", "libelle": ""} elif not das: # Cas 2 : DAS vide mais DR renseigné → confusion modèle, # déplacer le DR dans das comme premier DAS dr_entry = d.get(dr_k) or {} new_das_entry = {"code": dr_code, "rang": dr_entry.get("rang", "")} if dr_k == "dr_etab": new_das_entry["libelle"] = dr_entry.get("libelle", "") d[das_k] = [new_das_entry] d[dr_k] = {"code": "", "libelle": ""} # nature_suppl souvent lu '.0' au lieu de '0' for section in ("rum_etab", "rum_reco"): sec = d.get(section) or {} if sec.get("nature_suppl"): sec["nature_suppl"] = _strip_dot_number(sec["nature_suppl"]) # durée calculée à partir des dates (plus fiable que la valeur extraite) duree = _calc_duree_rum(sec.get("duree_rum_debut", ""), sec.get("duree_rum_fin", "")) if duree is not None: sec["duree_rum_calculee_j"] = duree # se_coche : normaliser "1"→"SE1", rejeter toute valeur non SE1-4 se_raw = str(d.get("se_coche", "")).strip() if se_raw.upper() in {"SE1", "SE2", "SE3", "SE4"}: # Format déjà correct d["se_coche"] = se_raw.upper() elif se_raw in {"1", "2", "3", "4"}: # Chiffre seul = ambigu, le modèle confond avec le rang d'un DAS → vider d["se_coche"] = "" elif se_raw: # Valeur inattendue (ex: "accord", "désaccord") → vider d["se_coche"] = "" if ptype in ("FICHE_ADMIN_2_2", "FICHE_ADMIN_1_2"): for date_field in ("date_concertation",): if d.get(date_field): d[date_field] = _fix_year(d[date_field]) if ptype == "ELEMENTS_PREUVE": if d.get("date"): d["date"] = _fix_year(d["date"]) # ─── Calcul d'audit de fiabilité ───────────────────────────────────────────── def compute_audit(result: dict) -> dict: """ Calcule un bloc _audit pour l'OGC. score_global ∈ [0,1] — seuil d'alerte : 0.80 alertes = champs dont le score < 0.80 """ checks: list[tuple[str, float]] = [] # (champ, score) for pt in result.get("pages_traitees", []): ptype = pt.get("type") d = pt.get("data", {}) page = pt.get("page", "?") if not isinstance(d, dict): continue # JSON non parsé → données non fiables if "raw_response" in d: checks.append((f"page_{page}_json", 0.10)) continue if ptype == "FICHE_RECUEIL": # n_ogc vide checks.append(("n_ogc", 1.0 if d.get("n_ogc") else 0.20)) # dr_etab non vide → historiquement souvent faux (confondu avec DAS) dr_code = (d.get("dr_etab") or {}).get("code", "") checks.append(("dr_etab", 0.31 if dr_code else 1.0)) # provenance non vide → souvent halluciné prov = str((d.get("sejour_etab") or {}).get("provenance", "")).strip() checks.append(("sejour_etab.provenance", 0.40 if prov else 1.0)) # se_coche non vide → souvent halluciné ; doit valoir SE1/SE2/SE3/SE4 ou "" se_val = str(d.get("se_coche", "")).strip().lower() if not se_val: checks.append(("se_coche", 1.0)) elif se_val in {"se1", "se2", "se3", "se4", "1", "2", "3", "4"}: checks.append(("se_coche", 0.90)) # valeur plausible mais vérifier format else: # confusion probable avec accord_desaccord ou autre valeur inattendue checks.append(("se_coche", 0.20)) # DAS vide alors que DP présent → probablement tronqué dp_code = (d.get("dp_etab") or {}).get("code", "") das = [x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")] if dp_code and not das: checks.append(("das_etab", 0.50)) else: checks.append(("das_etab", 1.0)) # Code DAS ressemble à un acte (≥7 chars, 4 premières lettres) acte_like = any( len(x.get("code", "")) >= 7 and x.get("code", "")[:4].isalpha() for x in das ) checks.append(("das_etab.codes", 0.35 if acte_like else 1.0)) elif ptype == "FICHE_ADMIN_2_2": # Au moins une case doit être cochée maintien = str(d.get("maintien_avis_controleur", "")).strip().lower() retour = str(d.get("retour_groupage_dim", "")).strip().lower() autre = str(d.get("autre_groupage", "")).strip().lower() aucun_coche = not any(v == "oui" for v in (maintien, retour, autre)) checks.append(("maintien_retour_autre", 0.50 if aucun_coche else 1.0)) # GHS final encore avec point → mal lu ghs = str(d.get("ghs_final_apres_concertation", "")).strip() checks.append(("ghs_final", 0.40 if ("." in ghs and ghs) else 1.0)) elif ptype == "ELEMENTS_PREUVE": # Ne flag que les lettres en début de chaîne seules (ex: "A", "B") # ou des séquences de 3+ lettres consécutives — pas "A2", "1.a3" qui sont valides suspect = any( bool(re.search(r"(? tuple[dict, dict]: """Retourne (result, timing) où timing contient toutes les métriques de temps.""" print(f"\n{'='*60}\nTraitement : {pdf_path.name}\n{'='*60}") pdf_start = time.time() timing = { "fichier": pdf_path.name, "debut": datetime.now().isoformat(), "fin": None, "duree_totale_s": None, "nb_pages_total": 0, "pages": [], "erreurs": [], "blocages_429": [], "retries_total": 0, } pages = convert_from_path(str(pdf_path), dpi=PDF_DPI) timing["nb_pages_total"] = len(pages) result = {"fichier": pdf_path.name, "pages_traitees": [], "pages_ignorees": []} for i, img in enumerate(pages, start=1): print(f"\n Page {i}/{len(pages)} — identification...") page_timing = { "page": i, "type": None, "duree_identification_s": None, "duree_extraction_s": None, "statut": None, "erreur": None, } t0 = time.time() try: raw_type = ask_vision(PROMPT_IDENTIFY, img, timeout=200, num_predict=512, timing_record=timing).strip().upper() except Exception as e: print(f" ⚠ Erreur identification : {e}") page_timing["duree_identification_s"] = round(time.time() - t0, 2) page_timing["statut"] = "erreur_identification" page_timing["erreur"] = str(e) timing["erreurs"].append({"page": i, "phase": "identification", "message": str(e)}) timing["pages"].append(page_timing) result["pages_ignorees"].append({"page": i, "type": "ERREUR_IDENTIFICATION"}) continue duree_id = round(time.time() - t0, 2) page_timing["duree_identification_s"] = duree_id page_type = "AUTRE" for known in PROMPTS.keys() | SKIP_TYPES: if known in raw_type: page_type = known break page_timing["type"] = page_type print(f" → Type : {page_type} ({duree_id:.1f}s)") if page_type in SKIP_TYPES: page_timing["statut"] = "ignoree" timing["pages"].append(page_timing) result["pages_ignorees"].append({"page": i, "type": page_type}) print(" → Ignorée.") continue print(" → Extraction en cours...") t0 = time.time() try: num_predict = 12000 if page_type == "FICHE_RECUEIL" else 8192 raw = ask_vision(PROMPTS[page_type], img, timeout=240, num_predict=num_predict, timing_record=timing) except Exception as e: print(f" ⚠ Erreur extraction : {e}") duree_ext = round(time.time() - t0, 2) page_timing["duree_extraction_s"] = duree_ext page_timing["statut"] = "erreur_extraction" page_timing["erreur"] = str(e) timing["erreurs"].append({"page": i, "phase": "extraction", "type": page_type, "message": str(e)}) timing["pages"].append(page_timing) result["pages_traitees"].append({"page": i, "type": page_type, "data": {"erreur": str(e)}}) continue duree_ext = round(time.time() - t0, 2) page_timing["duree_extraction_s"] = duree_ext print(f" → Réponse reçue ({duree_ext:.1f}s)") data = extract_json(raw) if data is None: print(f" ⚠ JSON non parsable — retry en cours...") retry_prompt = ( "Ta réponse précédente n'était pas un JSON valide. " "Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après, " "sans bloc ```json```. Voici le schéma attendu :\n\n" + PROMPTS[page_type] ) try: raw2 = ask_vision(retry_prompt, img, timeout=240, num_predict=12000, timing_record=timing) data = extract_json(raw2) except Exception as e: print(f" ⚠ Erreur retry : {e}") data = None if data is None: print(f" ⚠ Retry échoué — raw_response conservé") page_timing["statut"] = "json_non_parsable" timing["erreurs"].append({ "page": i, "phase": "parsing_json", "type": page_type, "message": f"JSON non parsable après retry : {raw[:100]}", "retry": True, }) data = {"raw_response": raw} else: print(f" ✓ Retry réussi") page_timing["statut"] = "ok_after_retry" timing["erreurs"].append({ "page": i, "phase": "parsing_json", "type": page_type, "message": "JSON non parsable au 1er appel, corrigé par retry", "retry": True, "retry_ok": True, }) else: page_timing["statut"] = "ok" timing["pages"].append(page_timing) result["pages_traitees"].append({"page": i, "type": page_type, "data": data}) print(" ✓ OK") timing["fin"] = datetime.now().isoformat() timing["duree_totale_s"] = round(time.time() - pdf_start, 2) _normalize_result(result) result["_audit"] = compute_audit(result) return result, timing # ─── Aplatissement pour Excel ───────────────────────────────────────────────── def flatten(result: dict) -> dict: row = {"fichier": result["fichier"]} general_done = False # champs généraux pris sur la 1re page FICHE_RECUEIL uniquement for pt in result["pages_traitees"]: d, ptype = pt["data"], pt["type"] if ptype == "FICHE_RECUEIL": # ── Champs généraux (par séjour, identiques sur chaque page RUM) ── if not general_done: for k in ["n_ogc","etablissement","finess","date_debut_controle","n_champ", "libelle_champ","dossier_manquant","date_debut_sejour","date_fin_sejour"]: row[k] = d.get(k, "") for prefix in ("sejour_etab","sejour_reco"): for k, v in (d.get(prefix) or {}).items(): row[f"{prefix}_{k}"] = v row["dp_etab_code"] = (d.get("dp_etab") or {}).get("code", "") row["dp_etab_libelle"] = (d.get("dp_etab") or {}).get("libelle", "") row["dr_etab_code"] = (d.get("dr_etab") or {}).get("code", "") row["dr_etab_libelle"] = (d.get("dr_etab") or {}).get("libelle", "") row["dp_reco_code"] = (d.get("dp_reco") or {}).get("code", "") row["dr_reco_code"] = (d.get("dr_reco") or {}).get("code", "") for k in ["ghm_etab","ghs_etab","ghm_reco","ghs_reco", "recodage_impactant_facturation","ghs_injustifie", "se_coche","atu","ffm","fsd","accord_desaccord","nom_praticien_conseil"]: row[k] = d.get(k, "") general_done = True # ── Comptages et durées agrégés sur tous les RUM ── row["nb_das_etab"] = row.get("nb_das_etab", 0) + len([x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")]) row["nb_actes_etab"] = row.get("nb_actes_etab", 0) + len([x for x in (d.get("actes_etab") or []) if isinstance(x, dict) and x.get("code")]) row["nb_das_reco"] = row.get("nb_das_reco", 0) + len([x for x in (d.get("das_reco") or []) if isinstance(x, dict) and x.get("code")]) row["nb_actes_reco"] = row.get("nb_actes_reco", 0) + len([x for x in (d.get("actes_reco") or []) if isinstance(x, dict) and x.get("code")]) for section, col in (("rum_etab", "duree_sejour_calc_etab_j"), ("rum_reco", "duree_sejour_calc_reco_j")): duree = (d.get(section) or {}).get("duree_rum_calculee_j") if duree is not None: row[col] = row.get(col, 0) + duree elif ptype == "ELEMENTS_PREUVE": row["ep_date"] = d.get("date", "") row["ep_medecin_controleur"] = d.get("medecin_controleur_signataire", "") row["ep_medecin_dim"] = d.get("medecin_dim_signataire", "") for doc, vals in (d.get("elements") or {}).items(): for col, val in (vals or {}).items(): row[f"ep_{doc}_{col}"] = val elif ptype == "FICHE_ADMIN_2_2": if not row.get("n_ogc"): row["n_ogc"] = d.get("n_ogc", "") for k in ["ghs_initial","ghs_avant_concertation","ghs_final_apres_concertation", "maintien_avis_controleur","retour_groupage_dim","autre_groupage", "avis_dim_final","date_concertation", "nom_medecin_responsable_controle","nom_medecin_dim"]: row[f"admin22_{k}"] = d.get(k, "") elif ptype == "FICHE_ADMIN_1_2": row["admin12_date_concertation"] = d.get("date_concertation", "") row["admin12_argumentaire"] = d.get("argumentaire_medecin_controleur", "") # ── RÈGLE MÉTIER GHS FINAL ───────────────────────────────────────────── # ghs_final_apres_concertation est manuscrit et souvent mal lu. # On le recalcule depuis les cases cochées (valeurs imprimées, plus fiables) : # - maintien_avis_controleur coché → ghs_final = ghs_initial # - retour_groupage_dim coché → ghs_final = ghs_avant_concertation # - autre_groupage coché → ghs_final = valeur manuscrite extraite (on garde) # Pour désactiver cette règle : supprimez le bloc entre les deux lignes de tirets. maintien = str(row.get("admin22_maintien_avis_controleur", "")).lower() retour = str(row.get("admin22_retour_groupage_dim", "")).lower() autre = str(row.get("admin22_autre_groupage", "")).lower() if maintien == "oui": row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_initial", "") elif retour == "oui": row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_avant_concertation", "") # si autre_groupage == "oui" : on garde la valeur extraite (manuscrite) # ── FIN RÈGLE MÉTIER GHS FINAL ──────────────────────────────────────── return row def build_rum(result: dict) -> list: """1 ligne par RUM par OGC — données spécifiques au RUM.""" rows = [] for pt in result["pages_traitees"]: if pt["type"] != "FICHE_RECUEIL": continue d = pt["data"] ogc = d.get("n_ogc", result["fichier"]) row = {"n_ogc": ogc} for prefix in ("rum_etab", "rum_reco"): for k, v in (d.get(prefix) or {}).items(): row[f"{prefix}_{k}"] = v rows.append(row) return rows def build_diagnostics(result: dict) -> list: rows = [] for pt in result["pages_traitees"]: if pt["type"] != "FICHE_RECUEIL": continue d = pt["data"] ogc = d.get("n_ogc", result["fichier"]) n_rum = (d.get("rum_etab") or {}).get("n_rum", "") for src, dp_k, dr_k, das_k in [ ("etablissement", "dp_etab", "dr_etab", "das_etab"), ("recodage", "dp_reco", "dr_reco", "das_reco"), ]: dp = d.get(dp_k) or {} if dp.get("code"): rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DP", "code": dp["code"], "niveau": "", "libelle": dp.get("libelle", "")}) dr = d.get(dr_k) or {} if dr.get("code"): rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DR", "code": dr["code"], "niveau": "", "libelle": dr.get("libelle", "")}) for das in (d.get(das_k) or []): if isinstance(das, dict) and das.get("code"): rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DAS", "code": das["code"], "niveau": das.get("niveau", ""), "libelle": das.get("libelle", "")}) return rows def build_actes(result: dict) -> list: rows = [] for pt in result["pages_traitees"]: if pt["type"] != "FICHE_RECUEIL": continue d = pt["data"] ogc = d.get("n_ogc", result["fichier"]) n_rum = (d.get("rum_etab") or {}).get("n_rum", "") for src, k in [("etablissement","actes_etab"), ("recodage","actes_reco")]: for a in (d.get(k) or []): if isinstance(a, dict) and a.get("code"): rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "code": a["code"], "niveau": a.get("niveau", ""), "libelle": a.get("libelle", "")}) return rows def build_elements_preuve(result: dict) -> list: rows = [] for pt in result["pages_traitees"]: if pt["type"] != "ELEMENTS_PREUVE": continue d = pt["data"] ogc = result["fichier"] for pt2 in result["pages_traitees"]: if pt2["type"] == "FICHE_RECUEIL": ogc = pt2["data"].get("n_ogc", ogc) break for doc, vals in (d.get("elements") or {}).items(): row = {"n_ogc": ogc, "document": doc} row.update(vals or {}) rows.append(row) return rows # ─── Export Excel ───────────────────────────────────────────────────────────── def export_excel(all_results: list, all_timings: list, path: Path): df_main = pd.DataFrame([flatten(r) for r in all_results]) rum = sum((build_rum(r) for r in all_results), []) diag = sum((build_diagnostics(r) for r in all_results), []) actes = sum((build_actes(r) for r in all_results), []) ep = sum((build_elements_preuve(r) for r in all_results), []) df_rum = pd.DataFrame(rum) if rum else pd.DataFrame(columns=["n_ogc","rum_etab_n_rum","rum_reco_n_rum"]) df_diag = pd.DataFrame(diag) if diag else pd.DataFrame(columns=["n_ogc","n_rum","source","type","code","niveau","libelle"]) df_actes = pd.DataFrame(actes) if actes else pd.DataFrame(columns=["n_ogc","n_rum","source","code","niveau","libelle"]) df_ep = pd.DataFrame(ep) if ep else pd.DataFrame(columns=["n_ogc","document","present","photocopie"]) # Feuille Timing — résumé par dossier timing_rows = [] for t in all_timings: nb_erreurs = len(t.get("erreurs", [])) nb_429 = len(t.get("blocages_429", [])) attente_429 = sum(b["attente_s"] for b in t.get("blocages_429", [])) timing_rows.append({ "fichier": t["fichier"], "debut": t.get("debut", ""), "fin": t.get("fin", ""), "duree_totale_s": t.get("duree_totale_s", ""), "nb_pages": t.get("nb_pages_total", ""), "nb_erreurs": nb_erreurs, "nb_blocages_429": nb_429, "attente_429_s": attente_429, "retries_total": t.get("retries_total", 0), }) df_timing = pd.DataFrame(timing_rows) if timing_rows else pd.DataFrame() with pd.ExcelWriter(path, engine="openpyxl") as w: df_main.to_excel(w, sheet_name="Données principales", index=False) df_rum.to_excel(w, sheet_name="RUM", index=False) df_diag.to_excel(w, sheet_name="Diagnostics", index=False) df_actes.to_excel(w, sheet_name="Actes", index=False) df_ep.to_excel(w, sheet_name="Eléments de preuve", index=False) df_timing.to_excel(w, sheet_name="Timing", index=False) print(f"\n✓ Excel : {path}") print(f" Données principales : {len(df_main)} lignes") print(f" RUM : {len(df_rum)} lignes") print(f" Diagnostics : {len(df_diag)} lignes") print(f" Actes : {len(df_actes)} lignes") print(f" Eléments de preuve : {len(df_ep)} lignes") print(f" Timing : {len(df_timing)} lignes") # ─── Rapport PDF Timing ─────────────────────────────────────────────────────── def _fmt_s(s): """Formate des secondes en mm:ss ou hh:mm:ss lisible.""" if s is None: return "—" s = int(s) h, r = divmod(s, 3600) m, sec = divmod(r, 60) if h: return f"{h}h{m:02d}m{sec:02d}s" if m: return f"{m}m{sec:02d}s" return f"{sec}s" def build_timing_pdf(all_timings: list, path: Path, model: str = MODEL): """Génère un rapport PDF d'analyse de temps d'extraction.""" doc = SimpleDocTemplate( str(path), pagesize=A4, leftMargin=2*cm, rightMargin=2*cm, topMargin=2*cm, bottomMargin=2*cm, ) styles = getSampleStyleSheet() title_style = ParagraphStyle("title", parent=styles["Title"], fontSize=18, spaceAfter=6) h2_style = ParagraphStyle("h2", parent=styles["Heading2"], fontSize=13, spaceBefore=14, spaceAfter=4) h3_style = ParagraphStyle("h3", parent=styles["Heading3"], fontSize=11, spaceBefore=10, spaceAfter=3) body_style = ParagraphStyle("body", parent=styles["Normal"], fontSize=9, spaceAfter=3) warn_style = ParagraphStyle("warn", parent=styles["Normal"], fontSize=9, textColor=colors.red, spaceAfter=3) story = [] # ── Titre ── story.append(Paragraph(f"Rapport d'analyse de temps — {model}", title_style)) story.append(Paragraph(f"Généré le {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}", body_style)) story.append(HRFlowable(width="100%", thickness=1, color=colors.grey)) story.append(Spacer(1, 0.4*cm)) # ── Résumé global ── total_s = sum(t.get("duree_totale_s") or 0 for t in all_timings) total_pages = sum(t.get("nb_pages_total") or 0 for t in all_timings) total_err = sum(len(t.get("erreurs", [])) for t in all_timings) total_429 = sum(len(t.get("blocages_429", [])) for t in all_timings) total_wait = sum(b["attente_s"] for t in all_timings for b in t.get("blocages_429", [])) nb_dossiers = len(all_timings) story.append(Paragraph("Résumé global", h2_style)) summary_data = [ ["Métrique", "Valeur"], ["Nombre de dossiers traités", str(nb_dossiers)], ["Nombre de pages total", str(total_pages)], ["Durée totale d'extraction", _fmt_s(total_s)], ["Durée moyenne / dossier", _fmt_s(total_s / nb_dossiers) if nb_dossiers else "—"], ["Durée moyenne / page", _fmt_s(total_s / total_pages) if total_pages else "—"], ["Erreurs totales", str(total_err)], ["Blocages 429 (rate limit)", str(total_429)], ["Temps perdu en attente 429", _fmt_s(total_wait)], ] t_sum = Table(summary_data, colWidths=[10*cm, 6*cm]) t_sum.setStyle(TableStyle([ ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#2c3e50")), ("TEXTCOLOR", (0,0), (-1,0), colors.white), ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"), ("FONTSIZE", (0,0), (-1,-1), 9), ("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]), ("GRID", (0,0), (-1,-1), 0.5, colors.grey), ("LEFTPADDING", (0,0), (-1,-1), 6), ("RIGHTPADDING",(0,0), (-1,-1), 6), ("TOPPADDING", (0,0), (-1,-1), 4), ("BOTTOMPADDING",(0,0),(-1,-1), 4), ])) story.append(t_sum) story.append(Spacer(1, 0.5*cm)) # ── Détail par dossier ── story.append(Paragraph("Détail par dossier", h2_style)) for t in all_timings: story.append(Paragraph(t["fichier"], h3_style)) nb_err = len(t.get("erreurs", [])) nb_b = len(t.get("blocages_429", [])) att = sum(b["attente_s"] for b in t.get("blocages_429", [])) # Calcul durée pages traitées pages = t.get("pages", []) duree_id = sum(p.get("duree_identification_s") or 0 for p in pages) duree_ext = sum(p.get("duree_extraction_s") or 0 for p in pages) nb_ok = sum(1 for p in pages if p.get("statut") == "ok") nb_ign = sum(1 for p in pages if p.get("statut") == "ignoree") rows = [ ["Début", t.get("debut", "—")[:19].replace("T", " ")], ["Fin", (t.get("fin") or "—")[:19].replace("T", " ")], ["Durée totale", _fmt_s(t.get("duree_totale_s"))], ["Pages totales", str(t.get("nb_pages_total", "—"))], ["Pages extraites (OK)", str(nb_ok)], ["Pages ignorées", str(nb_ign)], ["Temps identification", _fmt_s(duree_id)], ["Temps extraction", _fmt_s(duree_ext)], ["Erreurs", str(nb_err)], ["Blocages 429", str(nb_b)], ["Attente cumulée 429", _fmt_s(att)], ] tbl = Table(rows, colWidths=[8*cm, 8*cm]) tbl.setStyle(TableStyle([ ("FONTSIZE", (0,0), (-1,-1), 8), ("FONTNAME", (0,0), (0,-1), "Helvetica-Bold"), ("ROWBACKGROUNDS", (0,0), (-1,-1), [colors.HexColor("#f9f9f9"), colors.white]), ("GRID", (0,0), (-1,-1), 0.3, colors.lightgrey), ("LEFTPADDING", (0,0), (-1,-1), 5), ("TOPPADDING", (0,0), (-1,-1), 3), ("BOTTOMPADDING",(0,0),(-1,-1), 3), ])) story.append(tbl) # Détail pages if pages: story.append(Spacer(1, 0.2*cm)) story.append(Paragraph("Détail pages :", body_style)) page_rows = [["Page", "Type", "Identification", "Extraction", "Statut"]] for p in pages: page_rows.append([ str(p["page"]), p.get("type") or "—", _fmt_s(p.get("duree_identification_s")), _fmt_s(p.get("duree_extraction_s")), p.get("statut") or "—", ]) tp = Table(page_rows, colWidths=[1.5*cm, 5*cm, 3*cm, 3*cm, 3.5*cm]) tp.setStyle(TableStyle([ ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#34495e")), ("TEXTCOLOR", (0,0), (-1,0), colors.white), ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"), ("FONTSIZE", (0,0), (-1,-1), 7.5), ("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]), ("GRID", (0,0), (-1,-1), 0.3, colors.grey), ("LEFTPADDING", (0,0), (-1,-1), 4), ("TOPPADDING", (0,0), (-1,-1), 3), ("BOTTOMPADDING",(0,0),(-1,-1), 3), ])) story.append(tp) # Erreurs if t.get("erreurs"): story.append(Spacer(1, 0.2*cm)) story.append(Paragraph("Erreurs :", warn_style)) for err in t["erreurs"]: msg = f"Page {err['page']} — {err['phase']} : {err['message'][:120]}" story.append(Paragraph(msg, warn_style)) # Blocages 429 if t.get("blocages_429"): story.append(Paragraph("Blocages rate limit (429) :", warn_style)) for b in t["blocages_429"]: msg = (f"Tentative {b['tentative']} — attente {b['attente_s']}s " f"à {b['ts'][:19].replace('T', ' ')}") story.append(Paragraph(msg, warn_style)) story.append(Spacer(1, 0.4*cm)) story.append(HRFlowable(width="100%", thickness=0.5, color=colors.lightgrey)) doc.build(story) print(f"✓ Rapport timing PDF : {path}") # ─── Main ───────────────────────────────────────────────────────────────────── def main(): pdf_files = sorted(SCAN_DIR.glob("*.pdf")) if not pdf_files: print(f"Aucun PDF dans {SCAN_DIR}") sys.exit(1) if len(sys.argv) > 1: pdf_files = [f for f in pdf_files if sys.argv[1] in f.name] if not pdf_files: print(f"Aucun fichier pour '{sys.argv[1]}'") sys.exit(1) print(f"Modèle : {MODEL}") print(f"Fichiers: {len(pdf_files)}") for f in pdf_files: print(f" - {f.name}") # Charge le cache existant pour relances partielles json_path = OUTPUT_DIR / "extraction_ogc_raw_qwen.json" timing_path = OUTPUT_DIR / "timing_stats.json" cache: dict[str, dict] = {} timing_cache: dict[str, dict] = {} if json_path.exists() and len(sys.argv) > 1: with open(json_path, encoding="utf-8") as f: for r in json.load(f): cache[r["fichier"]] = r print(f"({len(cache)} fichiers en cache)") if timing_path.exists() and len(sys.argv) > 1: with open(timing_path, encoding="utf-8") as f: for t in json.load(f): timing_cache[t["fichier"]] = t for pdf_path in pdf_files: try: result, timing = process_pdf(pdf_path) cache[pdf_path.name] = result timing_cache[pdf_path.name] = timing except Exception as e: print(f"\n⚠ Erreur {pdf_path.name} : {e}") cache[pdf_path.name] = {"fichier": pdf_path.name, "erreur": str(e), "pages_traitees": [], "pages_ignorees": []} timing_cache[pdf_path.name] = { "fichier": pdf_path.name, "erreur_globale": str(e), "debut": None, "fin": None, "duree_totale_s": None, "nb_pages_total": 0, "pages": [], "erreurs": [], "blocages_429": [], } all_results = sorted(cache.values(), key=lambda r: r["fichier"]) all_timings = sorted(timing_cache.values(), key=lambda t: t["fichier"]) export_excel(all_results, all_timings, OUTPUT_DIR / "extraction_ogc.xlsx") with open(json_path, "w", encoding="utf-8") as f: json.dump(all_results, f, ensure_ascii=False, indent=2) print(f"✓ JSON : {json_path}") with open(timing_path, "w", encoding="utf-8") as f: json.dump(all_timings, f, ensure_ascii=False, indent=2) print(f"✓ Timing JSON : {timing_path}") rapport_path = OUTPUT_DIR / "rapport_timing.pdf" build_timing_pdf(all_timings, rapport_path) print(f"✓ Rapport PDF : {rapport_path}") if __name__ == "__main__": main()