"""
|
|
Extraction OGC → Excel
|
|
Modèle : qwen3-vl:235b-cloud (vision multimodal) via Ollama
|
|
"""

import base64
import io
import json
import re
import sys
import time
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests
from pdf2image import convert_from_path
from PIL import Image
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.platypus import (
    SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
)


# ─── Config ───────────────────────────────────────────────────────────────────

SCAN_DIR = Path(__file__).parent / "scanOgc"
OUTPUT_DIR = Path(__file__).parent / "output"
OUTPUT_DIR.mkdir(exist_ok=True)

OLLAMA_URL = "http://localhost:11434/api/generate"
MODEL = "qwen3-vl:235b-cloud"
PDF_DPI = 250

# Rate limiting: pause between requests, plus retry on 429
INTER_REQUEST_DELAY = 2   # seconds
RETRY_MAX = 6
RETRY_DELAY_429 = 60      # seconds — capped at 120 s inside ask_vision
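# Backoff actually produced on HTTP 429 (see ask_vision below):
#     wait = min(RETRY_DELAY_429 * attempt, 120) → 60 s on the first attempt,
#     then 120 s on every later attempt, for at most RETRY_MAX tries.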


# ─── Image utilities ──────────────────────────────────────────────────────────

def image_to_b64(img: Image.Image) -> str:
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=90)
    return base64.b64encode(buf.getvalue()).decode()


# ─── Ollama call ──────────────────────────────────────────────────────────────

def ask_vision(prompt: str, img: Image.Image,
               timeout: int = 240, num_predict: int = 8192,
               timing_record: dict = None) -> str:
    """
    Sends an image + prompt to Ollama in streaming mode.
    - qwen3-vl spends ~4000 "thinking" tokens before answering:
      num_predict=8192 is needed to leave enough budget.
    - Automatic retry on 429 (cloud rate limit).
    - timing_record: optional dict used to record retries/429 stalls.
    """
    payload = {
        "model": MODEL,
        "prompt": prompt,
        "images": [image_to_b64(img)],
        "stream": True,
        "options": {"temperature": 0, "num_predict": num_predict},
    }

    for attempt in range(1, RETRY_MAX + 1):
        try:
            resp = requests.post(OLLAMA_URL, json=payload,
                                 timeout=timeout, stream=True)
            if resp.status_code == 429:
                wait = min(RETRY_DELAY_429 * attempt, 120)
                print(f"    ⏳ Rate limit — attente {wait}s "
                      f"(tentative {attempt}/{RETRY_MAX})...")
                if timing_record is not None:
                    timing_record.setdefault("blocages_429", []).append({
                        "tentative": attempt,
                        "attente_s": wait,
                        "ts": datetime.now().isoformat(),
                    })
                time.sleep(wait)
                continue
            resp.raise_for_status()

            tokens = []
            for line in resp.iter_lines():
                if not line:
                    continue
                try:
                    chunk = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if chunk.get("response"):
                    tokens.append(chunk["response"])
                if chunk.get("done"):
                    break

            if timing_record is not None and attempt > 1:
                timing_record["retries_total"] = \
                    timing_record.get("retries_total", 0) + (attempt - 1)

            time.sleep(INTER_REQUEST_DELAY)
            return "".join(tokens)

        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                wait = min(RETRY_DELAY_429 * attempt, 120)
                print(f"    ⏳ Rate limit — attente {wait}s "
                      f"(tentative {attempt}/{RETRY_MAX})...")
                if timing_record is not None:
                    timing_record.setdefault("blocages_429", []).append({
                        "tentative": attempt,
                        "attente_s": wait,
                        "ts": datetime.now().isoformat(),
                    })
                time.sleep(wait)
                continue
            raise

    raise RuntimeError(f"Echec après {RETRY_MAX} tentatives (rate limit persistant)")
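
# Minimal usage sketch (assumes an Ollama server on localhost:11434; "page.png"
# is a placeholder image path, not a file shipped with this script):
#
#     img = Image.open("page.png")
#     texte = ask_vision("Décris cette page.", img, num_predict=1024)
#     print(texte)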


# ─── Extracting JSON from the response ───────────────────────────────────────

def _try_parse(text: str):
    for candidate in (
        text,
        text.replace("\n", " ").replace("\r", " "),
        re.sub(r",\s*([}\]])", r"\1", text),                # trailing commas
        re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text),  # control chars
    ):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            pass
    return None


def _extract_balanced(text: str, open_c: str, close_c: str):
    """Extracts the first balanced open_c…close_c structure from the text."""
    start = text.find(open_c)
    if start == -1:
        return None
    depth = 0
    in_str = False
    escape = False
    for i, ch in enumerate(text[start:], start):
        if escape:
            escape = False
            continue
        if ch == "\\" and in_str:
            escape = True
            continue
        if ch == '"' and not escape:
            in_str = not in_str
            continue
        if in_str:
            continue
        if ch == open_c:
            depth += 1
        elif ch == close_c:
            depth -= 1
            if depth == 0:
                return text[start:i+1]
    return None
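
# Doctest-style examples (illustrative values):
#     _extract_balanced('bruit {"a": {"b": 1}} fin', "{", "}")  → '{"a": {"b": 1}}'
#     _extract_balanced("pas de JSON ici", "{", "}")            → None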


def extract_json(text: str):
    # 1. ```json … ``` fenced block
    m = re.search(r"```json\s*([\s\S]*?)```", text)
    if m:
        result = _try_parse(m.group(1).strip())
        if result is not None:
            return result

    # 2. Balanced-brace extraction (more robust than a greedy regex)
    for open_c, close_c in (('{', '}'), ('[', ']')):
        candidate = _extract_balanced(text, open_c, close_c)
        if candidate:
            result = _try_parse(candidate)
            if result is not None:
                return result

    # 3. Greedy-regex fallback (original behavior)
    for pattern in (r"(\{[\s\S]*\})", r"(\[[\s\S]*\])"):
        m = re.search(pattern, text)
        if m:
            result = _try_parse(m.group(1))
            if result is not None:
                return result

    return None
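
# Example (illustrative): a chatty model answer still parses —
#     extract_json('Voici le résultat : {"n_ogc": "123"} merci')  → {"n_ogc": "123"}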


# ─── Prompts ──────────────────────────────────────────────────────────────────
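
# NOTE: the prompt strings below are deliberately kept in French — they are sent
# verbatim to the vision model to read French medical forms, so translating them
# would change the extraction behavior.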

PROMPT_IDENTIFY = """\
Tu es un assistant d'analyse de documents médicaux français.
Regarde cette image et identifie son type parmi :
- FICHE_RECUEIL : "FICHE MEDICALE DE RECUEIL DU PRATICIEN CONSEIL"
- FICHE_CONCERTATION_VIDE: "FICHE MEDICALE DE CONCERTATION" (page quasi vide)
- SEJOUR_MANUSCRIT : "Séjour d'hospitalisation complète" (colonnes manuscrites)
- ELEMENTS_PREUVE : "Eléments de preuve tracés au dossier du patient"
- FICHE_ADMIN_2_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 2/2"
- FICHE_ADMIN_1_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 1/2"
- AUTRE : autre type

Réponds UNIQUEMENT avec le code du type, sans aucune explication.\
"""

PROMPT_FICHE_RECUEIL = """\
Tu es un assistant d'extraction de données médicales.
Extrait toutes les informations imprimées de cette fiche médicale de recueil du praticien conseil.
RÈGLES STRICTES :
- Si un champ n'a pas de valeur clairement visible et imprimée, retourner une chaîne vide "".
- Ne jamais deviner, inférer ou compléter un champ absent.
- Le champ "provenance" est souvent vide : ne pas le remplir sauf si une valeur est explicitement imprimée.
- Le champ "se_coche" correspond aux cases 1/2/3/4 : retourner "SE1", "SE2", "SE3" ou "SE4" si une case est explicitement cochée, sinon "". Ce champ est TRÈS SOUVENT vide — ne rien mettre par défaut. NE PAS confondre avec "accord_desaccord" qui est un champ séparé.
- Le champ "accord_desaccord" est distinct de "se_coche" : il indique accord/désaccord du praticien conseil, pas les cases SE.
- Le champ "dr_etab" (Diagnostic Relié) est distinct des DAS : ne mettre un code que s'il y a une ligne DR EXPLICITEMENT RENSEIGNÉE sur la fiche. Si la ligne DR est vide ou absente sur le document, retourner "" obligatoirement. NE JAMAIS copier le premier DAS dans DR — ce sont deux lignes séparées sur la grille.
- Le tableau "Données du séjour" contient ces colonnes DANS CET ORDRE EXACT, de gauche à droite :
Age(ans) | Age(jours) | Sexe | Délai dern. règles | Age gestation | Poids d'entrée |
Durée de séjour | Mode d'entrée | Provenance | Mode de sortie | Destination |
Nb séances | Nb RUM | Nb j EXH | Type EXB | Nb j EXB
RÈGLE ABSOLUE : lire chaque valeur dans sa colonne uniquement.
Si une colonne est vide, retourner "" pour ce champ.
Ne jamais décaler les valeurs vers la gauche pour compenser une cellule vide.
Exemple : si "Provenance" est vide, "Mode de sortie" reste dans "mode_sortie", pas dans "provenance".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
- IMPORTANT : extraire TOUTES les lignes non vides de das_etab, actes_etab, das_reco et actes_reco sans limite de nombre. Ne jamais tronquer ces listes.
- Les actes (CCAM, codes à 7+ caractères commençant par des lettres ex: JDPE002, NJFA008) vont dans "actes_etab", pas dans "das_etab". Les diagnostics (CIM-10, codes courts ex: N320, R33) vont dans "das_etab".

{"n_ogc":"","etablissement":"","finess":"","date_debut_controle":"","n_champ":"","libelle_champ":"","dossier_manquant":"","date_debut_sejour":"","date_fin_sejour":"",
"sejour_etab":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"sejour_reco":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"rum_etab":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"rum_reco":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"dp_etab":{"code":"","libelle":""},"dr_etab":{"code":"","libelle":""},
"das_etab": [] ou [{"code":"","niveau":"","libelle":""}] ou plus,
"actes_etab":[{"code":"","niveau":"","libelle":""}],
"dp_reco":{"code":""},"dr_reco":{"code":""},
"das_reco":[{"code":"","niveau":""}],"actes_reco":[{"code":"","niveau":""}],
"ghm_etab":"","ghs_etab":"","ghm_reco":"","ghs_reco":"",
"recodage_impactant_facturation":"","ghs_injustifie":"",
"se_coche":"","atu":"","ffm":"","fsd":"","accord_desaccord":"","nom_praticien_conseil":""}\
"""

PROMPT_ELEMENTS_PREUVE = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette page "Eléments de preuve tracés au dossier du patient".
Pour chaque ligne : "present"=oui/non, "photocopie"=nombre écrit, dates si présentes.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.

{"date":"","medecin_controleur_signataire":"","medecin_dim_signataire":"",
"elements":{"compte_rendu_acte":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_operatoire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_accouchement":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_examen_complementaire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_imagerie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_anatomopathologie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"observations_medicales":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"dossier_transfusion":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"dossier_anesthesie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"administration_therapeutique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_hospitalisation":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"lettre_sortie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"surveillance_dossier_infirmier":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_psychologue":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_kinesitherapeute":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_dietetique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"autre":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}}}\
"""

PROMPT_FICHE_ADMIN_2_2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette fiche administrative de concertation 2/2.
RÈGLES STRICTES :
- Pour "maintien_avis_controleur", "retour_groupage_dim", "autre_groupage" : retourner "oui" si la case est cochée (X, ✓ ou toute marque), "non" si la case est décochée, "" si absent.
- Pour les champs GHS (nombres) : retourner uniquement les chiffres sans point ni espace (ex: "6173" et non "6.173").
- Si un champ est absent ou illisible, retourner "".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.

{"n_ogc":"","ghs_initial":"","ghs_avant_concertation":"","ghs_final_apres_concertation":"",
"maintien_avis_controleur":"","retour_groupage_dim":"","autre_groupage":"",
"avis_dim_final":"","date_concertation":"",
"nom_medecin_responsable_controle":"","nom_medecin_dim":""}\
"""

PROMPT_FICHE_ADMIN_1_2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette fiche administrative de concertation 1/2.
L'argumentaire est un texte long imprimé (pas manuscrit).
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.

{"n_ogc":"","date_concertation":"","argumentaire_medecin_controleur":""}\
"""

PROMPTS = {
    "FICHE_RECUEIL": PROMPT_FICHE_RECUEIL,
    "ELEMENTS_PREUVE": PROMPT_ELEMENTS_PREUVE,
    "FICHE_ADMIN_2_2": PROMPT_FICHE_ADMIN_2_2,
    "FICHE_ADMIN_1_2": PROMPT_FICHE_ADMIN_1_2,
}

SKIP_TYPES = {"SEJOUR_MANUSCRIT", "FICHE_CONCERTATION_VIDE", "AUTRE"}


# ─── Post-extraction normalization ───────────────────────────────────────────

_CHECKBOX_OUI = {"x", "oui", "✓", "✗", "coché", "v", "yes"}


def _norm_checkbox(val: str) -> str:
    """Maps any checked-box mark to 'oui', keeps 'non', returns '' otherwise."""
    v = str(val).strip().lower()
    if v in _CHECKBOX_OUI:
        return "oui"
    if v == "non":
        return "non"
    return ""


def _calc_duree_rum(debut: str, fin: str):
    """Computes the duration in days between two dates (DD/MM/YYYY or YYYY-MM-DD). Returns None if unparsable."""
    for fmt in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y"):
        try:
            d1 = datetime.strptime(str(debut).strip(), fmt)
            d2 = datetime.strptime(str(fin).strip(), fmt)
            return (d2 - d1).days
        except (ValueError, AttributeError):
            pass
    return None
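
# Examples (illustrative). Note that both dates must share the same format:
#     _calc_duree_rum("01/03/2017", "05/03/2017") → 4
#     _calc_duree_rum("01/03/2017", "2017-03-05") → None  (mixed formats)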


def _strip_dot_number(val: str) -> str:
    """Removes a spurious dot from a number (e.g. '6.173' → '6173', '.0' → '0')."""
    v = str(val).strip()
    # Integer with a dot in the middle (not a real decimal on these forms)
    if re.match(r"^\d+\.\d+$", v):
        cleaned = v.replace(".", "")
        return cleaned
    # Leading dot
    if v.startswith(".") and v[1:].isdigit():
        return v[1:]
    return v
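
# Examples (illustrative):
#     _strip_dot_number("6.173") → "6173"
#     _strip_dot_number(".0")    → "0"
#     _strip_dot_number("12")    → "12"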


def _fix_year(val: str) -> str:
    """
    Fixes misread handwritten years.
    OGC dates fall within the 2015-2019 range.
    If an out-of-range year is detected (e.g. 2021, 2022),
    it is replaced by the closest valid year.
    """
    if not val:
        return val
    m = re.search(r"(20\d\d)", val)
    if m:
        year = m.group(1)
        valid_years = ("2015", "2016", "2017", "2018", "2019")
        if year not in valid_years:
            best = min(valid_years, key=lambda y: abs(int(y) - int(year)))
            val = val.replace(year, best)
    return val
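
# Examples (illustrative):
#     _fix_year("12/05/2021") → "12/05/2019"   (2021 clamped to the closest valid year)
#     _fix_year("03/04/2014") → "03/04/2015"
#     _fix_year("01/02/2017") → "01/02/2017"   (already in range, unchanged)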


def _normalize_result(result: dict) -> None:
    """Normalizes the extracted data in place (checkboxes, misread digits)."""
    for pt in result.get("pages_traitees", []):
        d = pt.get("data", {})
        if not isinstance(d, dict):
            continue

        ptype = pt.get("type")

        if ptype == "FICHE_ADMIN_2_2":
            for field in ("maintien_avis_controleur", "retour_groupage_dim", "autre_groupage"):
                if field in d:
                    d[field] = _norm_checkbox(d[field])
            # GHS: strip spurious dots
            for field in ("ghs_initial", "ghs_avant_concertation", "ghs_final_apres_concertation"):
                if d.get(field):
                    d[field] = _strip_dot_number(d[field])

        if ptype == "FICHE_RECUEIL":
            # DR/DAS anti-confusion guard
            for dr_k, das_k in (("dr_etab", "das_etab"), ("dr_reco", "das_reco")):
                dr_code = (d.get(dr_k) or {}).get("code", "").strip()
                if not dr_code:
                    continue
                das = [x for x in (d.get(das_k) or []) if isinstance(x, dict) and x.get("code")]
                das_codes = {x.get("code", "").strip() for x in das}

                if dr_code in das_codes:
                    # Case 1: DR duplicates an existing DAS → clear DR
                    d[dr_k] = {"code": "", "libelle": ""}

                elif not das:
                    # Case 2: DAS empty but DR filled → model confusion,
                    # move the DR into das as the first DAS
                    # (DAS entries use the "niveau" key, see the prompt schema)
                    dr_entry = d.get(dr_k) or {}
                    new_das_entry = {"code": dr_code, "niveau": dr_entry.get("niveau", "")}
                    if dr_k == "dr_etab":
                        new_das_entry["libelle"] = dr_entry.get("libelle", "")
                    d[das_k] = [new_das_entry]
                    d[dr_k] = {"code": "", "libelle": ""}

            # nature_suppl is often read as '.0' instead of '0'
            for section in ("rum_etab", "rum_reco"):
                sec = d.get(section) or {}
                if sec.get("nature_suppl"):
                    sec["nature_suppl"] = _strip_dot_number(sec["nature_suppl"])
                # duration recomputed from the dates (more reliable than the extracted value)
                duree = _calc_duree_rum(sec.get("duree_rum_debut", ""), sec.get("duree_rum_fin", ""))
                if duree is not None:
                    sec["duree_rum_calculee_j"] = duree
            # se_coche: uppercase valid values, reject anything outside SE1-SE4
            se_raw = str(d.get("se_coche", "")).strip()
            if se_raw.upper() in {"SE1", "SE2", "SE3", "SE4"}:
                # Already in the right format
                d["se_coche"] = se_raw.upper()
            elif se_raw in {"1", "2", "3", "4"}:
                # A bare digit is ambiguous: the model confuses it with a DAS rank → clear
                d["se_coche"] = ""
            elif se_raw:
                # Unexpected value (e.g. "accord", "désaccord") → clear
                d["se_coche"] = ""

        if ptype in ("FICHE_ADMIN_2_2", "FICHE_ADMIN_1_2"):
            for date_field in ("date_concertation",):
                if d.get(date_field):
                    d[date_field] = _fix_year(d[date_field])

        if ptype == "ELEMENTS_PREUVE":
            if d.get("date"):
                d["date"] = _fix_year(d["date"])


# ─── Reliability audit ────────────────────────────────────────────────────────

def compute_audit(result: dict) -> dict:
    """
    Computes an _audit block for the OGC.
    score_global ∈ [0,1] — alert threshold: 0.80
    alertes = fields whose score < 0.80
    """
    checks: list[tuple[str, float]] = []  # (field, score)

    for pt in result.get("pages_traitees", []):
        ptype = pt.get("type")
        d = pt.get("data", {})
        page = pt.get("page", "?")

        if not isinstance(d, dict):
            continue

        # Unparsed JSON → unreliable data
        if "raw_response" in d:
            checks.append((f"page_{page}_json", 0.10))
            continue

        if ptype == "FICHE_RECUEIL":
            # Empty n_ogc
            checks.append(("n_ogc", 1.0 if d.get("n_ogc") else 0.20))

            # Non-empty dr_etab → historically often wrong (confused with DAS)
            dr_code = (d.get("dr_etab") or {}).get("code", "")
            checks.append(("dr_etab", 0.31 if dr_code else 1.0))

            # Non-empty provenance → often hallucinated
            prov = str((d.get("sejour_etab") or {}).get("provenance", "")).strip()
            checks.append(("sejour_etab.provenance", 0.40 if prov else 1.0))

            # Non-empty se_coche → often hallucinated; must be SE1/SE2/SE3/SE4 or ""
            se_val = str(d.get("se_coche", "")).strip().lower()
            if not se_val:
                checks.append(("se_coche", 1.0))
            elif se_val in {"se1", "se2", "se3", "se4", "1", "2", "3", "4"}:
                checks.append(("se_coche", 0.90))  # plausible value, but check the format
            else:
                # likely confusion with accord_desaccord, or another unexpected value
                checks.append(("se_coche", 0.20))

            # Empty DAS while DP is present → probably truncated
            dp_code = (d.get("dp_etab") or {}).get("code", "")
            das = [x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")]
            if dp_code and not das:
                checks.append(("das_etab", 0.50))
            else:
                checks.append(("das_etab", 1.0))

            # A DAS code that looks like a procedure code (≥7 chars, first 4 alphabetic)
            acte_like = any(
                len(x.get("code", "")) >= 7 and x.get("code", "")[:4].isalpha()
                for x in das
            )
            checks.append(("das_etab.codes", 0.35 if acte_like else 1.0))

        elif ptype == "FICHE_ADMIN_2_2":
            # At least one box must be checked
            maintien = str(d.get("maintien_avis_controleur", "")).strip().lower()
            retour = str(d.get("retour_groupage_dim", "")).strip().lower()
            autre = str(d.get("autre_groupage", "")).strip().lower()
            aucun_coche = not any(v == "oui" for v in (maintien, retour, autre))
            checks.append(("maintien_retour_autre", 0.50 if aucun_coche else 1.0))

            # Final GHS still contains a dot → misread
            ghs = str(d.get("ghs_final_apres_concertation", "")).strip()
            checks.append(("ghs_final", 0.40 if "." in ghs else 1.0))

        elif ptype == "ELEMENTS_PREUVE":
            # Only flag bare single letters (e.g. "A", "B") or runs of 3+
            # consecutive letters — not "A2" or "1.a3", which are valid
            suspect = any(
                bool(re.search(r"(?<!\w)[A-Za-z]{3,}|^[A-Za-z]\s*$",
                               str((v or {}).get("photocopie", ""))))
                for v in (d.get("elements") or {}).values()
                if isinstance(v, dict)
            )
            checks.append(("elements.photocopie", 0.40 if suspect else 1.0))

    if not checks:
        score_global = 1.0
        alertes = []
    else:
        scores = [s for _, s in checks]
        score_global = round(sum(scores) / len(scores), 2)
        alertes = [
            {"champ": champ, "score": score}
            for champ, score in checks
            if score < 0.80
        ]

    return {
        "score_global": score_global,
        "alertes": alertes,
        "modele": MODEL,
        "date_extraction": datetime.now().strftime("%Y-%m-%d"),
    }
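
# Worked example (illustrative): a FICHE_RECUEIL page with n_ogc present,
# dr_etab wrongly filled, and every other check clean yields
# checks = [1.0, 0.31, 1.0, 1.0, 1.0, 1.0] → score_global = round(5.31 / 6, 2) ≈ 0.88,
# with a single alert on dr_etab (0.31 < 0.80).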


# ─── Processing one PDF ───────────────────────────────────────────────────────

def process_pdf(pdf_path: Path) -> tuple[dict, dict]:
    """Returns (result, timing), where timing holds all the time metrics."""
    print(f"\n{'='*60}\nTraitement : {pdf_path.name}\n{'='*60}")

    pdf_start = time.time()
    timing = {
        "fichier": pdf_path.name,
        "debut": datetime.now().isoformat(),
        "fin": None,
        "duree_totale_s": None,
        "nb_pages_total": 0,
        "pages": [],
        "erreurs": [],
        "blocages_429": [],
        "retries_total": 0,
    }

    pages = convert_from_path(str(pdf_path), dpi=PDF_DPI)
    timing["nb_pages_total"] = len(pages)
    result = {"fichier": pdf_path.name, "pages_traitees": [], "pages_ignorees": []}

    for i, img in enumerate(pages, start=1):
        print(f"\n  Page {i}/{len(pages)} — identification...")
        page_timing = {
            "page": i,
            "type": None,
            "duree_identification_s": None,
            "duree_extraction_s": None,
            "statut": None,
            "erreur": None,
        }
        t0 = time.time()

        try:
            raw_type = ask_vision(PROMPT_IDENTIFY, img,
                                  timeout=200, num_predict=512,
                                  timing_record=timing).strip().upper()
        except Exception as e:
            print(f"  ⚠ Erreur identification : {e}")
            page_timing["duree_identification_s"] = round(time.time() - t0, 2)
            page_timing["statut"] = "erreur_identification"
            page_timing["erreur"] = str(e)
            timing["erreurs"].append({"page": i, "phase": "identification", "message": str(e)})
            timing["pages"].append(page_timing)
            result["pages_ignorees"].append({"page": i, "type": "ERREUR_IDENTIFICATION"})
            continue

        duree_id = round(time.time() - t0, 2)
        page_timing["duree_identification_s"] = duree_id

        page_type = "AUTRE"
        for known in PROMPTS.keys() | SKIP_TYPES:
            if known in raw_type:
                page_type = known
                break
        page_timing["type"] = page_type
        print(f"  → Type : {page_type} ({duree_id:.1f}s)")

        if page_type in SKIP_TYPES:
            page_timing["statut"] = "ignoree"
            timing["pages"].append(page_timing)
            result["pages_ignorees"].append({"page": i, "type": page_type})
            print("  → Ignorée.")
            continue

        print("  → Extraction en cours...")
        t0 = time.time()
        try:
            num_predict = 12000 if page_type == "FICHE_RECUEIL" else 8192
            raw = ask_vision(PROMPTS[page_type], img, timeout=240,
                             num_predict=num_predict, timing_record=timing)
        except Exception as e:
            print(f"  ⚠ Erreur extraction : {e}")
            duree_ext = round(time.time() - t0, 2)
            page_timing["duree_extraction_s"] = duree_ext
            page_timing["statut"] = "erreur_extraction"
            page_timing["erreur"] = str(e)
            timing["erreurs"].append({"page": i, "phase": "extraction", "type": page_type, "message": str(e)})
            timing["pages"].append(page_timing)
            result["pages_traitees"].append({"page": i, "type": page_type,
                                             "data": {"erreur": str(e)}})
            continue

        duree_ext = round(time.time() - t0, 2)
        page_timing["duree_extraction_s"] = duree_ext
        print(f"  → Réponse reçue ({duree_ext:.1f}s)")

        data = extract_json(raw)
        if data is None:
            print("  ⚠ JSON non parsable — retry en cours...")
            retry_prompt = (
                "Ta réponse précédente n'était pas un JSON valide. "
                "Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après, "
                "sans bloc ```json```. Voici le schéma attendu :\n\n"
                + PROMPTS[page_type]
            )
            try:
                raw2 = ask_vision(retry_prompt, img, timeout=240, num_predict=12000,
                                  timing_record=timing)
                data = extract_json(raw2)
            except Exception as e:
                print(f"  ⚠ Erreur retry : {e}")
                data = None

            if data is None:
                print("  ⚠ Retry échoué — raw_response conservé")
                page_timing["statut"] = "json_non_parsable"
                timing["erreurs"].append({
                    "page": i, "phase": "parsing_json", "type": page_type,
                    "message": f"JSON non parsable après retry : {raw[:100]}",
                    "retry": True,
                })
                data = {"raw_response": raw}
            else:
                print("  ✓ Retry réussi")
                page_timing["statut"] = "ok_after_retry"
                timing["erreurs"].append({
                    "page": i, "phase": "parsing_json", "type": page_type,
                    "message": "JSON non parsable au 1er appel, corrigé par retry",
                    "retry": True, "retry_ok": True,
                })
        else:
            page_timing["statut"] = "ok"

        timing["pages"].append(page_timing)
        result["pages_traitees"].append({"page": i, "type": page_type, "data": data})
        print("  ✓ OK")

    timing["fin"] = datetime.now().isoformat()
    timing["duree_totale_s"] = round(time.time() - pdf_start, 2)

    _normalize_result(result)
    result["_audit"] = compute_audit(result)

    return result, timing
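
# Shape of the returned `result` (sketch; file name and values are illustrative):
#     {"fichier": "dossier.pdf",
#      "pages_traitees": [{"page": 1, "type": "FICHE_RECUEIL", "data": {...}}],
#      "pages_ignorees": [{"page": 2, "type": "SEJOUR_MANUSCRIT"}],
#      "_audit": {"score_global": 0.92, "alertes": [...], "modele": MODEL, "date_extraction": "..."}}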


# ─── Flattening for Excel ─────────────────────────────────────────────────────

def flatten(result: dict) -> dict:
    row = {"fichier": result["fichier"]}
    general_done = False  # general fields taken from the first FICHE_RECUEIL page only
    for pt in result["pages_traitees"]:
        d, ptype = pt["data"], pt["type"]
        if ptype == "FICHE_RECUEIL":
            # ── General fields (per stay, identical on every RUM page) ──
            if not general_done:
                for k in ["n_ogc","etablissement","finess","date_debut_controle","n_champ",
                          "libelle_champ","dossier_manquant","date_debut_sejour","date_fin_sejour"]:
                    row[k] = d.get(k, "")
                for prefix in ("sejour_etab","sejour_reco"):
                    for k, v in (d.get(prefix) or {}).items():
                        row[f"{prefix}_{k}"] = v
                row["dp_etab_code"] = (d.get("dp_etab") or {}).get("code", "")
                row["dp_etab_libelle"] = (d.get("dp_etab") or {}).get("libelle", "")
                row["dr_etab_code"] = (d.get("dr_etab") or {}).get("code", "")
                row["dr_etab_libelle"] = (d.get("dr_etab") or {}).get("libelle", "")
                row["dp_reco_code"] = (d.get("dp_reco") or {}).get("code", "")
                row["dr_reco_code"] = (d.get("dr_reco") or {}).get("code", "")
                for k in ["ghm_etab","ghs_etab","ghm_reco","ghs_reco",
                          "recodage_impactant_facturation","ghs_injustifie",
                          "se_coche","atu","ffm","fsd","accord_desaccord","nom_praticien_conseil"]:
                    row[k] = d.get(k, "")
                general_done = True
            # ── Counts and durations aggregated over all RUMs ──
            row["nb_das_etab"] = row.get("nb_das_etab", 0) + len([x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")])
            row["nb_actes_etab"] = row.get("nb_actes_etab", 0) + len([x for x in (d.get("actes_etab") or []) if isinstance(x, dict) and x.get("code")])
            row["nb_das_reco"] = row.get("nb_das_reco", 0) + len([x for x in (d.get("das_reco") or []) if isinstance(x, dict) and x.get("code")])
            row["nb_actes_reco"] = row.get("nb_actes_reco", 0) + len([x for x in (d.get("actes_reco") or []) if isinstance(x, dict) and x.get("code")])
            for section, col in (("rum_etab", "duree_sejour_calc_etab_j"),
                                 ("rum_reco", "duree_sejour_calc_reco_j")):
                duree = (d.get(section) or {}).get("duree_rum_calculee_j")
                if duree is not None:
                    row[col] = row.get(col, 0) + duree
        elif ptype == "ELEMENTS_PREUVE":
            row["ep_date"] = d.get("date", "")
            row["ep_medecin_controleur"] = d.get("medecin_controleur_signataire", "")
            row["ep_medecin_dim"] = d.get("medecin_dim_signataire", "")
            for doc, vals in (d.get("elements") or {}).items():
                for col, val in (vals or {}).items():
                    row[f"ep_{doc}_{col}"] = val
        elif ptype == "FICHE_ADMIN_2_2":
            if not row.get("n_ogc"):
                row["n_ogc"] = d.get("n_ogc", "")
            for k in ["ghs_initial","ghs_avant_concertation","ghs_final_apres_concertation",
                      "maintien_avis_controleur","retour_groupage_dim","autre_groupage",
                      "avis_dim_final","date_concertation",
                      "nom_medecin_responsable_controle","nom_medecin_dim"]:
                row[f"admin22_{k}"] = d.get(k, "")
        elif ptype == "FICHE_ADMIN_1_2":
            row["admin12_date_concertation"] = d.get("date_concertation", "")
            row["admin12_argumentaire"] = d.get("argumentaire_medecin_controleur", "")

    # ── FINAL-GHS BUSINESS RULE ────────────────────────────────────────────
    # ghs_final_apres_concertation is handwritten and often misread.
    # It is recomputed from the checked boxes (printed values, more reliable):
    # - maintien_avis_controleur checked → ghs_final = ghs_initial
    # - retour_groupage_dim checked      → ghs_final = ghs_avant_concertation
    # - autre_groupage checked           → ghs_final = the extracted handwritten value (kept)
    # To disable this rule: delete the block between the two dashed lines.
    maintien = str(row.get("admin22_maintien_avis_controleur", "")).lower()
    retour = str(row.get("admin22_retour_groupage_dim", "")).lower()
    autre = str(row.get("admin22_autre_groupage", "")).lower()
    if maintien == "oui":
        row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_initial", "")
    elif retour == "oui":
        row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_avant_concertation", "")
    # if autre_groupage == "oui": keep the extracted (handwritten) value
    # ── END FINAL-GHS BUSINESS RULE ───────────────────────────────────────

    return row


def build_rum(result: dict) -> list:
    """One row per RUM per OGC — RUM-specific data."""
    rows = []
    for pt in result["pages_traitees"]:
        if pt["type"] != "FICHE_RECUEIL":
            continue
        d = pt["data"]
        ogc = d.get("n_ogc", result["fichier"])
        row = {"n_ogc": ogc}
        for prefix in ("rum_etab", "rum_reco"):
            for k, v in (d.get(prefix) or {}).items():
                row[f"{prefix}_{k}"] = v
        rows.append(row)
    return rows


def build_diagnostics(result: dict) -> list:
    rows = []
    for pt in result["pages_traitees"]:
        if pt["type"] != "FICHE_RECUEIL":
            continue
        d = pt["data"]
        ogc = d.get("n_ogc", result["fichier"])
        n_rum = (d.get("rum_etab") or {}).get("n_rum", "")
        for src, dp_k, dr_k, das_k in [
            ("etablissement", "dp_etab", "dr_etab", "das_etab"),
            ("recodage", "dp_reco", "dr_reco", "das_reco"),
        ]:
            dp = d.get(dp_k) or {}
            if dp.get("code"):
                rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DP",
                             "code": dp["code"], "niveau": "",
                             "libelle": dp.get("libelle", "")})
            dr = d.get(dr_k) or {}
            if dr.get("code"):
                rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DR",
                             "code": dr["code"], "niveau": "",
                             "libelle": dr.get("libelle", "")})
            for das in (d.get(das_k) or []):
                if isinstance(das, dict) and das.get("code"):
                    rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DAS",
                                 "code": das["code"], "niveau": das.get("niveau", ""),
                                 "libelle": das.get("libelle", "")})
    return rows


def build_actes(result: dict) -> list:
    rows = []
    for pt in result["pages_traitees"]:
        if pt["type"] != "FICHE_RECUEIL":
            continue
        d = pt["data"]
        ogc = d.get("n_ogc", result["fichier"])
        n_rum = (d.get("rum_etab") or {}).get("n_rum", "")
        for src, k in [("etablissement", "actes_etab"), ("recodage", "actes_reco")]:
            for a in (d.get(k) or []):
                if isinstance(a, dict) and a.get("code"):
                    rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src,
                                 "code": a["code"], "niveau": a.get("niveau", ""),
                                 "libelle": a.get("libelle", "")})
    return rows


def build_elements_preuve(result: dict) -> list:
    rows = []
    for pt in result["pages_traitees"]:
        if pt["type"] != "ELEMENTS_PREUVE":
            continue
        d = pt["data"]
        ogc = result["fichier"]
        for pt2 in result["pages_traitees"]:
            if pt2["type"] == "FICHE_RECUEIL":
                ogc = pt2["data"].get("n_ogc", ogc)
                break
        for doc, vals in (d.get("elements") or {}).items():
            row = {"n_ogc": ogc, "document": doc}
            row.update(vals or {})
            rows.append(row)
    return rows


# ─── Excel export ─────────────────────────────────────────────────────────────

def export_excel(all_results: list, all_timings: list, path: Path):
    df_main = pd.DataFrame([flatten(r) for r in all_results])
    rum = sum((build_rum(r) for r in all_results), [])
    diag = sum((build_diagnostics(r) for r in all_results), [])
    actes = sum((build_actes(r) for r in all_results), [])
    ep = sum((build_elements_preuve(r) for r in all_results), [])

    df_rum = pd.DataFrame(rum) if rum else pd.DataFrame(columns=["n_ogc","rum_etab_n_rum","rum_reco_n_rum"])
    df_diag = pd.DataFrame(diag) if diag else pd.DataFrame(columns=["n_ogc","n_rum","source","type","code","niveau","libelle"])
    df_actes = pd.DataFrame(actes) if actes else pd.DataFrame(columns=["n_ogc","n_rum","source","code","niveau","libelle"])
    df_ep = pd.DataFrame(ep) if ep else pd.DataFrame(columns=["n_ogc","document","present","photocopie"])

    # Timing sheet — per-file summary
    timing_rows = []
    for t in all_timings:
        nb_erreurs = len(t.get("erreurs", []))
        nb_429 = len(t.get("blocages_429", []))
        attente_429 = sum(b["attente_s"] for b in t.get("blocages_429", []))
        timing_rows.append({
            "fichier": t["fichier"],
            "debut": t.get("debut", ""),
            "fin": t.get("fin", ""),
            "duree_totale_s": t.get("duree_totale_s", ""),
            "nb_pages": t.get("nb_pages_total", ""),
            "nb_erreurs": nb_erreurs,
            "nb_blocages_429": nb_429,
            "attente_429_s": attente_429,
            "retries_total": t.get("retries_total", 0),
        })
    df_timing = pd.DataFrame(timing_rows) if timing_rows else pd.DataFrame()

    with pd.ExcelWriter(path, engine="openpyxl") as w:
        df_main.to_excel(w, sheet_name="Données principales", index=False)
        df_rum.to_excel(w, sheet_name="RUM", index=False)
        df_diag.to_excel(w, sheet_name="Diagnostics", index=False)
        df_actes.to_excel(w, sheet_name="Actes", index=False)
        df_ep.to_excel(w, sheet_name="Eléments de preuve", index=False)
        df_timing.to_excel(w, sheet_name="Timing", index=False)

    print(f"\n✓ Excel : {path}")
    print(f"  Données principales : {len(df_main)} lignes")
    print(f"  RUM : {len(df_rum)} lignes")
    print(f"  Diagnostics : {len(df_diag)} lignes")
    print(f"  Actes : {len(df_actes)} lignes")
    print(f"  Eléments de preuve : {len(df_ep)} lignes")
    print(f"  Timing : {len(df_timing)} lignes")


# ─── Timing PDF report ────────────────────────────────────────────────────────

def _fmt_s(s):
    """Formats seconds as a readable mm:ss or hh:mm:ss string."""
    if s is None:
        return "—"
    s = int(s)
    h, r = divmod(s, 3600)
    m, sec = divmod(r, 60)
    if h:
        return f"{h}h{m:02d}m{sec:02d}s"
    if m:
        return f"{m}m{sec:02d}s"
    return f"{sec}s"


def build_timing_pdf(all_timings: list, path: Path, model: str = MODEL):
    """Generates a PDF report analyzing extraction times."""
    doc = SimpleDocTemplate(
        str(path), pagesize=A4,
        leftMargin=2*cm, rightMargin=2*cm,
        topMargin=2*cm, bottomMargin=2*cm,
    )
    styles = getSampleStyleSheet()
    title_style = ParagraphStyle("title", parent=styles["Title"],
                                 fontSize=18, spaceAfter=6)
    h2_style = ParagraphStyle("h2", parent=styles["Heading2"],
                              fontSize=13, spaceBefore=14, spaceAfter=4)
    h3_style = ParagraphStyle("h3", parent=styles["Heading3"],
                              fontSize=11, spaceBefore=10, spaceAfter=3)
    body_style = ParagraphStyle("body", parent=styles["Normal"],
                                fontSize=9, spaceAfter=3)
    warn_style = ParagraphStyle("warn", parent=styles["Normal"],
                                fontSize=9, textColor=colors.red, spaceAfter=3)

    story = []

    # ── Title ──
    story.append(Paragraph(f"Rapport d'analyse de temps — {model}", title_style))
    story.append(Paragraph(f"Généré le {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}", body_style))
    story.append(HRFlowable(width="100%", thickness=1, color=colors.grey))
    story.append(Spacer(1, 0.4*cm))

    # ── Global summary ──
    total_s = sum(t.get("duree_totale_s") or 0 for t in all_timings)
    total_pages = sum(t.get("nb_pages_total") or 0 for t in all_timings)
    total_err = sum(len(t.get("erreurs", [])) for t in all_timings)
    total_429 = sum(len(t.get("blocages_429", [])) for t in all_timings)
    total_wait = sum(b["attente_s"] for t in all_timings for b in t.get("blocages_429", []))
    nb_dossiers = len(all_timings)

    story.append(Paragraph("Résumé global", h2_style))
    summary_data = [
        ["Métrique", "Valeur"],
        ["Nombre de dossiers traités", str(nb_dossiers)],
        ["Nombre de pages total", str(total_pages)],
        ["Durée totale d'extraction", _fmt_s(total_s)],
        ["Durée moyenne / dossier", _fmt_s(total_s / nb_dossiers) if nb_dossiers else "—"],
        ["Durée moyenne / page", _fmt_s(total_s / total_pages) if total_pages else "—"],
        ["Erreurs totales", str(total_err)],
        ["Blocages 429 (rate limit)", str(total_429)],
        ["Temps perdu en attente 429", _fmt_s(total_wait)],
    ]
    t_sum = Table(summary_data, colWidths=[10*cm, 6*cm])
    t_sum.setStyle(TableStyle([
        ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#2c3e50")),
        ("TEXTCOLOR", (0,0), (-1,0), colors.white),
        ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
        ("FONTSIZE", (0,0), (-1,-1), 9),
        ("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]),
        ("GRID", (0,0), (-1,-1), 0.5, colors.grey),
        ("LEFTPADDING", (0,0), (-1,-1), 6),
        ("RIGHTPADDING", (0,0), (-1,-1), 6),
        ("TOPPADDING", (0,0), (-1,-1), 4),
        ("BOTTOMPADDING", (0,0), (-1,-1), 4),
    ]))
    story.append(t_sum)
    story.append(Spacer(1, 0.5*cm))

    # ── Per-file detail ──
    story.append(Paragraph("Détail par dossier", h2_style))
    for t in all_timings:
        story.append(Paragraph(t["fichier"], h3_style))
        nb_err = len(t.get("erreurs", []))
        nb_b = len(t.get("blocages_429", []))
        att = sum(b["attente_s"] for b in t.get("blocages_429", []))

        # Aggregate per-page durations
        pages = t.get("pages", [])
        duree_id = sum(p.get("duree_identification_s") or 0 for p in pages)
        duree_ext = sum(p.get("duree_extraction_s") or 0 for p in pages)
        nb_ok = sum(1 for p in pages if p.get("statut") == "ok")
        nb_ign = sum(1 for p in pages if p.get("statut") == "ignoree")

        rows = [
            ["Début", t.get("debut", "—")[:19].replace("T", " ")],
            ["Fin", (t.get("fin") or "—")[:19].replace("T", " ")],
            ["Durée totale", _fmt_s(t.get("duree_totale_s"))],
            ["Pages totales", str(t.get("nb_pages_total", "—"))],
            ["Pages extraites (OK)", str(nb_ok)],
            ["Pages ignorées", str(nb_ign)],
            ["Temps identification", _fmt_s(duree_id)],
            ["Temps extraction", _fmt_s(duree_ext)],
            ["Erreurs", str(nb_err)],
            ["Blocages 429", str(nb_b)],
            ["Attente cumulée 429", _fmt_s(att)],
        ]
        tbl = Table(rows, colWidths=[8*cm, 8*cm])
        tbl.setStyle(TableStyle([
            ("FONTSIZE", (0,0), (-1,-1), 8),
            ("FONTNAME", (0,0), (0,-1), "Helvetica-Bold"),
            ("ROWBACKGROUNDS", (0,0), (-1,-1), [colors.HexColor("#f9f9f9"), colors.white]),
            ("GRID", (0,0), (-1,-1), 0.3, colors.lightgrey),
            ("LEFTPADDING", (0,0), (-1,-1), 5),
            ("TOPPADDING", (0,0), (-1,-1), 3),
            ("BOTTOMPADDING", (0,0), (-1,-1), 3),
        ]))
        story.append(tbl)

        # Page details
        if pages:
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Détail pages :", body_style))
            page_rows = [["Page", "Type", "Identification", "Extraction", "Statut"]]
            for p in pages:
                page_rows.append([
                    str(p["page"]),
                    p.get("type") or "—",
                    _fmt_s(p.get("duree_identification_s")),
                    _fmt_s(p.get("duree_extraction_s")),
                    p.get("statut") or "—",
                ])
            tp = Table(page_rows, colWidths=[1.5*cm, 5*cm, 3*cm, 3*cm, 3.5*cm])
            tp.setStyle(TableStyle([
                ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#34495e")),
                ("TEXTCOLOR", (0,0), (-1,0), colors.white),
                ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
                ("FONTSIZE", (0,0), (-1,-1), 7.5),
                ("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]),
                ("GRID", (0,0), (-1,-1), 0.3, colors.grey),
                ("LEFTPADDING", (0,0), (-1,-1), 4),
                ("TOPPADDING", (0,0), (-1,-1), 3),
                ("BOTTOMPADDING", (0,0), (-1,-1), 3),
            ]))
            story.append(tp)

        # Errors
        if t.get("erreurs"):
            story.append(Spacer(1, 0.2*cm))
            story.append(Paragraph("Erreurs :", warn_style))
            for err in t["erreurs"]:
                msg = f"Page {err['page']} — {err['phase']} : {err['message'][:120]}"
                story.append(Paragraph(msg, warn_style))

        # 429 stalls
        if t.get("blocages_429"):
            story.append(Paragraph("Blocages rate limit (429) :", warn_style))
            for b in t["blocages_429"]:
                msg = (f"Tentative {b['tentative']} — attente {b['attente_s']}s "
                       f"à {b['ts'][:19].replace('T', ' ')}")
                story.append(Paragraph(msg, warn_style))

        story.append(Spacer(1, 0.4*cm))
        story.append(HRFlowable(width="100%", thickness=0.5, color=colors.lightgrey))

    doc.build(story)
    print(f"✓ Rapport timing PDF : {path}")


# ─── Main ─────────────────────────────────────────────────────────────────────

def main():
    pdf_files = sorted(SCAN_DIR.glob("*.pdf"))
    if not pdf_files:
        print(f"Aucun PDF dans {SCAN_DIR}")
        sys.exit(1)
    if len(sys.argv) > 1:
        pdf_files = [f for f in pdf_files if sys.argv[1] in f.name]
        if not pdf_files:
            print(f"Aucun fichier pour '{sys.argv[1]}'")
            sys.exit(1)

    print(f"Modèle : {MODEL}")
    print(f"Fichiers: {len(pdf_files)}")
    for f in pdf_files:
        print(f"  - {f.name}")

    # Load the existing cache for partial reruns
    json_path = OUTPUT_DIR / "extraction_ogc_raw_qwen.json"
    timing_path = OUTPUT_DIR / "timing_stats.json"
    cache: dict[str, dict] = {}
    timing_cache: dict[str, dict] = {}

    if json_path.exists() and len(sys.argv) > 1:
        with open(json_path, encoding="utf-8") as f:
            for r in json.load(f):
                cache[r["fichier"]] = r
        print(f"({len(cache)} fichiers en cache)")

    if timing_path.exists() and len(sys.argv) > 1:
        with open(timing_path, encoding="utf-8") as f:
            for t in json.load(f):
                timing_cache[t["fichier"]] = t

    for pdf_path in pdf_files:
        try:
            result, timing = process_pdf(pdf_path)
            cache[pdf_path.name] = result
            timing_cache[pdf_path.name] = timing
        except Exception as e:
            print(f"\n⚠ Erreur {pdf_path.name} : {e}")
            cache[pdf_path.name] = {"fichier": pdf_path.name, "erreur": str(e),
                                    "pages_traitees": [], "pages_ignorees": []}
            timing_cache[pdf_path.name] = {
                "fichier": pdf_path.name, "erreur_globale": str(e),
                "debut": None, "fin": None, "duree_totale_s": None,
                "nb_pages_total": 0, "pages": [], "erreurs": [], "blocages_429": [],
            }

    all_results = sorted(cache.values(), key=lambda r: r["fichier"])
    all_timings = sorted(timing_cache.values(), key=lambda t: t["fichier"])

    export_excel(all_results, all_timings, OUTPUT_DIR / "extraction_ogc.xlsx")

    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    print(f"✓ JSON : {json_path}")

    with open(timing_path, "w", encoding="utf-8") as f:
        json.dump(all_timings, f, ensure_ascii=False, indent=2)
    print(f"✓ Timing JSON : {timing_path}")

    rapport_path = OUTPUT_DIR / "rapport_timing.pdf"
    build_timing_pdf(all_timings, rapport_path)
    print(f"✓ Rapport PDF : {rapport_path}")


if __name__ == "__main__":
    main()