Files
ScanOGC_extract/extraction.py
2026-04-24 11:04:31 +02:00

1049 lines
48 KiB
Python

"""
Extraction OGC → Excel
Modèle : ministral-3:8b-cloud via Ollama
"""
import base64
import io
import json
import re
import sys
import time
from datetime import datetime
from pathlib import Path
import pandas as pd
import requests
from pdf2image import convert_from_path
from PIL import Image
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.platypus import (
SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
)
# ─── Config ───────────────────────────────────────────────────────────────────
SCAN_DIR = Path(__file__).parent / "scanOgc"    # input folder holding scanned OGC PDFs
OUTPUT_DIR = Path(__file__).parent / "output"   # all generated artifacts go here
OUTPUT_DIR.mkdir(exist_ok=True)
OLLAMA_URL = "http://localhost:11434/api/generate"  # local Ollama "generate" endpoint
MODEL = "ministral-3:8b-cloud"                      # vision model used for every call
PDF_DPI = 200  # rasterization resolution for pdf2image
# Rate limiting: pause between each call, and retry on HTTP 429
INTER_REQUEST_DELAY = 3 # seconds (more conservative than qwen: tighter cloud quota)
RETRY_MAX = 6
RETRY_DELAY_429 = 60 # seconds — capped at 120s inside ask_vision
# ─── Utilitaires image ────────────────────────────────────────────────────────
def image_to_b64(img: Image.Image) -> str:
    """Encode a PIL image as a base64 JPEG string for the Ollama API.

    The image is converted to RGB first: JPEG cannot store RGBA/P/LA
    modes and ``Image.save`` raises OSError for them. For images already
    in RGB the conversion is a plain copy, so output is unchanged.
    """
    buf = io.BytesIO()
    # quality=90 keeps OCR-relevant detail while limiting payload size
    img.convert("RGB").save(buf, format="JPEG", quality=90)
    return base64.b64encode(buf.getvalue()).decode()
# ─── Appel Ollama ─────────────────────────────────────────────────────────────
def ask_vision(prompt: str, img: Image.Image,
               timeout: int = 120, num_predict: int = 2048,
               timing_record: dict | None = None) -> str:
    """
    Send an image + prompt to Ollama in streaming mode and return the
    concatenated response text.

    ministral-3 is a text/vision model without a thinking mode:
    num_predict=2048 is more than enough (answers are short and fast).
    Automatic retry on 429 (cloud rate limit), up to RETRY_MAX attempts.
    timing_record : optional dict, updated in place with retry counts and
    429 stall events ("blocages_429", "retries_total").

    Raises RuntimeError when every attempt hits the rate limit, and
    re-raises any non-429 HTTPError.
    """
    payload = {
        "model": MODEL,
        "prompt": prompt,
        "images": [image_to_b64(img)],
        "stream": True,
        # temperature 0 → deterministic output, required for extraction
        "options": {"temperature": 0, "num_predict": num_predict},
    }
    for attempt in range(1, RETRY_MAX + 1):
        try:
            resp = requests.post(OLLAMA_URL, json=payload,
                                 timeout=timeout, stream=True)
            if resp.status_code == 429:
                # Linear backoff (attempt × base delay), capped at 120s.
                wait = min(RETRY_DELAY_429 * attempt, 120)
                print(f" ⏳ Rate limit — attente {wait}s "
                      f"(tentative {attempt}/{RETRY_MAX})...")
                if timing_record is not None:
                    timing_record.setdefault("blocages_429", []).append({
                        "tentative": attempt,
                        "attente_s": wait,
                        "ts": datetime.now().isoformat(),
                    })
                time.sleep(wait)
                continue
            resp.raise_for_status()
            tokens = []
            # Streaming mode: one JSON chunk per line; collect the
            # incremental "response" pieces until "done" is signalled.
            for line in resp.iter_lines():
                if not line:
                    continue
                try:
                    chunk = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip malformed stream lines
                if chunk.get("response"):
                    tokens.append(chunk["response"])
                if chunk.get("done"):
                    break
            if timing_record is not None and attempt > 1:
                timing_record["retries_total"] = \
                    timing_record.get("retries_total", 0) + (attempt - 1)
            # Fixed pause between successful calls to stay under quota.
            time.sleep(INTER_REQUEST_DELAY)
            return "".join(tokens)
        except requests.exceptions.HTTPError as e:
            # raise_for_status may also surface a 429 here — same backoff.
            if e.response is not None and e.response.status_code == 429:
                wait = min(RETRY_DELAY_429 * attempt, 120)
                print(f" ⏳ Rate limit — attente {wait}s "
                      f"(tentative {attempt}/{RETRY_MAX})...")
                if timing_record is not None:
                    timing_record.setdefault("blocages_429", []).append({
                        "tentative": attempt,
                        "attente_s": wait,
                        "ts": datetime.now().isoformat(),
                    })
                time.sleep(wait)
                continue
            raise
    raise RuntimeError(f"Echec après {RETRY_MAX} tentatives (rate limit persistant)")
# ─── Extraction JSON depuis la réponse ───────────────────────────────────────
def _try_parse(text: str):
for candidate in (
text,
text.replace("\n", " ").replace("\r", " "),
re.sub(r",\s*([}\]])", r"\1", text), # trailing commas
re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text), # control chars
):
try:
return json.loads(candidate)
except json.JSONDecodeError:
pass
return None
def _extract_balanced(text: str, open_c: str, close_c: str):
"""Extrait la première structure équilibrée open_c…close_c du texte."""
start = text.find(open_c)
if start == -1:
return None
depth = 0
in_str = False
escape = False
for i, ch in enumerate(text[start:], start):
if escape:
escape = False
continue
if ch == "\\" and in_str:
escape = True
continue
if ch == '"' and not escape:
in_str = not in_str
continue
if in_str:
continue
if ch == open_c:
depth += 1
elif ch == close_c:
depth -= 1
if depth == 0:
return text[start:i+1]
return None
def extract_json(text: str):
    """Pull the first JSON object/array out of a model response.

    Strategies, in order:
      1. a fenced ```json … ``` block,
      2. balanced-delimiter scanning (robust against trailing prose),
      3. a greedy regex fallback.
    Returns the parsed structure, or None when nothing parses.
    """
    fenced = re.search(r"```json\s*([\s\S]*?)```", text)
    if fenced:
        parsed = _try_parse(fenced.group(1).strip())
        if parsed is not None:
            return parsed
    for opener, closer in (("{", "}"), ("[", "]")):
        span = _extract_balanced(text, opener, closer)
        if span:
            parsed = _try_parse(span)
            if parsed is not None:
                return parsed
    for pattern in (r"(\{[\s\S]*\})", r"(\[[\s\S]*\])"):
        match = re.search(pattern, text)
        if match:
            parsed = _try_parse(match.group(1))
            if parsed is not None:
                return parsed
    return None
# ─── Prompts ──────────────────────────────────────────────────────────────────
# Page-type classification prompt: the model must answer with exactly one
# of the type codes below (matched by substring in process_pdf).
PROMPT_IDENTIFY = """\
Tu es un assistant d'analyse de documents médicaux français.
Regarde cette image et identifie son type parmi :
- FICHE_RECUEIL : "FICHE MEDICALE DE RECUEIL DU PRATICIEN CONSEIL"
- FICHE_CONCERTATION_VIDE: "FICHE MEDICALE DE CONCERTATION" (page quasi vide)
- SEJOUR_MANUSCRIT : "Séjour d'hospitalisation complète" (colonnes manuscrites)
- ELEMENTS_PREUVE : "Eléments de preuve tracés au dossier du patient"
- FICHE_ADMIN_2_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 2/2"
- FICHE_ADMIN_1_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 1/2"
- AUTRE : autre type
Réponds UNIQUEMENT avec le code du type, sans aucune explication.\
"""
# Extraction prompt for the main medical data-collection sheet.
# Ends with the exact JSON schema the model must fill in.
PROMPT_FICHE_RECUEIL = """\
Tu es un assistant d'extraction de données médicales.
Extrait toutes les informations imprimées de cette fiche médicale de recueil du praticien conseil.
RÈGLES STRICTES :
- Si un champ n'a pas de valeur clairement visible et imprimée, retourner une chaîne vide "".
- Ne jamais deviner, inférer ou compléter un champ absent.
- Le champ "provenance" est souvent vide : ne pas le remplir sauf si une valeur est explicitement imprimée.
- Le champ "se_coche" correspond aux cases SE1/SE2/SE3/SE4 : retourner "SE1", "SE2", "SE3" ou "SE4" si une case est explicitement cochée, sinon "". Ce champ est TRÈS SOUVENT vide — ne rien mettre par défaut. NE PAS confondre avec "accord_desaccord" qui est un champ séparé.
- Le champ "accord_desaccord" est distinct de "se_coche" : il indique accord/désaccord du praticien conseil, pas les cases SE.
- Le champ "dr_etab" (Diagnostic Relié) est distinct des DAS : ne mettre un code que s'il y a une ligne DR EXPLICITEMENT RENSEIGNÉE sur la fiche. Si la ligne DR est vide ou absente sur le document, retourner "" obligatoirement. NE JAMAIS copier le premier DAS dans DR — ce sont deux lignes séparées sur la grille.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
- IMPORTANT : extraire TOUTES les lignes non vides de das_etab, actes_etab, das_reco et actes_reco sans limite de nombre. Ne jamais tronquer ces listes.
- Les actes (CCAM, codes à 7+ caractères commençant par des lettres ex: JDPE002, NJFA008) vont dans "actes_etab", pas dans "das_etab". Les diagnostics (CIM-10, codes courts ex: N320, R33) vont dans "das_etab".
{"n_ogc":"","etablissement":"","finess":"","date_debut_controle":"","n_champ":"","libelle_champ":"","dossier_manquant":"","date_debut_sejour":"","date_fin_sejour":"",
"sejour_etab":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"sejour_reco":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"rum_etab":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"rum_reco":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"dp_etab":{"code":"","libelle":""},"dr_etab":{"code":"","libelle":""},
"das_etab":[
{"code":"","rang":"","libelle":""},
{"code":"","rang":"","libelle":""}
],
"actes_etab":[{"code":"","niveau":"","libelle":""}],
"dp_reco":{"code":""},"dr_reco":{"code":""},
"das_reco":[{"code":"","niveau":""}],"actes_reco":[{"code":"","niveau":""}],
"ghm_etab":"","ghs_etab":"","ghm_reco":"","ghs_reco":"",
"recodage_impactant_facturation":"","ghs_injustifie":"",
"se_coche":"","atu":"","ffm":"","fsd":"","accord_desaccord":"","nom_praticien_conseil":""}\
"""
# Extraction prompt for the "evidence items traced in the patient record" page.
PROMPT_ELEMENTS_PREUVE = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette page "Eléments de preuve tracés au dossier du patient".
Pour chaque ligne : "present"=oui/non, "photocopie"=nombre écrit, dates si présentes.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"date":"","medecin_controleur_signataire":"","medecin_dim_signataire":"",
"elements":{"compte_rendu_acte":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_operatoire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_accouchement":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_examen_complementaire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_imagerie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_anatomopathologie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"observations_medicales":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"dossier_transfusion":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"dossier_anesthesie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"administration_therapeutique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_hospitalisation":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"lettre_sortie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"surveillance_dossier_infirmier":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_psychologue":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_kinesitherapeute":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_dietetique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"autre":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}}}\
"""
# Extraction prompt for administrative concertation sheet 2/2 (checkboxes + GHS).
PROMPT_FICHE_ADMIN_2_2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette fiche administrative de concertation 2/2.
RÈGLES STRICTES :
- Pour "maintien_avis_controleur", "retour_groupage_dim", "autre_groupage" : retourner "oui" si la case est cochée (X, ✓ ou toute marque), "non" si la case est décochée, "" si absent.
- Pour les champs GHS (nombres) : retourner uniquement les chiffres sans point ni espace (ex: "6173" et non "6.173").
- Si un champ est absent ou illisible, retourner "".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"n_ogc":"","ghs_initial":"","ghs_avant_concertation":"","ghs_final_apres_concertation":"",
"maintien_avis_controleur":"","retour_groupage_dim":"","autre_groupage":"",
"avis_dim_final":"","date_concertation":"",
"nom_medecin_responsable_controle":"","nom_medecin_dim":""}\
"""
# Extraction prompt for administrative concertation sheet 1/2 (free-text argument).
PROMPT_FICHE_ADMIN_1_2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette fiche administrative de concertation 1/2.
L'argumentaire est un texte long imprimé (pas manuscrit).
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"n_ogc":"","date_concertation":"","argumentaire_medecin_controleur":""}\
"""
# Maps an identified page type to the extraction prompt run on that page.
PROMPTS = {
    "FICHE_RECUEIL": PROMPT_FICHE_RECUEIL,
    "ELEMENTS_PREUVE": PROMPT_ELEMENTS_PREUVE,
    "FICHE_ADMIN_2_2": PROMPT_FICHE_ADMIN_2_2,
    "FICHE_ADMIN_1_2": PROMPT_FICHE_ADMIN_1_2,
}
# Page types with no machine-extractable content: identified, then skipped.
SKIP_TYPES = {"SEJOUR_MANUSCRIT", "FICHE_CONCERTATION_VIDE", "AUTRE"}
# ─── Normalisation post-extraction ───────────────────────────────────────────
_CHECKBOX_OUI = {"x", "oui", "", "", "coché", "v", "yes"}
def _norm_checkbox(val: str) -> str:
"""Convertit toute marque de case cochée en 'oui', conserve 'non', vide sinon."""
v = str(val).strip().lower()
if v in _CHECKBOX_OUI:
return "oui"
if v == "non":
return "non"
return ""
def _strip_dot_number(val: str) -> str:
"""Supprime le point parasite dans un nombre (ex: '6.173''6173', '.0''0')."""
v = str(val).strip()
if re.match(r"^\d+\.\d+$", v):
return v.replace(".", "")
if v.startswith(".") and v[1:].isdigit():
return v[1:]
return v
def _calc_duree_rum(debut: str, fin: str):
"""Calcule la durée en jours entre deux dates (DD/MM/YYYY ou YYYY-MM-DD). Retourne None si non parsable."""
for fmt in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y"):
try:
d1 = datetime.strptime(str(debut).strip(), fmt)
d2 = datetime.strptime(str(fin).strip(), fmt)
return (d2 - d1).days
except (ValueError, AttributeError):
pass
return None
def _normalize_result(result: dict) -> None:
    """Normalize extracted data in place.

    Covers: checkbox marks, digits misread with a dot, the DR/DAS
    confusion guard, derived RUM durations, and se_coche canonicalization.
    Mutates result["pages_traitees"][*]["data"] directly; returns None.
    """
    for pt in result.get("pages_traitees", []):
        d = pt.get("data", {})
        if not isinstance(d, dict):
            continue
        ptype = pt.get("type")
        if ptype == "FICHE_ADMIN_2_2":
            # Checkbox fields → canonical "oui"/"non"/"".
            for field in ("maintien_avis_controleur", "retour_groupage_dim", "autre_groupage"):
                if field in d:
                    d[field] = _norm_checkbox(d[field])
            # GHS amounts: drop spurious dots ("6.173" → "6173").
            for field in ("ghs_initial", "ghs_avant_concertation", "ghs_final_apres_concertation"):
                if d.get(field):
                    d[field] = _strip_dot_number(d[field])
        if ptype == "FICHE_RECUEIL":
            # Guard against the model confusing DR with DAS entries.
            for dr_k, das_k in (("dr_etab", "das_etab"), ("dr_reco", "das_reco")):
                dr_code = (d.get(dr_k) or {}).get("code", "").strip()
                if not dr_code:
                    continue
                das = [x for x in (d.get(das_k) or []) if isinstance(x, dict) and x.get("code")]
                das_codes = {x.get("code", "").strip() for x in das}
                if dr_code in das_codes:
                    # Case 1: DR duplicates an existing DAS → clear DR.
                    d[dr_k] = {"code": "", "libelle": ""}
                elif not das:
                    # Case 2: DAS empty but DR filled → model confusion,
                    # move the DR code down as the first DAS entry.
                    dr_entry = d.get(dr_k) or {}
                    new_das_entry = {"code": dr_code, "rang": dr_entry.get("rang", "")}
                    if dr_k == "dr_etab":
                        # only the etablissement side carries a libelle field
                        new_das_entry["libelle"] = dr_entry.get("libelle", "")
                    d[das_k] = [new_das_entry]
                    d[dr_k] = {"code": "", "libelle": ""}
            for section in ("rum_etab", "rum_reco"):
                sec = d.get(section) or {}
                if sec.get("nature_suppl"):
                    sec["nature_suppl"] = _strip_dot_number(sec["nature_suppl"])
                # Derived field: RUM duration in days when both dates parse.
                duree = _calc_duree_rum(sec.get("duree_rum_debut", ""), sec.get("duree_rum_fin", ""))
                if duree is not None:
                    sec["duree_rum_calculee_j"] = duree
            # se_coche: normalize "1" → "SE1", reject anything not SE1–SE4.
            se_raw = str(d.get("se_coche", "")).strip()
            if se_raw in {"1", "2", "3", "4"}:
                d["se_coche"] = f"SE{se_raw}"
            elif se_raw.upper() in {"SE1", "SE2", "SE3", "SE4"}:
                d["se_coche"] = se_raw.upper()
            elif se_raw:
                d["se_coche"] = ""
def compute_audit(result: dict) -> dict:
    """
    Compute the _audit block for one OGC.
    score_global ∈ [0,1] — alert threshold: 0.80

    Each heuristic appends a (field, score) pair to *checks*; any score
    below 0.80 also lands in "alertes". score_global is the plain mean of
    all scores (1.0 when no check applies).
    """
    checks: list[tuple[str, float]] = []
    for pt in result.get("pages_traitees", []):
        ptype = pt.get("type")
        d = pt.get("data", {})
        page = pt.get("page", "?")
        if not isinstance(d, dict):
            continue
        if "raw_response" in d:
            # JSON never parsed for this page: heavy penalty, skip details.
            checks.append((f"page_{page}_json", 0.10))
            continue
        if ptype == "FICHE_RECUEIL":
            checks.append(("n_ogc", 1.0 if d.get("n_ogc") else 0.20))
            # Per the prompt rules, DR is usually blank: a value is suspect.
            dr_code = (d.get("dr_etab") or {}).get("code", "")
            checks.append(("dr_etab", 0.31 if dr_code else 1.0))
            # provenance is usually blank too (see PROMPT_FICHE_RECUEIL).
            prov = str((d.get("sejour_etab") or {}).get("provenance", "")).strip()
            checks.append(("sejour_etab.provenance", 0.40 if prov else 1.0))
            se_val = str(d.get("se_coche", "")).strip().lower()
            if not se_val:
                checks.append(("se_coche", 1.0))
            elif se_val in {"se1", "se2", "se3", "se4", "1", "2", "3", "4"}:
                checks.append(("se_coche", 0.90))
            else:
                checks.append(("se_coche", 0.20))
            # DP present but no DAS at all: possibly truncated extraction.
            dp_code = (d.get("dp_etab") or {}).get("code", "")
            das = [x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")]
            checks.append(("das_etab", 0.50 if (dp_code and not das) else 1.0))
            # CCAM-looking codes (7+ chars, 4-letter prefix) don't belong in DAS.
            acte_like = any(
                len(x.get("code", "")) >= 7 and x.get("code", "")[:4].isalpha()
                for x in das
            )
            checks.append(("das_etab.codes", 0.35 if acte_like else 1.0))
        elif ptype == "FICHE_ADMIN_2_2":
            maintien = str(d.get("maintien_avis_controleur", "")).strip().lower()
            retour = str(d.get("retour_groupage_dim", "")).strip().lower()
            autre = str(d.get("autre_groupage", "")).strip().lower()
            # At least one of the three boxes is normally ticked.
            aucun_coche = not any(v == "oui" for v in (maintien, retour, autre))
            checks.append(("maintien_retour_autre", 0.50 if aucun_coche else 1.0))
            # A dot in the final GHS means _strip_dot_number didn't apply.
            ghs = str(d.get("ghs_final_apres_concertation", "")).strip()
            checks.append(("ghs_final", 0.40 if ("." in ghs and ghs) else 1.0))
        elif ptype == "ELEMENTS_PREUVE":
            # "photocopie" should be a count: letters indicate a misread.
            suspect = any(
                re.search(r"[A-Za-z]", str((v or {}).get("photocopie", "")))
                for v in (d.get("elements") or {}).values()
                if isinstance(v, dict)
            )
            checks.append(("elements.photocopie", 0.40 if suspect else 1.0))
    if not checks:
        score_global = 1.0
        alertes = []
    else:
        scores = [s for _, s in checks]
        score_global = round(sum(scores) / len(scores), 2)
        alertes = [
            {"champ": champ, "score": score}
            for champ, score in checks
            if score < 0.80
        ]
    return {
        "score_global": score_global,
        "alertes": alertes,
        "modele": MODEL,
        "date_extraction": datetime.now().strftime("%Y-%m-%d"),
    }
# ─── Traitement d'un PDF ──────────────────────────────────────────────────────
def process_pdf(pdf_path: Path) -> tuple[dict, dict]:
    """Process one scanned OGC PDF page by page.

    Returns (result, timing):
      result — {"fichier", "pages_traitees", "pages_ignorees", "_audit"}
      timing — all time metrics (per page, errors, 429 stalls, retries)

    Pipeline per page: identify type → skip or run the type-specific
    extraction prompt → parse JSON (one retry with a stricter prompt) →
    finally normalize in place and attach the audit block.
    """
    print(f"\n{'='*60}\nTraitement : {pdf_path.name}\n{'='*60}")
    pdf_start = time.time()
    # Per-PDF metrics record; also passed to ask_vision for 429 stats.
    timing = {
        "fichier": pdf_path.name,
        "debut": datetime.now().isoformat(),
        "fin": None,
        "duree_totale_s": None,
        "nb_pages_total": 0,
        "pages": [],
        "erreurs": [],
        "blocages_429": [],
        "retries_total": 0,
    }
    pages = convert_from_path(str(pdf_path), dpi=PDF_DPI)
    timing["nb_pages_total"] = len(pages)
    result = {"fichier": pdf_path.name, "pages_traitees": [], "pages_ignorees": []}
    for i, img in enumerate(pages, start=1):
        print(f"\n Page {i}/{len(pages)} — identification...")
        page_timing = {
            "page": i,
            "type": None,
            "duree_identification_s": None,
            "duree_extraction_s": None,
            "statut": None,
            "erreur": None,
        }
        # ── Phase 1: identify the page type (cheap call, short answer) ──
        t0 = time.time()
        try:
            raw_type = ask_vision(PROMPT_IDENTIFY, img, timeout=60, num_predict=32,
                                  timing_record=timing)
        except Exception as e:
            print(f" ⚠ Erreur identification : {e}")
            page_timing["duree_identification_s"] = round(time.time() - t0, 2)
            page_timing["statut"] = "erreur_identification"
            page_timing["erreur"] = str(e)
            timing["erreurs"].append({"page": i, "phase": "identification", "message": str(e)})
            timing["pages"].append(page_timing)
            result["pages_ignorees"].append({"page": i, "type": "ERREUR_IDENTIFICATION"})
            continue
        duree_id = round(time.time() - t0, 2)
        page_timing["duree_identification_s"] = duree_id
        # Substring match: the model may wrap the type code in extra text.
        page_type = "AUTRE"
        for known in list(PROMPTS.keys()) + list(SKIP_TYPES):
            if known in raw_type:
                page_type = known
                break
        page_timing["type"] = page_type
        print(f" → Type : {page_type} ({duree_id:.1f}s)")
        if page_type in SKIP_TYPES:
            page_timing["statut"] = "ignoree"
            timing["pages"].append(page_timing)
            result["pages_ignorees"].append({"page": i, "type": page_type})
            print(" → Ignorée.")
            continue
        # ── Phase 2: run the type-specific extraction prompt ──
        print(" → Extraction en cours...")
        t0 = time.time()
        try:
            raw = ask_vision(PROMPTS[page_type], img, timeout=120, num_predict=2048,
                             timing_record=timing)
        except Exception as e:
            print(f" ⚠ Erreur extraction : {e}")
            duree_ext = round(time.time() - t0, 2)
            page_timing["duree_extraction_s"] = duree_ext
            page_timing["statut"] = "erreur_extraction"
            page_timing["erreur"] = str(e)
            timing["erreurs"].append({"page": i, "phase": "extraction", "type": page_type, "message": str(e)})
            timing["pages"].append(page_timing)
            result["pages_traitees"].append({"page": i, "type": page_type,
                                             "data": {"erreur": str(e)}})
            continue
        duree_ext = round(time.time() - t0, 2)
        page_timing["duree_extraction_s"] = duree_ext
        print(f" → Réponse reçue ({duree_ext:.1f}s)")
        data = extract_json(raw)
        if data is None:
            # ── Phase 3 (fallback): one retry with an explicit JSON reminder,
            # larger num_predict in case the first answer was truncated. ──
            print(f" ⚠ JSON non parsable — retry en cours...")
            retry_prompt = (
                "Ta réponse précédente n'était pas un JSON valide. "
                "Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après, "
                "sans bloc ```json```. Voici le schéma attendu :\n\n"
                + PROMPTS[page_type]
            )
            try:
                raw2 = ask_vision(retry_prompt, img, timeout=120, num_predict=4096,
                                  timing_record=timing)
                data = extract_json(raw2)
            except Exception as e:
                print(f" ⚠ Erreur retry : {e}")
                data = None
            if data is None:
                # Keep the raw text so nothing is lost; compute_audit
                # penalizes pages carrying "raw_response".
                print(f" ⚠ Retry échoué — raw_response conservé")
                page_timing["statut"] = "json_non_parsable"
                timing["erreurs"].append({
                    "page": i, "phase": "parsing_json", "type": page_type,
                    "message": f"JSON non parsable après retry : {raw[:100]}",
                    "retry": True,
                })
                data = {"raw_response": raw}
            else:
                print(f" ✓ Retry réussi")
                page_timing["statut"] = "ok_after_retry"
                timing["erreurs"].append({
                    "page": i, "phase": "parsing_json", "type": page_type,
                    "message": "JSON non parsable au 1er appel, corrigé par retry",
                    "retry": True, "retry_ok": True,
                })
        else:
            page_timing["statut"] = "ok"
        timing["pages"].append(page_timing)
        result["pages_traitees"].append({"page": i, "type": page_type, "data": data})
        print(" ✓ OK")
    timing["fin"] = datetime.now().isoformat()
    timing["duree_totale_s"] = round(time.time() - pdf_start, 2)
    _normalize_result(result)
    result["_audit"] = compute_audit(result)
    return result, timing
# ─── Aplatissement pour Excel ─────────────────────────────────────────────────
def flatten(result: dict) -> dict:
    """Flatten one OGC result into a single row for the main Excel sheet.

    General fields come from the FIRST FICHE_RECUEIL page only; counts and
    durations are summed across all RUM pages; admin/evidence pages add
    prefixed columns. Ends with the GHS business rule (see below).
    """
    row = {"fichier": result["fichier"]}
    general_done = False  # general fields taken from the 1st FICHE_RECUEIL page only
    for pt in result["pages_traitees"]:
        d, ptype = pt["data"], pt["type"]
        if ptype == "FICHE_RECUEIL":
            # ── General fields (per stay, identical on every RUM page) ──
            if not general_done:
                for k in ["n_ogc","etablissement","finess","date_debut_controle","n_champ",
                          "libelle_champ","dossier_manquant","date_debut_sejour","date_fin_sejour"]:
                    row[k] = d.get(k, "")
                for prefix in ("sejour_etab","sejour_reco"):
                    for k, v in (d.get(prefix) or {}).items():
                        row[f"{prefix}_{k}"] = v
                row["dp_etab_code"] = (d.get("dp_etab") or {}).get("code", "")
                row["dp_etab_libelle"] = (d.get("dp_etab") or {}).get("libelle", "")
                row["dr_etab_code"] = (d.get("dr_etab") or {}).get("code", "")
                row["dr_etab_libelle"] = (d.get("dr_etab") or {}).get("libelle", "")
                row["dp_reco_code"] = (d.get("dp_reco") or {}).get("code", "")
                row["dr_reco_code"] = (d.get("dr_reco") or {}).get("code", "")
                for k in ["ghm_etab","ghs_etab","ghm_reco","ghs_reco",
                          "recodage_impactant_facturation","ghs_injustifie",
                          "se_coche","atu","ffm","fsd","accord_desaccord","nom_praticien_conseil"]:
                    row[k] = d.get(k, "")
                general_done = True
            # ── Counts and durations aggregated over all RUM pages ──
            row["nb_das_etab"] = row.get("nb_das_etab", 0) + len([x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")])
            row["nb_actes_etab"] = row.get("nb_actes_etab", 0) + len([x for x in (d.get("actes_etab") or []) if isinstance(x, dict) and x.get("code")])
            row["nb_das_reco"] = row.get("nb_das_reco", 0) + len([x for x in (d.get("das_reco") or []) if isinstance(x, dict) and x.get("code")])
            row["nb_actes_reco"] = row.get("nb_actes_reco", 0) + len([x for x in (d.get("actes_reco") or []) if isinstance(x, dict) and x.get("code")])
            for section, col in (("rum_etab", "duree_sejour_calc_etab_j"),
                                 ("rum_reco", "duree_sejour_calc_reco_j")):
                # duree_rum_calculee_j is injected by _normalize_result
                duree = (d.get(section) or {}).get("duree_rum_calculee_j")
                if duree is not None:
                    row[col] = row.get(col, 0) + duree
        elif ptype == "ELEMENTS_PREUVE":
            row["ep_date"] = d.get("date", "")
            row["ep_medecin_controleur"] = d.get("medecin_controleur_signataire", "")
            row["ep_medecin_dim"] = d.get("medecin_dim_signataire", "")
            for doc, vals in (d.get("elements") or {}).items():
                for col, val in (vals or {}).items():
                    row[f"ep_{doc}_{col}"] = val
        elif ptype == "FICHE_ADMIN_2_2":
            # n_ogc fallback when no FICHE_RECUEIL provided one
            if not row.get("n_ogc"):
                row["n_ogc"] = d.get("n_ogc", "")
            for k in ["ghs_initial","ghs_avant_concertation","ghs_final_apres_concertation",
                      "maintien_avis_controleur","retour_groupage_dim","autre_groupage",
                      "avis_dim_final","date_concertation",
                      "nom_medecin_responsable_controle","nom_medecin_dim"]:
                row[f"admin22_{k}"] = d.get(k, "")
        elif ptype == "FICHE_ADMIN_1_2":
            row["admin12_date_concertation"] = d.get("date_concertation", "")
            row["admin12_argumentaire"] = d.get("argumentaire_medecin_controleur", "")
    # ── FINAL-GHS BUSINESS RULE ────────────────────────────────────────────
    # "maintien" → final GHS is the initial one; "retour groupage DIM" →
    # final GHS is the pre-concertation one (overrides the extracted value).
    maintien = str(row.get("admin22_maintien_avis_controleur", "")).lower()
    retour = str(row.get("admin22_retour_groupage_dim", "")).lower()
    if maintien == "oui":
        row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_initial", "")
    elif retour == "oui":
        row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_avant_concertation", "")
    # ── END FINAL-GHS BUSINESS RULE ───────────────────────────────────────
    return row
def build_rum(result: dict) -> list:
    """One row per RUM per OGC — RUM-specific fields only."""
    rows = []
    recueil_pages = (pt for pt in result["pages_traitees"]
                     if pt["type"] == "FICHE_RECUEIL")
    for page in recueil_pages:
        data = page["data"]
        row = {"n_ogc": data.get("n_ogc", result["fichier"])}
        for section in ("rum_etab", "rum_reco"):
            section_data = data.get(section) or {}
            row.update({f"{section}_{key}": value
                        for key, value in section_data.items()})
        rows.append(row)
    return rows
def build_diagnostics(result: dict) -> list:
    """One row per diagnosis (DP/DR/DAS) per RUM, for both coding sources."""
    rows = []
    for page in result["pages_traitees"]:
        if page["type"] != "FICHE_RECUEIL":
            continue
        data = page["data"]
        ogc = data.get("n_ogc", result["fichier"])
        n_rum = (data.get("rum_etab") or {}).get("n_rum", "")

        def _row(source, diag_type, code, niveau="", libelle=""):
            # Shared row shape for every diagnosis kind.
            return {"n_ogc": ogc, "n_rum": n_rum, "source": source,
                    "type": diag_type, "code": code, "niveau": niveau,
                    "libelle": libelle}

        sources = (("etablissement", "dp_etab", "dr_etab", "das_etab"),
                   ("recodage", "dp_reco", "dr_reco", "das_reco"))
        for source, dp_key, dr_key, das_key in sources:
            dp = data.get(dp_key) or {}
            if dp.get("code"):
                rows.append(_row(source, "DP", dp["code"],
                                 libelle=dp.get("libelle", "")))
            dr = data.get(dr_key) or {}
            if dr.get("code"):
                rows.append(_row(source, "DR", dr["code"],
                                 libelle=dr.get("libelle", "")))
            for das in (data.get(das_key) or []):
                if isinstance(das, dict) and das.get("code"):
                    rows.append(_row(source, "DAS", das["code"],
                                     niveau=das.get("niveau", ""),
                                     libelle=das.get("libelle", "")))
    return rows
def build_actes(result: dict) -> list:
    """One row per medical act (CCAM code) per RUM, for both coding sources."""
    rows = []
    for page in result["pages_traitees"]:
        if page["type"] != "FICHE_RECUEIL":
            continue
        data = page["data"]
        ogc = data.get("n_ogc", result["fichier"])
        n_rum = (data.get("rum_etab") or {}).get("n_rum", "")
        for source, key in (("etablissement", "actes_etab"),
                            ("recodage", "actes_reco")):
            acts = data.get(key) or []
            rows.extend(
                {"n_ogc": ogc, "n_rum": n_rum, "source": source,
                 "code": act["code"], "niveau": act.get("niveau", ""),
                 "libelle": act.get("libelle", "")}
                for act in acts
                if isinstance(act, dict) and act.get("code")
            )
    return rows
def build_elements_preuve(result: dict) -> list:
    """One row per evidence document listed on an ELEMENTS_PREUVE page.

    The OGC number comes from the first FICHE_RECUEIL page that carries
    one, falling back to the source filename. That lookup is invariant
    across pages, so it is now done once up front instead of re-scanning
    all pages inside the loop (same result, O(n) instead of O(n²)).
    """
    # Resolve n_ogc once: first FICHE_RECUEIL wins, filename as fallback.
    ogc = result["fichier"]
    for pt in result["pages_traitees"]:
        if pt["type"] == "FICHE_RECUEIL":
            ogc = pt["data"].get("n_ogc", ogc)
            break
    rows = []
    for pt in result["pages_traitees"]:
        if pt["type"] != "ELEMENTS_PREUVE":
            continue
        d = pt["data"]
        for doc, vals in (d.get("elements") or {}).items():
            row = {"n_ogc": ogc, "document": doc}
            row.update(vals or {})  # vals may be None for unfilled entries
            rows.append(row)
    return rows
# ─── Rapport PDF Timing ───────────────────────────────────────────────────────
def _fmt_s(s):
"""Formate des secondes en mm:ss ou hh:mm:ss lisible."""
if s is None:
return ""
s = int(s)
h, r = divmod(s, 3600)
m, sec = divmod(r, 60)
if h:
return f"{h}h{m:02d}m{sec:02d}s"
if m:
return f"{m}m{sec:02d}s"
return f"{sec}s"
def build_timing_pdf(all_timings: list, path: Path, model: str = MODEL):
    """Generate a PDF report analysing extraction timings.

    all_timings : list of the per-PDF timing dicts built by process_pdf.
    path        : destination file for the ReportLab document.
    model       : model name shown in the report title.

    Layout: title + global summary table, then one section per processed
    file (metrics table, per-page table, error list, 429-stall list).
    """
    doc = SimpleDocTemplate(
        str(path), pagesize=A4,
        leftMargin=2*cm, rightMargin=2*cm,
        topMargin=2*cm, bottomMargin=2*cm,
    )
    styles = getSampleStyleSheet()
    title_style = ParagraphStyle("title", parent=styles["Title"],
                                 fontSize=18, spaceAfter=6)
    h2_style = ParagraphStyle("h2", parent=styles["Heading2"],
                              fontSize=13, spaceBefore=14, spaceAfter=4)
    h3_style = ParagraphStyle("h3", parent=styles["Heading3"],
                              fontSize=11, spaceBefore=10, spaceAfter=3)
    body_style = ParagraphStyle("body", parent=styles["Normal"],
                                fontSize=9, spaceAfter=3)
    warn_style = ParagraphStyle("warn", parent=styles["Normal"],
                                fontSize=9, textColor=colors.red, spaceAfter=3)
    story = []
    story.append(Paragraph(f"Rapport d'analyse de temps — {model}", title_style))
    story.append(Paragraph(f"Généré le {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}", body_style))
    story.append(HRFlowable(width="100%", thickness=1, color=colors.grey))
    story.append(Spacer(1, 0.4*cm))
    # ── Global aggregates across every processed file ──
    total_s = sum(t.get("duree_totale_s") or 0 for t in all_timings)
    total_pages = sum(t.get("nb_pages_total") or 0 for t in all_timings)
    total_err = sum(len(t.get("erreurs", [])) for t in all_timings)
    total_429 = sum(len(t.get("blocages_429", [])) for t in all_timings)
    total_wait = sum(b["attente_s"] for t in all_timings for b in t.get("blocages_429", []))
    nb_dossiers = len(all_timings)
    story.append(Paragraph("Résumé global", h2_style))
    summary_data = [
        ["Métrique", "Valeur"],
        ["Nombre de dossiers traités", str(nb_dossiers)],
        ["Nombre de pages total", str(total_pages)],
        ["Durée totale d'extraction", _fmt_s(total_s)],
        ["Durée moyenne / dossier", _fmt_s(total_s / nb_dossiers) if nb_dossiers else ""],
        ["Durée moyenne / page", _fmt_s(total_s / total_pages) if total_pages else ""],
        ["Erreurs totales", str(total_err)],
        ["Blocages 429 (rate limit)", str(total_429)],
        ["Temps perdu en attente 429", _fmt_s(total_wait)],
    ]
    t_sum = Table(summary_data, colWidths=[10*cm, 6*cm])
    t_sum.setStyle(TableStyle([
        ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#2c3e50")),
        ("TEXTCOLOR", (0,0), (-1,0), colors.white),
        ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
        ("FONTSIZE", (0,0), (-1,-1), 9),
        ("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]),
        ("GRID", (0,0), (-1,-1), 0.5, colors.grey),
        ("LEFTPADDING", (0,0), (-1,-1), 6),
        ("RIGHTPADDING",(0,0), (-1,-1), 6),
        ("TOPPADDING", (0,0), (-1,-1), 4),
        ("BOTTOMPADDING",(0,0),(-1,-1), 4),
    ]))
    story.append(t_sum)
    story.append(Spacer(1, 0.5*cm))
    story.append(Paragraph("Détail par dossier", h2_style))
    # ── One section per processed file ──
    for t in all_timings:
        story.append(Paragraph(t["fichier"], h3_style))
        nb_err = len(t.get("erreurs", []))
        nb_b = len(t.get("blocages_429", []))
        att = sum(b["attente_s"] for b in t.get("blocages_429", []))
        pages = t.get("pages", [])
        duree_id = sum(p.get("duree_identification_s") or 0 for p in pages)
        duree_ext = sum(p.get("duree_extraction_s") or 0 for p in pages)
        nb_ok = sum(1 for p in pages if p.get("statut") == "ok")
        nb_ign = sum(1 for p in pages if p.get("statut") == "ignoree")
        rows = [
            ["Début", (t.get("debut") or "")[:19].replace("T", " ")],
            ["Fin", (t.get("fin") or "")[:19].replace("T", " ")],
            ["Durée totale", _fmt_s(t.get("duree_totale_s"))],
            ["Pages totales", str(t.get("nb_pages_total", ""))],
            ["Pages extraites (OK)", str(nb_ok)],
            ["Pages ignorées", str(nb_ign)],
            ["Temps identification", _fmt_s(duree_id)],
            ["Temps extraction", _fmt_s(duree_ext)],
            ["Erreurs", str(nb_err)],
            ["Blocages 429", str(nb_b)],
            ["Attente cumulée 429", _fmt_s(att)],
        ]
        tbl = Table(rows, colWidths=[8*cm, 8*cm])
        tbl.setStyle(TableStyle([
            ("FONTSIZE", (0,0), (-1,-1), 8),
            ("FONTNAME", (0,0), (0,-1), "Helvetica-Bold"),
            ("ROWBACKGROUNDS", (0,0), (-1,-1), [colors.HexColor("#f9f9f9"), colors.white]),
            ("GRID", (0,0), (-1,-1), 0.3, colors.lightgrey),
            ("LEFTPADDING", (0,0), (-1,-1), 5),
            ("TOPPADDING", (0,0), (-1,-1), 3),
            ("BOTTOMPADDING",(0,0),(-1,-1), 3),
        ]))
        story.append(tbl)
        if pages:
            # Per-page breakdown table.
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Détail pages :", body_style))
            page_rows = [["Page", "Type", "Identification", "Extraction", "Statut"]]
            for p in pages:
                page_rows.append([
                    str(p["page"]),
                    p.get("type") or "",
                    _fmt_s(p.get("duree_identification_s")),
                    _fmt_s(p.get("duree_extraction_s")),
                    p.get("statut") or "",
                ])
            tp = Table(page_rows, colWidths=[1.5*cm, 5*cm, 3*cm, 3*cm, 3.5*cm])
            tp.setStyle(TableStyle([
                ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#34495e")),
                ("TEXTCOLOR", (0,0), (-1,0), colors.white),
                ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
                ("FONTSIZE", (0,0), (-1,-1), 7.5),
                ("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]),
                ("GRID", (0,0), (-1,-1), 0.3, colors.grey),
                ("LEFTPADDING", (0,0), (-1,-1), 4),
                ("TOPPADDING", (0,0), (-1,-1), 3),
                ("BOTTOMPADDING",(0,0),(-1,-1), 3),
            ]))
            story.append(tp)
        if t.get("erreurs"):
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Erreurs :", warn_style))
            for err in t["erreurs"]:
                # NOTE(review): a separator between page number and phase
                # appears to have been lost here (renders as "Page 3identification") —
                # confirm against the original and restore " — " if so.
                msg = f"Page {err['page']}{err['phase']} : {err['message'][:120]}"
                story.append(Paragraph(msg, warn_style))
        if t.get("blocages_429"):
            story.append(Paragraph("Blocages rate limit (429) :", warn_style))
            for b in t["blocages_429"]:
                msg = (f"Tentative {b['tentative']} — attente {b['attente_s']}s "
                       f"à {b['ts'][:19].replace('T', ' ')}")
                story.append(Paragraph(msg, warn_style))
        story.append(Spacer(1, 0.4*cm))
        story.append(HRFlowable(width="100%", thickness=0.5, color=colors.lightgrey))
    doc.build(story)
    print(f"✓ Rapport timing PDF : {path}")
# ─── Export Excel ─────────────────────────────────────────────────────────────
def export_excel(all_results: list, all_timings: list, path: Path):
    """Write the multi-sheet Excel workbook from extraction results and timings.

    Parameters
    ----------
    all_results : list
        One result dict per processed PDF; consumed by ``flatten()`` and the
        ``build_rum`` / ``build_diagnostics`` / ``build_actes`` /
        ``build_elements_preuve`` helpers defined elsewhere in this module.
    all_timings : list
        One timing dict per file (keys like "fichier", "debut", "erreurs",
        "blocages_429"; missing keys are tolerated via .get()).
    path : Path
        Destination .xlsx file (written with the openpyxl engine).
    """
    df_main = pd.DataFrame([flatten(r) for r in all_results])

    # Flatten the per-file sub-tables with nested comprehensions: O(total rows),
    # whereas the previous sum(lists, []) re-copied the accumulator on every
    # addition (quadratic in the number of files).
    rum = [row for r in all_results for row in build_rum(r)]
    diag = [row for r in all_results for row in build_diagnostics(r)]
    actes = [row for r in all_results for row in build_actes(r)]
    ep = [row for r in all_results for row in build_elements_preuve(r)]

    # Empty DataFrames still get explicit columns so the sheets keep a stable
    # header even when no rows were extracted.
    df_rum = pd.DataFrame(rum) if rum else pd.DataFrame(columns=["n_ogc","rum_etab_n_rum","rum_reco_n_rum"])
    df_diag = pd.DataFrame(diag) if diag else pd.DataFrame(columns=["n_ogc","n_rum","source","type","code","niveau","libelle"])
    df_actes = pd.DataFrame(actes) if actes else pd.DataFrame(columns=["n_ogc","n_rum","source","code","niveau","libelle"])
    df_ep = pd.DataFrame(ep) if ep else pd.DataFrame(columns=["n_ogc","document","present","photocopie"])

    # One summary row per file: error count, 429 rate-limit hits and the total
    # seconds spent waiting on them, plus the cumulative retry count.
    timing_rows = []
    for t in all_timings:
        nb_erreurs = len(t.get("erreurs", []))
        nb_429 = len(t.get("blocages_429", []))
        attente_429 = sum(b["attente_s"] for b in t.get("blocages_429", []))
        timing_rows.append({
            "fichier": t["fichier"],
            "debut": t.get("debut", ""),
            "fin": t.get("fin", ""),
            "duree_totale_s": t.get("duree_totale_s", ""),
            "nb_pages": t.get("nb_pages_total", ""),
            "nb_erreurs": nb_erreurs,
            "nb_blocages_429": nb_429,
            "attente_429_s": attente_429,
            "retries_total": t.get("retries_total", 0),
        })
    df_timing = pd.DataFrame(timing_rows) if timing_rows else pd.DataFrame()

    with pd.ExcelWriter(path, engine="openpyxl") as w:
        df_main.to_excel(w, sheet_name="Données principales", index=False)
        df_rum.to_excel(w, sheet_name="RUM", index=False)
        df_diag.to_excel(w, sheet_name="Diagnostics", index=False)
        df_actes.to_excel(w, sheet_name="Actes", index=False)
        df_ep.to_excel(w, sheet_name="Eléments de preuve", index=False)
        df_timing.to_excel(w, sheet_name="Timing", index=False)

    print(f"\n✓ Excel : {path}")
    print(f"  Données principales : {len(df_main)} lignes")
    print(f"  RUM : {len(df_rum)} lignes")
    print(f"  Diagnostics : {len(df_diag)} lignes")
    print(f"  Actes : {len(df_actes)} lignes")
    print(f"  Eléments de preuve : {len(df_ep)} lignes")
    print(f"  Timing : {len(df_timing)} lignes")
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point.

    Processes every PDF found in SCAN_DIR (optionally filtered by a substring
    passed as argv[1]), reusing previously saved results when a filter is
    given, then writes the Excel workbook, both JSON dumps and the timing PDF.
    """
    candidates = sorted(SCAN_DIR.glob("*.pdf"))
    if not candidates:
        print(f"Aucun PDF dans {SCAN_DIR}")
        sys.exit(1)

    filtered = len(sys.argv) > 1
    if filtered:
        needle = sys.argv[1]
        candidates = [p for p in candidates if needle in p.name]
        if not candidates:
            print(f"Aucun fichier pour '{needle}'")
            sys.exit(1)

    print(f"Modèle : {MODEL}")
    print(f"Fichiers: {len(candidates)}")
    for p in candidates:
        print(f"  - {p.name}")

    # Results are keyed by file name so a filtered rerun can merge into the
    # previously saved output instead of discarding it.
    json_path = OUTPUT_DIR / "extraction_ogc_raw_mistral.json"
    timing_path = OUTPUT_DIR / "timing_stats.json"
    cache: dict[str, dict] = {}
    timing_cache: dict[str, dict] = {}
    if filtered and json_path.exists():
        with open(json_path, encoding="utf-8") as fh:
            cache = {r["fichier"]: r for r in json.load(fh)}
        print(f"({len(cache)} fichiers en cache)")
    if filtered and timing_path.exists():
        with open(timing_path, encoding="utf-8") as fh:
            timing_cache = {t["fichier"]: t for t in json.load(fh)}

    for pdf_path in candidates:
        try:
            result, timing = process_pdf(pdf_path)
        except Exception as exc:
            # A failed file still gets placeholder entries so it shows up in
            # every output with its error message.
            print(f"\n⚠ Erreur {pdf_path.name} : {exc}")
            cache[pdf_path.name] = {"fichier": pdf_path.name, "erreur": str(exc),
                                    "pages_traitees": [], "pages_ignorees": []}
            timing_cache[pdf_path.name] = {
                "fichier": pdf_path.name, "erreur_globale": str(exc),
                "debut": None, "fin": None, "duree_totale_s": None,
                "nb_pages_total": 0, "pages": [], "erreurs": [], "blocages_429": [],
            }
        else:
            cache[pdf_path.name] = result
            timing_cache[pdf_path.name] = timing

    all_results = sorted(cache.values(), key=lambda item: item["fichier"])
    all_timings = sorted(timing_cache.values(), key=lambda item: item["fichier"])

    export_excel(all_results, all_timings, OUTPUT_DIR / "extraction_ogc.xlsx")

    with open(json_path, "w", encoding="utf-8") as fh:
        json.dump(all_results, fh, ensure_ascii=False, indent=2)
    print(f"✓ JSON : {json_path}")

    with open(timing_path, "w", encoding="utf-8") as fh:
        json.dump(all_timings, fh, ensure_ascii=False, indent=2)
    print(f"✓ Timing JSON : {timing_path}")

    rapport_path = OUTPUT_DIR / "rapport_timing.pdf"
    build_timing_pdf(all_timings, rapport_path)
    print(f"✓ Rapport PDF : {rapport_path}")
# Run the pipeline only when executed as a script (not when imported).
if __name__ == "__main__":
    main()