Files
ScanOGC_extract/extraction.py
oussi 8192834f31 V2
2026-04-27 17:32:21 +02:00

1299 lines
62 KiB
Python

"""
Extraction OGC → Excel
Modèle : qwen3-vl:235b-cloud (vision multimodal) via Ollama
"""
import base64
import io
import json
import re
import sys
import time
from datetime import datetime
from pathlib import Path
import pandas as pd
import requests
from pdf2image import convert_from_path
from PIL import Image
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.platypus import (
SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable
)
# ─── Config ───────────────────────────────────────────────────────────────────
SCAN_DIR = Path(__file__).parent / "scanOgc"
OUTPUT_DIR = Path(__file__).parent / "output"
OUTPUT_DIR.mkdir(exist_ok=True)
OLLAMA_URL = "http://localhost:11434/api/generate"
MODEL = "qwen3-vl:235b-cloud"
PDF_DPI = 250
# Rate-limit : pause entre chaque appel et retry sur 429
INTER_REQUEST_DELAY = 2 # secondes
RETRY_MAX = 6
RETRY_DELAY_429 = 60 # secondes — plafond à 120s dans ask_vision
# ─── Utilitaires image ────────────────────────────────────────────────────────
def image_to_b64(img: Image.Image) -> str:
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=90)
return base64.b64encode(buf.getvalue()).decode()
# ─── Appel Ollama ─────────────────────────────────────────────────────────────
def ask_vision(prompt: str, img: Image.Image,
timeout: int = 240, num_predict: int = 8192,
timing_record: dict = None) -> str:
"""
Envoie une image + prompt à Ollama en mode streaming.
- qwen3-vl utilise ~4000 tokens de "thinking" avant la réponse :
num_predict=8192 est nécessaire pour avoir assez de budget.
- Retry automatique sur 429 (rate limit cloud).
- timing_record : dict optionnel pour enregistrer retries/blocages.
"""
payload = {
"model": MODEL,
"prompt": prompt,
"images": [image_to_b64(img)],
"stream": True,
"options": {"temperature": 0, "num_predict": num_predict},
}
for attempt in range(1, RETRY_MAX + 1):
try:
resp = requests.post(OLLAMA_URL, json=payload,
timeout=timeout, stream=True)
if resp.status_code == 429:
wait = min(RETRY_DELAY_429 * attempt, 120)
print(f" ⏳ Rate limit — attente {wait}s "
f"(tentative {attempt}/{RETRY_MAX})...")
if timing_record is not None:
timing_record.setdefault("blocages_429", []).append({
"tentative": attempt,
"attente_s": wait,
"ts": datetime.now().isoformat(),
})
time.sleep(wait)
continue
resp.raise_for_status()
tokens = []
for line in resp.iter_lines():
if not line:
continue
try:
chunk = json.loads(line)
except json.JSONDecodeError:
continue
if chunk.get("response"):
tokens.append(chunk["response"])
if chunk.get("done"):
break
if timing_record is not None and attempt > 1:
timing_record["retries_total"] = \
timing_record.get("retries_total", 0) + (attempt - 1)
time.sleep(INTER_REQUEST_DELAY)
return "".join(tokens)
except requests.exceptions.HTTPError as e:
if e.response is not None and e.response.status_code == 429:
wait = min(RETRY_DELAY_429 * attempt, 120)
print(f" ⏳ Rate limit — attente {wait}s "
f"(tentative {attempt}/{RETRY_MAX})...")
if timing_record is not None:
timing_record.setdefault("blocages_429", []).append({
"tentative": attempt,
"attente_s": wait,
"ts": datetime.now().isoformat(),
})
time.sleep(wait)
continue
raise
raise RuntimeError(f"Echec après {RETRY_MAX} tentatives (rate limit persistant)")
# ─── Extraction JSON depuis la réponse ───────────────────────────────────────
def _try_parse(text: str):
for candidate in (
text,
text.replace("\n", " ").replace("\r", " "),
re.sub(r",\s*([}\]])", r"\1", text), # trailing commas
re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text), # control chars
):
try:
return json.loads(candidate)
except json.JSONDecodeError:
pass
return None
def _extract_balanced(text: str, open_c: str, close_c: str):
"""Extrait la première structure équilibrée open_c…close_c du texte."""
start = text.find(open_c)
if start == -1:
return None
depth = 0
in_str = False
escape = False
for i, ch in enumerate(text[start:], start):
if escape:
escape = False
continue
if ch == "\\" and in_str:
escape = True
continue
if ch == '"' and not escape:
in_str = not in_str
continue
if in_str:
continue
if ch == open_c:
depth += 1
elif ch == close_c:
depth -= 1
if depth == 0:
return text[start:i+1]
return None
def extract_json(text: str):
# 1. Bloc ```json … ```
m = re.search(r"```json\s*([\s\S]*?)```", text)
if m:
result = _try_parse(m.group(1).strip())
if result is not None:
return result
# 2. Extraction par accolades équilibrées (plus robuste que greedy regex)
for open_c, close_c in (('{', '}'), ('[', ']')):
candidate = _extract_balanced(text, open_c, close_c)
if candidate:
result = _try_parse(candidate)
if result is not None:
return result
# 3. Fallback greedy regex (comportement original)
for pattern in (r"(\{[\s\S]*\})", r"(\[[\s\S]*\])"):
m = re.search(pattern, text)
if m:
result = _try_parse(m.group(1))
if result is not None:
return result
return None
# ─── Prompts ──────────────────────────────────────────────────────────────────
PROMPT_IDENTIFY = """\
Tu es un assistant d'analyse de documents médicaux français.
Regarde cette image et identifie son type parmi :
- FICHE_RECUEIL : "FICHE MEDICALE DE RECUEIL DU PRATICIEN CONSEIL"
- FICHE_CONCERTATION_VIDE: "FICHE MEDICALE DE CONCERTATION" (page quasi vide)
- SEJOUR_MANUSCRIT : "Séjour d'hospitalisation complète" (colonnes manuscrites)
- ELEMENTS_PREUVE : "Eléments de preuve tracés au dossier du patient"
- FICHE_ADMIN_2_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 2/2"
- FICHE_ADMIN_1_2 : "FICHE ADMINISTRATIVE DE CONCERTATION 1/2"
- AUTRE : autre type
Réponds UNIQUEMENT avec le code du type, sans aucune explication.\
"""
PROMPT_FICHE_RECUEIL = """\
Tu es un assistant d'extraction de données médicales.
Extrait toutes les informations imprimées de cette fiche médicale de recueil du praticien conseil.
RÈGLES STRICTES :
- Si un champ n'a pas de valeur clairement visible et imprimée, retourner une chaîne vide "".
- Ne jamais deviner, inférer ou compléter un champ absent.
- Le champ "provenance" est souvent vide : ne pas le remplir sauf si une valeur est explicitement imprimée.
- Le champ "se_coche" correspond aux cases 1/2/3/4 : retourner "SE1", "SE2", "SE3" ou "SE4" si une case est explicitement cochée, sinon "". Ce champ est TRÈS SOUVENT vide — ne rien mettre par défaut. NE PAS confondre avec "accord_desaccord" qui est un champ séparé.
- Le champ "accord_desaccord" est distinct de "se_coche" : il indique accord/désaccord du praticien conseil, pas les cases SE.
- Le champ "dr_etab" (Diagnostic Relié) est distinct des DAS : ne mettre un code que s'il y a une ligne DR EXPLICITEMENT RENSEIGNÉE sur la fiche. Si la ligne DR est vide ou absente sur le document, retourner "" obligatoirement. NE JAMAIS copier le premier DAS dans DR — ce sont deux lignes séparées sur la grille.
- Le tableau "Données du séjour" contient ces colonnes DANS CET ORDRE EXACT, de gauche à droite :
Age(ans) | Age(jours) | Sexe | Délai dern. règles | Age gestation | Poids d'entrée |
Durée de séjour | Mode d'entrée | Provenance | Mode de sortie | Destination |
Nb séances | Nb RUM | Nb j EXH | Type EXB | Nb j EXB
RÈGLE ABSOLUE : lire chaque valeur dans sa colonne uniquement.
Si une colonne est vide, retourner "" pour ce champ.
Ne jamais décaler les valeurs vers la gauche pour compenser une cellule vide.
Exemple : si "Provenance" est vide, "Mode de sortie" reste dans "mode_sortie", pas dans "provenance".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
- IMPORTANT : extraire TOUTES les lignes non vides de das_etab, actes_etab, das_reco et actes_reco sans limite de nombre. Ne jamais tronquer ces listes.
- Les actes (CCAM, codes à 7+ caractères commençant par des lettres ex: JDPE002, NJFA008) vont dans "actes_etab", pas dans "das_etab". Les diagnostics (CIM-10, codes courts ex: N320, R33) vont dans "das_etab".
{"n_ogc":"","etablissement":"","finess":"","date_debut_controle":"","n_champ":"","libelle_champ":"","dossier_manquant":"","date_debut_sejour":"","date_fin_sejour":"",
"sejour_etab":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"sejour_reco":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"rum_etab":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"rum_reco":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"dp_etab":{"code":"","libelle":""},"dr_etab":{"code":"","libelle":""},
"das_etab": [] ou [{"code":"","niveau":"","libelle":""}] ou plus,
"actes_etab":[{"code":"","niveau":"","libelle":""}],
"dp_reco":{"code":""},"dr_reco":{"code":""},
"das_reco":[{"code":"","niveau":""}],"actes_reco":[{"code":"","niveau":""}],
"ghm_etab":"","ghs_etab":"","ghm_reco":"","ghs_reco":"",
"recodage_impactant_facturation":"","ghs_injustifie":"",
"se_coche":"","atu":"","ffm":"","fsd":"","accord_desaccord":"","nom_praticien_conseil":""}\
"""
PROMPT_ELEMENTS_PREUVE = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette page "Eléments de preuve tracés au dossier du patient".
Pour chaque ligne : "present"=oui/non, "photocopie"=nombre écrit, dates si présentes.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"date":"","medecin_controleur_signataire":"","medecin_dim_signataire":"",
"elements":{"compte_rendu_acte":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_operatoire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_accouchement":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_examen_complementaire":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_imagerie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_anatomopathologie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"observations_medicales":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"dossier_transfusion":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"dossier_anesthesie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"administration_therapeutique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"compte_rendu_hospitalisation":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"lettre_sortie":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"surveillance_dossier_infirmier":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_psychologue":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_kinesitherapeute":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"prise_en_charge_dietetique":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""},
"autre":{"present":"","photocopie":"","absent_date_1ere_demande":"","date_obtention":""}}}\
"""
PROMPT_FICHE_ADMIN_2_2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette fiche administrative de concertation 2/2.
RÈGLES STRICTES :
- Pour "maintien_avis_controleur", "retour_groupage_dim", "autre_groupage" : retourner "oui" si la case est cochée (X, ✓ ou toute marque), "non" si la case est décochée, "" si absent.
- Pour les champs GHS (nombres) : retourner uniquement les chiffres sans point ni espace (ex: "6173" et non "6.173").
- Si un champ est absent ou illisible, retourner "".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"n_ogc":"","ghs_initial":"","ghs_avant_concertation":"","ghs_final_apres_concertation":"",
"maintien_avis_controleur":"","retour_groupage_dim":"","autre_groupage":"",
"avis_dim_final":"","date_concertation":"",
"nom_medecin_responsable_controle":"","nom_medecin_dim":""}\
"""
PROMPT_FICHE_ADMIN_1_2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de cette fiche administrative de concertation 1/2.
L'argumentaire est un texte long imprimé (pas manuscrit).
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"n_ogc":"","date_concertation":"","argumentaire_medecin_controleur":""}\
"""
PROMPT_SEJOUR_MANUSCRIT = """\
Tu es un assistant d'extraction de données médicales.
Cette page est intitulée "Séjour d'hospitalisation complète".
Elle comporte deux colonnes de texte entièrement manuscrit :
- Colonne gauche : "Commentaires du médecin contrôleur"
- Colonne droite : "Commentaires du médecin du DIM"
RÈGLES STRICTES :
- Le texte peut déborder largement EN DESSOUS du tableau et légèrement sur les côtés des colonnes : inclure TOUT le texte visible de chaque colonne, y compris celui qui dépasse les bordures.
- Retranscrire le texte manuscrit tel quel, y compris abréviations et codes médicaux.
- Pour attribuer un texte débordant à la bonne colonne : le texte débordant d'une colonne lui appartient même s'il dépasse physiquement le cadre.
- Si une colonne est illisible ou vide, retourner "".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"commentaire_medecin_controleur":"","commentaire_medecin_dim":""}\
"""
PROMPTS = {
"FICHE_RECUEIL": PROMPT_FICHE_RECUEIL,
"SEJOUR_MANUSCRIT": PROMPT_SEJOUR_MANUSCRIT,
"ELEMENTS_PREUVE": PROMPT_ELEMENTS_PREUVE,
"FICHE_ADMIN_2_2": PROMPT_FICHE_ADMIN_2_2,
"FICHE_ADMIN_1_2": PROMPT_FICHE_ADMIN_1_2,
}
SKIP_TYPES = {"FICHE_CONCERTATION_VIDE", "AUTRE"}
# ─── Découpage zones FICHE_RECUEIL ───────────────────────────────────────────
def crop_zone(img: Image.Image, y_start: float, y_end: float) -> Image.Image:
W, H = img.size
return img.crop((0, int(y_start * H), W, int(y_end * H)))
PROMPT_RECUEIL_Z1 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations d'en-tête et des tableaux "Données du séjour" et "Données du RUM"
de cette portion de fiche médicale de recueil du praticien conseil.
RÈGLES STRICTES :
- Si un champ n'a pas de valeur clairement visible, retourner "".
- Ne jamais deviner, inférer ou compléter un champ absent.
- Le champ "provenance" est très souvent vide : ne le remplir QUE si une valeur est explicitement imprimée.
- Le tableau "Données du séjour" contient ces colonnes DANS CET ORDRE EXACT, de gauche à droite :
Age(ans) | Age(jours) | Sexe | Délai dern. règles | Age gestation | Poids d'entrée |
Durée de séjour | Mode d'entrée | Provenance | Mode de sortie | Destination |
Nb séances | Nb RUM | Nb j EXH | Type EXB | Nb j EXB
RÈGLE ABSOLUE : lire chaque valeur dans sa colonne uniquement. Si une colonne est vide, retourner "".
Ne jamais décaler les valeurs vers la gauche pour compenser une cellule vide.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"n_ogc":"","etablissement":"","finess":"","date_debut_controle":"","n_champ":"","libelle_champ":"","dossier_manquant":"","date_debut_sejour":"","date_fin_sejour":"",
"sejour_etab":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"sejour_reco":{"age_ans":"","age_jours":"","sexe":"","poids_entree":"","duree_sejour":"","mode_entree":"","provenance":"","mode_sortie":"","destination":"","nb_seances":"","nb_rum":"","nb_j_exh":"","type_exb":"","nb_j_exb":""},
"rum_etab":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""},
"rum_reco":{"n_rum":"","lits_dedies_sp":"","um":"","igs_ii":"","duree_rum_debut":"","duree_rum_fin":"","nature_suppl":"","nb_suppl":""}}\
"""
PROMPT_RECUEIL_Z2 = """\
Tu es un assistant d'extraction de données médicales.
Extrait uniquement les lignes DP et DR de la section "Codage de l'Établissement"
et leur colonne "Recodage" correspondante.
RÈGLES STRICTES :
- Le formulaire comporte exactement UNE ligne pour le DP et UNE ligne pour le DR.
- Pour chaque ligne : si la colonne "Recodage" en face est vide sur le document, retourner "" pour dp_reco/dr_reco.
- Si la ligne DR est entièrement vide sur le document, retourner {"code":"","libelle":""} pour dr_etab et "" pour dr_reco.
- Ne jamais copier le code d'une autre ligne dans DP ou DR.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"dp_etab":{"code":"","libelle":""},"dr_etab":{"code":"","libelle":""},"dp_reco":{"code":""},"dr_reco":{"code":""}}\
"""
PROMPT_RECUEIL_Z3 = """\
Tu es un assistant d'extraction de données médicales.
Extrait toutes les lignes DAS et Actes de cette section de la fiche médicale de recueil.
RÈGLES STRICTES :
- Extraire TOUTES les lignes non vides, sans limite de nombre.
- Les diagnostics (codes CIM-10 courts, ex: R33, E43, Z515) vont dans "das_etab" / "das_reco".
- Les actes (codes CCAM longs, 7+ caractères commençant par lettres, ex: JDPE002, NJFA008) vont dans "actes_etab" / "actes_reco".
- Ne jamais mettre un code CCAM dans das_etab, ni un code CIM-10 dans actes_etab.
- Si la colonne "Recodage" d'une ligne est vide, ne pas créer d'entrée dans das_reco/actes_reco pour cette ligne.
- Ne pas retourner les lignes entièrement vides.
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"das_etab":[{"code":"","niveau":"","libelle":""}],"actes_etab":[{"code":"","niveau":"","libelle":""}],"das_reco":[{"code":"","niveau":""}],"actes_reco":[{"code":"","niveau":""}]}\
"""
PROMPT_RECUEIL_Z4 = """\
Tu es un assistant d'extraction de données médicales.
Extrait les informations de la barre GHM/GHS et de la zone décision de cette portion de fiche médicale.
RÈGLES STRICTES :
- Pour "se_coche" : retourner "SE1", "SE2", "SE3" ou "SE4" si une case est explicitement cochée, sinon "". Ce champ est TRÈS SOUVENT vide.
- Pour les valeurs GHS (nombres) : retourner uniquement les chiffres sans point ni espace (ex: "4169" et non "4.169").
- Pour "accord_desaccord" : retourner "accord" ou "désaccord" selon la case cochée.
- Pour "atu", "ffm", "fsd" : retourner "oui" si la case est cochée, "" sinon.
- Si un champ est absent ou illisible, retourner "".
Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après.
{"ghm_etab":"","ghs_etab":"","ghm_reco":"","ghs_reco":"","recodage_impactant_facturation":"","ghs_injustifie":"","se_coche":"","atu":"","ffm":"","fsd":"","accord_desaccord":"","decision_finale":"","nom_praticien_conseil":""}\
"""
# (y_start, y_end, prompt, num_predict) — légère superposition aux jointures pour ne pas couper une ligne
# Z2 finit à 0.415 pour inclure les lignes DP+DR qui sont sous le header "Codage"
# Z4 commence à 0.822 pour capturer la barre GHM/GHS qui précède les cases SE
RECUEIL_ZONES = [
(0.000, 0.325, PROMPT_RECUEIL_Z1, 8192),
(0.310, 0.415, PROMPT_RECUEIL_Z2, 3000),
(0.358, 0.815, PROMPT_RECUEIL_Z3, 8192),
# Z4 commence à 0.800 pour absorber les décalages de scan sur la ligne GHM/GHS
(0.800, 1.000, PROMPT_RECUEIL_Z4, 3000),
]
def _ask_zone(zone_img: Image.Image, prompt: str, num_predict: int,
timing_record: dict | None) -> dict:
"""Appelle ask_vision sur une zone, parse le JSON, gère un retry si nécessaire."""
raw = ask_vision(prompt, zone_img, timeout=240, num_predict=num_predict,
timing_record=timing_record)
data = extract_json(raw)
if data is None:
retry_prompt = (
"Ta réponse précédente n'était pas un JSON valide. "
"Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après, "
"sans bloc ```json```. Voici le schéma attendu :\n\n" + prompt
)
raw2 = ask_vision(retry_prompt, zone_img, timeout=240, num_predict=num_predict,
timing_record=timing_record)
data = extract_json(raw2)
return data or {}
def extract_fiche_recueil_zones(img: Image.Image,
timing_record: dict | None = None) -> tuple[dict, list]:
"""
Extrait une FICHE_RECUEIL par découpage en 4 zones.
Retourne (merged_data, zones_timing).
"""
merged: dict = {}
zones_timing: list = []
for idx, (y0, y1, prompt, num_pred) in enumerate(RECUEIL_ZONES, start=1):
t0 = time.time()
zone_img = crop_zone(img, y0, y1)
try:
data = _ask_zone(zone_img, prompt, num_pred, timing_record)
except Exception as e:
print(f" ⚠ Zone {idx}/4 erreur : {e}")
data = {}
elapsed = round(time.time() - t0, 2)
zones_timing.append({"zone": idx, "duree_s": elapsed})
print(f" → Zone {idx}/4 OK ({elapsed:.1f}s)")
merged.update(data)
return merged, zones_timing
# ─── Traitement d'un PDF ──────────────────────────────────────────────────────
# ─── Normalisation post-extraction ───────────────────────────────────────────
_CHECKBOX_OUI = {"x", "oui", "", "", "coché", "v", "yes"}
def _norm_checkbox(val: str) -> str:
"""Convertit toute marque de case cochée en 'oui', conserve 'non', vide sinon."""
v = str(val).strip().lower()
if v in _CHECKBOX_OUI:
return "oui"
if v == "non":
return "non"
return ""
def _calc_duree_rum(debut: str, fin: str):
"""Calcule la durée en jours entre deux dates (DD/MM/YYYY ou YYYY-MM-DD). Retourne None si non parsable."""
for fmt in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y"):
try:
d1 = datetime.strptime(str(debut).strip(), fmt)
d2 = datetime.strptime(str(fin).strip(), fmt)
return (d2 - d1).days
except (ValueError, AttributeError):
pass
return None
def _strip_dot_number(val: str) -> str:
"""Supprime le point parasite dans un nombre (ex: '6.173''6173', '.0''0')."""
v = str(val).strip()
# Nombre entier avec un point au milieu (pas une vraie décimale)
if re.match(r"^\d+\.\d+$", v):
cleaned = v.replace(".", "")
return cleaned
# Point en début de chaîne
if v.startswith(".") and v[1:].isdigit():
return v[1:]
return v
def _fix_year(val: str) -> str:
"""
Corrige les erreurs de lecture d'année manuscrite.
Les dates OGC sont dans la plage 2015-2019.
Si une année hors plage est détectée (ex: 2021, 2022),
la remplace par l'année valide la plus proche.
"""
if not val:
return val
m = re.search(r"(20\d\d)", val)
if m:
year = m.group(1)
valid_years = ("2015", "2016", "2017", "2018", "2019")
if year not in valid_years:
best = min(valid_years, key=lambda y: abs(int(y) - int(year)))
val = val.replace(year, best)
return val
def _normalize_result(result: dict) -> None:
"""Normalise les données extraites en place (checkboxes, chiffres mal lus)."""
for pt in result.get("pages_traitees", []):
d = pt.get("data", {})
if not isinstance(d, dict):
continue
ptype = pt.get("type")
if ptype == "FICHE_ADMIN_2_2":
for field in ("maintien_avis_controleur", "retour_groupage_dim", "autre_groupage"):
if field in d:
d[field] = _norm_checkbox(d[field])
# GHS : supprimer points parasites
for field in ("ghs_initial", "ghs_avant_concertation", "ghs_final_apres_concertation"):
if d.get(field):
d[field] = _strip_dot_number(d[field])
if ptype == "FICHE_RECUEIL":
# Guard anti-confusion DR/DAS
for dr_k, das_k in (("dr_etab", "das_etab"), ("dr_reco", "das_reco")):
dr_code = (d.get(dr_k) or {}).get("code", "").strip()
if not dr_code:
continue
das = [x for x in (d.get(das_k) or []) if isinstance(x, dict) and x.get("code")]
das_codes = {x.get("code", "").strip() for x in das}
if dr_code in das_codes:
# Cas 1 : DR duplique un DAS existant → vider DR
d[dr_k] = {"code": "", "libelle": ""}
elif not das:
# Cas 2 : DAS vide mais DR renseigné → confusion modèle,
# déplacer le DR dans das comme premier DAS
dr_entry = d.get(dr_k) or {}
new_das_entry = {"code": dr_code, "rang": dr_entry.get("rang", "")}
if dr_k == "dr_etab":
new_das_entry["libelle"] = dr_entry.get("libelle", "")
d[das_k] = [new_das_entry]
d[dr_k] = {"code": "", "libelle": ""}
# nature_suppl souvent lu '.0' au lieu de '0'
for section in ("rum_etab", "rum_reco"):
sec = d.get(section) or {}
if sec.get("nature_suppl"):
sec["nature_suppl"] = _strip_dot_number(sec["nature_suppl"])
# durée calculée à partir des dates (plus fiable que la valeur extraite)
duree = _calc_duree_rum(sec.get("duree_rum_debut", ""), sec.get("duree_rum_fin", ""))
if duree is not None:
sec["duree_rum_calculee_j"] = duree
# se_coche : normaliser "1"→"SE1", rejeter toute valeur non SE1-4
se_raw = str(d.get("se_coche", "")).strip()
if se_raw.upper() in {"SE1", "SE2", "SE3", "SE4"}:
d["se_coche"] = se_raw.upper()
elif se_raw in {"1", "2", "3", "4"}:
d["se_coche"] = ""
elif se_raw:
d["se_coche"] = ""
# accord_desaccord : forcer minuscule + alias orthographiques
acc = str(d.get("accord_desaccord", "")).strip().lower()
acc = acc.replace("é", "e").replace("desaccord", "désaccord")
if acc in {"accord", "désaccord"}:
d["accord_desaccord"] = acc
elif acc:
d["accord_desaccord"] = ""
# decision_finale : forcer minuscule
df = str(d.get("decision_finale", "")).strip().lower()
d["decision_finale"] = df
if ptype in ("FICHE_ADMIN_2_2", "FICHE_ADMIN_1_2"):
for date_field in ("date_concertation",):
if d.get(date_field):
d[date_field] = _fix_year(d[date_field])
if ptype == "ELEMENTS_PREUVE":
if d.get("date"):
d["date"] = _fix_year(d["date"])
# ─── Calcul d'audit de fiabilité ─────────────────────────────────────────────
def compute_audit(result: dict) -> dict:
"""
Calcule un bloc _audit pour l'OGC.
score_global ∈ [0,1] — seuil d'alerte : 0.80
alertes = champs dont le score < 0.80
"""
checks: list[tuple[str, float]] = [] # (champ, score)
for pt in result.get("pages_traitees", []):
ptype = pt.get("type")
d = pt.get("data", {})
page = pt.get("page", "?")
if not isinstance(d, dict):
continue
# JSON non parsé → données non fiables
if "raw_response" in d:
checks.append((f"page_{page}_json", 0.10))
continue
if ptype == "FICHE_RECUEIL":
# n_ogc vide
checks.append(("n_ogc", 1.0 if d.get("n_ogc") else 0.20))
# dr_etab non vide → historiquement souvent faux (confondu avec DAS)
dr_code = (d.get("dr_etab") or {}).get("code", "")
checks.append(("dr_etab", 0.31 if dr_code else 1.0))
# provenance non vide → souvent halluciné
prov = str((d.get("sejour_etab") or {}).get("provenance", "")).strip()
checks.append(("sejour_etab.provenance", 0.40 if prov else 1.0))
# se_coche non vide → souvent halluciné ; doit valoir SE1/SE2/SE3/SE4 ou ""
se_val = str(d.get("se_coche", "")).strip().lower()
if not se_val:
checks.append(("se_coche", 1.0))
elif se_val in {"se1", "se2", "se3", "se4", "1", "2", "3", "4"}:
checks.append(("se_coche", 0.90)) # valeur plausible mais vérifier format
else:
# confusion probable avec accord_desaccord ou autre valeur inattendue
checks.append(("se_coche", 0.20))
# DAS vide alors que DP présent → probablement tronqué
dp_code = (d.get("dp_etab") or {}).get("code", "")
das = [x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")]
if dp_code and not das:
checks.append(("das_etab", 0.50))
else:
checks.append(("das_etab", 1.0))
# Code DAS ressemble à un acte (≥7 chars, 4 premières lettres)
acte_like = any(
len(x.get("code", "")) >= 7 and x.get("code", "")[:4].isalpha()
for x in das
)
checks.append(("das_etab.codes", 0.35 if acte_like else 1.0))
elif ptype == "FICHE_ADMIN_2_2":
# Au moins une case doit être cochée
maintien = str(d.get("maintien_avis_controleur", "")).strip().lower()
retour = str(d.get("retour_groupage_dim", "")).strip().lower()
autre = str(d.get("autre_groupage", "")).strip().lower()
aucun_coche = not any(v == "oui" for v in (maintien, retour, autre))
checks.append(("maintien_retour_autre", 0.50 if aucun_coche else 1.0))
# GHS final encore avec point → mal lu
ghs = str(d.get("ghs_final_apres_concertation", "")).strip()
checks.append(("ghs_final", 0.40 if ("." in ghs and ghs) else 1.0))
elif ptype == "ELEMENTS_PREUVE":
# Ne flag que les lettres en début de chaîne seules (ex: "A", "B")
# ou des séquences de 3+ lettres consécutives — pas "A2", "1.a3" qui sont valides
suspect = any(
bool(re.search(r"(?<!\w)[A-Za-z]{3,}|^[A-Za-z]\s*$",
str((v or {}).get("photocopie", ""))))
for v in (d.get("elements") or {}).values()
if isinstance(v, dict)
)
checks.append(("elements.photocopie", 0.40 if suspect else 1.0))
if not checks:
score_global = 1.0
alertes = []
else:
scores = [s for _, s in checks]
score_global = round(sum(scores) / len(scores), 2)
alertes = [
{"champ": champ, "score": score}
for champ, score in checks
if score < 0.80
]
return {
"score_global": score_global,
"alertes": alertes,
"modele": MODEL,
"date_extraction": datetime.now().strftime("%Y-%m-%d"),
}
# ─── Traitement d'un PDF ──────────────────────────────────────────────────────
def process_pdf(pdf_path: Path) -> tuple[dict, dict]:
"""Retourne (result, timing) où timing contient toutes les métriques de temps."""
print(f"\n{'='*60}\nTraitement : {pdf_path.name}\n{'='*60}")
pdf_start = time.time()
timing = {
"fichier": pdf_path.name,
"debut": datetime.now().isoformat(),
"fin": None,
"duree_totale_s": None,
"nb_pages_total": 0,
"pages": [],
"erreurs": [],
"blocages_429": [],
"retries_total": 0,
}
pages = convert_from_path(str(pdf_path), dpi=PDF_DPI)
timing["nb_pages_total"] = len(pages)
result = {"fichier": pdf_path.name, "pages_traitees": [], "pages_ignorees": []}
for i, img in enumerate(pages, start=1):
print(f"\n Page {i}/{len(pages)} — identification...")
page_timing = {
"page": i,
"type": None,
"duree_identification_s": None,
"duree_extraction_s": None,
"statut": None,
"erreur": None,
}
t0 = time.time()
try:
raw_type = ask_vision(PROMPT_IDENTIFY, img,
timeout=200, num_predict=512,
timing_record=timing).strip().upper()
except Exception as e:
print(f" ⚠ Erreur identification : {e}")
page_timing["duree_identification_s"] = round(time.time() - t0, 2)
page_timing["statut"] = "erreur_identification"
page_timing["erreur"] = str(e)
timing["erreurs"].append({"page": i, "phase": "identification", "message": str(e)})
timing["pages"].append(page_timing)
result["pages_ignorees"].append({"page": i, "type": "ERREUR_IDENTIFICATION"})
continue
duree_id = round(time.time() - t0, 2)
page_timing["duree_identification_s"] = duree_id
page_type = "AUTRE"
for known in PROMPTS.keys() | SKIP_TYPES:
if known in raw_type:
page_type = known
break
page_timing["type"] = page_type
print(f" → Type : {page_type} ({duree_id:.1f}s)")
if page_type in SKIP_TYPES:
page_timing["statut"] = "ignoree"
timing["pages"].append(page_timing)
result["pages_ignorees"].append({"page": i, "type": page_type})
print(" → Ignorée.")
continue
print(" → Extraction en cours...")
t0 = time.time()
if page_type == "FICHE_RECUEIL":
try:
data, zones_t = extract_fiche_recueil_zones(img, timing_record=timing)
page_timing["duree_extraction_s"] = round(time.time() - t0, 2)
page_timing["zones_timing"] = zones_t
page_timing["statut"] = "ok"
except Exception as e:
print(f" ⚠ Erreur extraction zones : {e}")
page_timing["duree_extraction_s"] = round(time.time() - t0, 2)
page_timing["statut"] = "erreur_extraction"
page_timing["erreur"] = str(e)
timing["erreurs"].append({"page": i, "phase": "extraction", "type": page_type, "message": str(e)})
timing["pages"].append(page_timing)
result["pages_traitees"].append({"page": i, "type": page_type, "data": {"erreur": str(e)}})
continue
else:
try:
raw = ask_vision(PROMPTS[page_type], img, timeout=240,
num_predict=8192, timing_record=timing)
except Exception as e:
print(f" ⚠ Erreur extraction : {e}")
page_timing["duree_extraction_s"] = round(time.time() - t0, 2)
page_timing["statut"] = "erreur_extraction"
page_timing["erreur"] = str(e)
timing["erreurs"].append({"page": i, "phase": "extraction", "type": page_type, "message": str(e)})
timing["pages"].append(page_timing)
result["pages_traitees"].append({"page": i, "type": page_type, "data": {"erreur": str(e)}})
continue
page_timing["duree_extraction_s"] = round(time.time() - t0, 2)
print(f" → Réponse reçue ({page_timing['duree_extraction_s']:.1f}s)")
data = extract_json(raw)
if data is None:
print(f" ⚠ JSON non parsable — retry en cours...")
retry_prompt = (
"Ta réponse précédente n'était pas un JSON valide. "
"Réponds UNIQUEMENT avec un objet JSON valide, sans texte avant ni après, "
"sans bloc ```json```. Voici le schéma attendu :\n\n"
+ PROMPTS[page_type]
)
try:
raw2 = ask_vision(retry_prompt, img, timeout=240, num_predict=12000,
timing_record=timing)
data = extract_json(raw2)
except Exception as e:
print(f" ⚠ Erreur retry : {e}")
data = None
if data is None:
print(f" ⚠ Retry échoué — raw_response conservé")
page_timing["statut"] = "json_non_parsable"
timing["erreurs"].append({
"page": i, "phase": "parsing_json", "type": page_type,
"message": f"JSON non parsable après retry : {raw[:100]}",
"retry": True,
})
data = {"raw_response": raw}
else:
print(f" ✓ Retry réussi")
page_timing["statut"] = "ok_after_retry"
timing["erreurs"].append({
"page": i, "phase": "parsing_json", "type": page_type,
"message": "JSON non parsable au 1er appel, corrigé par retry",
"retry": True, "retry_ok": True,
})
else:
page_timing["statut"] = "ok"
timing["pages"].append(page_timing)
result["pages_traitees"].append({"page": i, "type": page_type, "data": data})
print(" ✓ OK")
timing["fin"] = datetime.now().isoformat()
timing["duree_totale_s"] = round(time.time() - pdf_start, 2)
_normalize_result(result)
result["_audit"] = compute_audit(result)
return result, timing
# ─── Aplatissement pour Excel ─────────────────────────────────────────────────
def flatten(result: dict) -> dict:
row = {"fichier": result["fichier"]}
general_done = False # champs généraux pris sur la 1re page FICHE_RECUEIL uniquement
for pt in result["pages_traitees"]:
d, ptype = pt["data"], pt["type"]
if ptype == "FICHE_RECUEIL":
# ── Champs généraux (par séjour, identiques sur chaque page RUM) ──
if not general_done:
for k in ["n_ogc","etablissement","finess","date_debut_controle","n_champ",
"libelle_champ","dossier_manquant","date_debut_sejour","date_fin_sejour"]:
row[k] = d.get(k, "")
for prefix in ("sejour_etab","sejour_reco"):
for k, v in (d.get(prefix) or {}).items():
row[f"{prefix}_{k}"] = v
row["dp_etab_code"] = (d.get("dp_etab") or {}).get("code", "")
row["dp_etab_libelle"] = (d.get("dp_etab") or {}).get("libelle", "")
row["dr_etab_code"] = (d.get("dr_etab") or {}).get("code", "")
row["dr_etab_libelle"] = (d.get("dr_etab") or {}).get("libelle", "")
row["dp_reco_code"] = (d.get("dp_reco") or {}).get("code", "")
row["dr_reco_code"] = (d.get("dr_reco") or {}).get("code", "")
for k in ["ghm_etab","ghs_etab","ghm_reco","ghs_reco",
"recodage_impactant_facturation","ghs_injustifie",
"se_coche","atu","ffm","fsd","accord_desaccord",
"decision_finale","nom_praticien_conseil"]:
row[k] = d.get(k, "")
general_done = True
# ── Comptages et durées agrégés sur tous les RUM ──
row["nb_das_etab"] = row.get("nb_das_etab", 0) + len([x for x in (d.get("das_etab") or []) if isinstance(x, dict) and x.get("code")])
row["nb_actes_etab"] = row.get("nb_actes_etab", 0) + len([x for x in (d.get("actes_etab") or []) if isinstance(x, dict) and x.get("code")])
row["nb_das_reco"] = row.get("nb_das_reco", 0) + len([x for x in (d.get("das_reco") or []) if isinstance(x, dict) and x.get("code")])
row["nb_actes_reco"] = row.get("nb_actes_reco", 0) + len([x for x in (d.get("actes_reco") or []) if isinstance(x, dict) and x.get("code")])
for section, col in (("rum_etab", "duree_sejour_calc_etab_j"),
("rum_reco", "duree_sejour_calc_reco_j")):
duree = (d.get(section) or {}).get("duree_rum_calculee_j")
if duree is not None:
row[col] = row.get(col, 0) + duree
elif ptype == "ELEMENTS_PREUVE":
row["ep_date"] = d.get("date", "")
row["ep_medecin_controleur"] = d.get("medecin_controleur_signataire", "")
row["ep_medecin_dim"] = d.get("medecin_dim_signataire", "")
for doc, vals in (d.get("elements") or {}).items():
for col, val in (vals or {}).items():
row[f"ep_{doc}_{col}"] = val
elif ptype == "FICHE_ADMIN_2_2":
if not row.get("n_ogc"):
row["n_ogc"] = d.get("n_ogc", "")
for k in ["ghs_initial","ghs_avant_concertation","ghs_final_apres_concertation",
"maintien_avis_controleur","retour_groupage_dim","autre_groupage",
"avis_dim_final","date_concertation",
"nom_medecin_responsable_controle","nom_medecin_dim"]:
row[f"admin22_{k}"] = d.get(k, "")
elif ptype == "FICHE_ADMIN_1_2":
row["admin12_date_concertation"] = d.get("date_concertation", "")
row["admin12_argumentaire"] = d.get("argumentaire_medecin_controleur", "")
elif ptype == "SEJOUR_MANUSCRIT":
row["sejour_ms_controleur"] = d.get("commentaire_medecin_controleur", "")
row["sejour_ms_dim"] = d.get("commentaire_medecin_dim", "")
# ── RÈGLE MÉTIER GHS FINAL ─────────────────────────────────────────────
# ghs_final_apres_concertation est manuscrit et souvent mal lu.
# On le recalcule depuis les cases cochées (valeurs imprimées, plus fiables) :
# - maintien_avis_controleur coché → ghs_final = ghs_initial
# - retour_groupage_dim coché → ghs_final = ghs_avant_concertation
# - autre_groupage coché → ghs_final = valeur manuscrite extraite (on garde)
# Pour désactiver cette règle : supprimez le bloc entre les deux lignes de tirets.
maintien = str(row.get("admin22_maintien_avis_controleur", "")).lower()
retour = str(row.get("admin22_retour_groupage_dim", "")).lower()
autre = str(row.get("admin22_autre_groupage", "")).lower()
if maintien == "oui":
row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_initial", "")
elif retour == "oui":
row["admin22_ghs_final_apres_concertation"] = row.get("admin22_ghs_avant_concertation", "")
# si autre_groupage == "oui" : on garde la valeur extraite (manuscrite)
# ── FIN RÈGLE MÉTIER GHS FINAL ────────────────────────────────────────
return row
def build_rum(result: dict) -> list:
"""1 ligne par RUM par OGC — données spécifiques au RUM."""
rows = []
for pt in result["pages_traitees"]:
if pt["type"] != "FICHE_RECUEIL":
continue
d = pt["data"]
ogc = d.get("n_ogc", result["fichier"])
row = {"n_ogc": ogc}
for prefix in ("rum_etab", "rum_reco"):
for k, v in (d.get(prefix) or {}).items():
row[f"{prefix}_{k}"] = v
rows.append(row)
return rows
def build_diagnostics(result: dict) -> list:
rows = []
for pt in result["pages_traitees"]:
if pt["type"] != "FICHE_RECUEIL":
continue
d = pt["data"]
ogc = d.get("n_ogc", result["fichier"])
n_rum = (d.get("rum_etab") or {}).get("n_rum", "")
for src, dp_k, dr_k, das_k in [
("etablissement", "dp_etab", "dr_etab", "das_etab"),
("recodage", "dp_reco", "dr_reco", "das_reco"),
]:
dp = d.get(dp_k) or {}
if dp.get("code"):
rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DP",
"code": dp["code"], "niveau": "",
"libelle": dp.get("libelle", "")})
dr = d.get(dr_k) or {}
if dr.get("code"):
rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DR",
"code": dr["code"], "niveau": "",
"libelle": dr.get("libelle", "")})
for das in (d.get(das_k) or []):
if isinstance(das, dict) and das.get("code"):
rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src, "type": "DAS",
"code": das["code"], "niveau": das.get("niveau", ""),
"libelle": das.get("libelle", "")})
return rows
def build_actes(result: dict) -> list:
rows = []
for pt in result["pages_traitees"]:
if pt["type"] != "FICHE_RECUEIL":
continue
d = pt["data"]
ogc = d.get("n_ogc", result["fichier"])
n_rum = (d.get("rum_etab") or {}).get("n_rum", "")
for src, k in [("etablissement","actes_etab"), ("recodage","actes_reco")]:
for a in (d.get(k) or []):
if isinstance(a, dict) and a.get("code"):
rows.append({"n_ogc": ogc, "n_rum": n_rum, "source": src,
"code": a["code"], "niveau": a.get("niveau", ""),
"libelle": a.get("libelle", "")})
return rows
def build_elements_preuve(result: dict) -> list:
rows = []
for pt in result["pages_traitees"]:
if pt["type"] != "ELEMENTS_PREUVE":
continue
d = pt["data"]
ogc = result["fichier"]
for pt2 in result["pages_traitees"]:
if pt2["type"] == "FICHE_RECUEIL":
ogc = pt2["data"].get("n_ogc", ogc)
break
for doc, vals in (d.get("elements") or {}).items():
row = {"n_ogc": ogc, "document": doc}
row.update(vals or {})
rows.append(row)
return rows
# ─── Export Excel ─────────────────────────────────────────────────────────────
def export_excel(all_results: list, all_timings: list, path: Path):
df_main = pd.DataFrame([flatten(r) for r in all_results])
rum = sum((build_rum(r) for r in all_results), [])
diag = sum((build_diagnostics(r) for r in all_results), [])
actes = sum((build_actes(r) for r in all_results), [])
ep = sum((build_elements_preuve(r) for r in all_results), [])
df_rum = pd.DataFrame(rum) if rum else pd.DataFrame(columns=["n_ogc","rum_etab_n_rum","rum_reco_n_rum"])
df_diag = pd.DataFrame(diag) if diag else pd.DataFrame(columns=["n_ogc","n_rum","source","type","code","niveau","libelle"])
df_actes = pd.DataFrame(actes) if actes else pd.DataFrame(columns=["n_ogc","n_rum","source","code","niveau","libelle"])
df_ep = pd.DataFrame(ep) if ep else pd.DataFrame(columns=["n_ogc","document","present","photocopie"])
# Feuille Timing — résumé par dossier
timing_rows = []
for t in all_timings:
nb_erreurs = len(t.get("erreurs", []))
nb_429 = len(t.get("blocages_429", []))
attente_429 = sum(b["attente_s"] for b in t.get("blocages_429", []))
timing_rows.append({
"fichier": t["fichier"],
"debut": t.get("debut", ""),
"fin": t.get("fin", ""),
"duree_totale_s": t.get("duree_totale_s", ""),
"nb_pages": t.get("nb_pages_total", ""),
"nb_erreurs": nb_erreurs,
"nb_blocages_429": nb_429,
"attente_429_s": attente_429,
"retries_total": t.get("retries_total", 0),
})
df_timing = pd.DataFrame(timing_rows) if timing_rows else pd.DataFrame()
with pd.ExcelWriter(path, engine="openpyxl") as w:
df_main.to_excel(w, sheet_name="Données principales", index=False)
df_rum.to_excel(w, sheet_name="RUM", index=False)
df_diag.to_excel(w, sheet_name="Diagnostics", index=False)
df_actes.to_excel(w, sheet_name="Actes", index=False)
df_ep.to_excel(w, sheet_name="Eléments de preuve", index=False)
df_timing.to_excel(w, sheet_name="Timing", index=False)
print(f"\n✓ Excel : {path}")
print(f" Données principales : {len(df_main)} lignes")
print(f" RUM : {len(df_rum)} lignes")
print(f" Diagnostics : {len(df_diag)} lignes")
print(f" Actes : {len(df_actes)} lignes")
print(f" Eléments de preuve : {len(df_ep)} lignes")
print(f" Timing : {len(df_timing)} lignes")
# ─── Rapport PDF Timing ───────────────────────────────────────────────────────
def _fmt_s(s):
"""Formate des secondes en mm:ss ou hh:mm:ss lisible."""
if s is None:
return ""
s = int(s)
h, r = divmod(s, 3600)
m, sec = divmod(r, 60)
if h:
return f"{h}h{m:02d}m{sec:02d}s"
if m:
return f"{m}m{sec:02d}s"
return f"{sec}s"
def build_timing_pdf(all_timings: list, path: Path, model: str = MODEL):
"""Génère un rapport PDF d'analyse de temps d'extraction."""
doc = SimpleDocTemplate(
str(path), pagesize=A4,
leftMargin=2*cm, rightMargin=2*cm,
topMargin=2*cm, bottomMargin=2*cm,
)
styles = getSampleStyleSheet()
title_style = ParagraphStyle("title", parent=styles["Title"],
fontSize=18, spaceAfter=6)
h2_style = ParagraphStyle("h2", parent=styles["Heading2"],
fontSize=13, spaceBefore=14, spaceAfter=4)
h3_style = ParagraphStyle("h3", parent=styles["Heading3"],
fontSize=11, spaceBefore=10, spaceAfter=3)
body_style = ParagraphStyle("body", parent=styles["Normal"],
fontSize=9, spaceAfter=3)
warn_style = ParagraphStyle("warn", parent=styles["Normal"],
fontSize=9, textColor=colors.red, spaceAfter=3)
story = []
# ── Titre ──
story.append(Paragraph(f"Rapport d'analyse de temps — {model}", title_style))
story.append(Paragraph(f"Généré le {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}", body_style))
story.append(HRFlowable(width="100%", thickness=1, color=colors.grey))
story.append(Spacer(1, 0.4*cm))
# ── Résumé global ──
total_s = sum(t.get("duree_totale_s") or 0 for t in all_timings)
total_pages = sum(t.get("nb_pages_total") or 0 for t in all_timings)
total_err = sum(len(t.get("erreurs", [])) for t in all_timings)
total_429 = sum(len(t.get("blocages_429", [])) for t in all_timings)
total_wait = sum(b["attente_s"] for t in all_timings for b in t.get("blocages_429", []))
nb_dossiers = len(all_timings)
story.append(Paragraph("Résumé global", h2_style))
summary_data = [
["Métrique", "Valeur"],
["Nombre de dossiers traités", str(nb_dossiers)],
["Nombre de pages total", str(total_pages)],
["Durée totale d'extraction", _fmt_s(total_s)],
["Durée moyenne / dossier", _fmt_s(total_s / nb_dossiers) if nb_dossiers else ""],
["Durée moyenne / page", _fmt_s(total_s / total_pages) if total_pages else ""],
["Erreurs totales", str(total_err)],
["Blocages 429 (rate limit)", str(total_429)],
["Temps perdu en attente 429", _fmt_s(total_wait)],
]
t_sum = Table(summary_data, colWidths=[10*cm, 6*cm])
t_sum.setStyle(TableStyle([
("BACKGROUND", (0,0), (-1,0), colors.HexColor("#2c3e50")),
("TEXTCOLOR", (0,0), (-1,0), colors.white),
("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
("FONTSIZE", (0,0), (-1,-1), 9),
("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]),
("GRID", (0,0), (-1,-1), 0.5, colors.grey),
("LEFTPADDING", (0,0), (-1,-1), 6),
("RIGHTPADDING",(0,0), (-1,-1), 6),
("TOPPADDING", (0,0), (-1,-1), 4),
("BOTTOMPADDING",(0,0),(-1,-1), 4),
]))
story.append(t_sum)
story.append(Spacer(1, 0.5*cm))
# ── Détail par dossier ──
story.append(Paragraph("Détail par dossier", h2_style))
for t in all_timings:
story.append(Paragraph(t["fichier"], h3_style))
nb_err = len(t.get("erreurs", []))
nb_b = len(t.get("blocages_429", []))
att = sum(b["attente_s"] for b in t.get("blocages_429", []))
# Calcul durée pages traitées
pages = t.get("pages", [])
duree_id = sum(p.get("duree_identification_s") or 0 for p in pages)
duree_ext = sum(p.get("duree_extraction_s") or 0 for p in pages)
nb_ok = sum(1 for p in pages if p.get("statut") == "ok")
nb_ign = sum(1 for p in pages if p.get("statut") == "ignoree")
rows = [
["Début", t.get("debut", "")[:19].replace("T", " ")],
["Fin", (t.get("fin") or "")[:19].replace("T", " ")],
["Durée totale", _fmt_s(t.get("duree_totale_s"))],
["Pages totales", str(t.get("nb_pages_total", ""))],
["Pages extraites (OK)", str(nb_ok)],
["Pages ignorées", str(nb_ign)],
["Temps identification", _fmt_s(duree_id)],
["Temps extraction", _fmt_s(duree_ext)],
["Erreurs", str(nb_err)],
["Blocages 429", str(nb_b)],
["Attente cumulée 429", _fmt_s(att)],
]
tbl = Table(rows, colWidths=[8*cm, 8*cm])
tbl.setStyle(TableStyle([
("FONTSIZE", (0,0), (-1,-1), 8),
("FONTNAME", (0,0), (0,-1), "Helvetica-Bold"),
("ROWBACKGROUNDS", (0,0), (-1,-1), [colors.HexColor("#f9f9f9"), colors.white]),
("GRID", (0,0), (-1,-1), 0.3, colors.lightgrey),
("LEFTPADDING", (0,0), (-1,-1), 5),
("TOPPADDING", (0,0), (-1,-1), 3),
("BOTTOMPADDING",(0,0),(-1,-1), 3),
]))
story.append(tbl)
# Détail pages
if pages:
story.append(Spacer(1, 0.2*cm))
story.append(Paragraph("Détail pages :", body_style))
page_rows = [["Page", "Type", "Identification", "Extraction", "Statut"]]
for p in pages:
page_rows.append([
str(p["page"]),
p.get("type") or "",
_fmt_s(p.get("duree_identification_s")),
_fmt_s(p.get("duree_extraction_s")),
p.get("statut") or "",
])
tp = Table(page_rows, colWidths=[1.5*cm, 5*cm, 3*cm, 3*cm, 3.5*cm])
tp.setStyle(TableStyle([
("BACKGROUND", (0,0), (-1,0), colors.HexColor("#34495e")),
("TEXTCOLOR", (0,0), (-1,0), colors.white),
("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
("FONTSIZE", (0,0), (-1,-1), 7.5),
("ROWBACKGROUNDS", (0,1), (-1,-1), [colors.HexColor("#f2f2f2"), colors.white]),
("GRID", (0,0), (-1,-1), 0.3, colors.grey),
("LEFTPADDING", (0,0), (-1,-1), 4),
("TOPPADDING", (0,0), (-1,-1), 3),
("BOTTOMPADDING",(0,0),(-1,-1), 3),
]))
story.append(tp)
# Erreurs
if t.get("erreurs"):
story.append(Spacer(1, 0.2*cm))
story.append(Paragraph("Erreurs :", warn_style))
for err in t["erreurs"]:
msg = f"Page {err['page']}{err['phase']} : {err['message'][:120]}"
story.append(Paragraph(msg, warn_style))
# Blocages 429
if t.get("blocages_429"):
story.append(Paragraph("Blocages rate limit (429) :", warn_style))
for b in t["blocages_429"]:
msg = (f"Tentative {b['tentative']} — attente {b['attente_s']}s "
f"à {b['ts'][:19].replace('T', ' ')}")
story.append(Paragraph(msg, warn_style))
story.append(Spacer(1, 0.4*cm))
story.append(HRFlowable(width="100%", thickness=0.5, color=colors.lightgrey))
doc.build(story)
print(f"✓ Rapport timing PDF : {path}")
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
pdf_files = sorted(SCAN_DIR.glob("*.pdf"))
if not pdf_files:
print(f"Aucun PDF dans {SCAN_DIR}")
sys.exit(1)
if len(sys.argv) > 1:
pdf_files = [f for f in pdf_files if sys.argv[1] in f.name]
if not pdf_files:
print(f"Aucun fichier pour '{sys.argv[1]}'")
sys.exit(1)
print(f"Modèle : {MODEL}")
print(f"Fichiers: {len(pdf_files)}")
for f in pdf_files:
print(f" - {f.name}")
# Charge le cache existant pour relances partielles
json_path = OUTPUT_DIR / "extraction_ogc_raw_qwen.json"
timing_path = OUTPUT_DIR / "timing_stats.json"
cache: dict[str, dict] = {}
timing_cache: dict[str, dict] = {}
if json_path.exists() and len(sys.argv) > 1:
with open(json_path, encoding="utf-8") as f:
for r in json.load(f):
cache[r["fichier"]] = r
print(f"({len(cache)} fichiers en cache)")
if timing_path.exists() and len(sys.argv) > 1:
with open(timing_path, encoding="utf-8") as f:
for t in json.load(f):
timing_cache[t["fichier"]] = t
for pdf_path in pdf_files:
try:
result, timing = process_pdf(pdf_path)
cache[pdf_path.name] = result
timing_cache[pdf_path.name] = timing
except Exception as e:
print(f"\n⚠ Erreur {pdf_path.name} : {e}")
cache[pdf_path.name] = {"fichier": pdf_path.name, "erreur": str(e),
"pages_traitees": [], "pages_ignorees": []}
timing_cache[pdf_path.name] = {
"fichier": pdf_path.name, "erreur_globale": str(e),
"debut": None, "fin": None, "duree_totale_s": None,
"nb_pages_total": 0, "pages": [], "erreurs": [], "blocages_429": [],
}
all_results = sorted(cache.values(), key=lambda r: r["fichier"])
all_timings = sorted(timing_cache.values(), key=lambda t: t["fichier"])
export_excel(all_results, all_timings, OUTPUT_DIR / "extraction_ogc.xlsx")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(all_results, f, ensure_ascii=False, indent=2)
print(f"✓ JSON : {json_path}")
with open(timing_path, "w", encoding="utf-8") as f:
json.dump(all_timings, f, ensure_ascii=False, indent=2)
print(f"✓ Timing JSON : {timing_path}")
rapport_path = OUTPUT_DIR / "rapport_timing.pdf"
build_timing_pdf(all_timings, rapport_path)
print(f"✓ Rapport PDF : {rapport_path}")
if __name__ == "__main__":
main()