#!/usr/bin/env python3
"""
extract_t2a_llm.py — Extracteur T2A généraliste via OCR + LLM (Ollama)
Entrée : PDF (scanné ou natif) de document T2A (décision UCR, notification CPAM, rapport ARS…)
Sortie : Fichier Excel (.xlsx) avec les données structurées
Architecture :
PDF → OCR/texte natif → Détection type (1 appel LLM) → Extraction bloc par bloc (N appels LLM) → Excel
Usage :
python extract_t2a_llm.py FICHIER.pdf [--model gemma3:27b-it-qat] [--output out.xlsx] [--verbose]
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from pathlib import Path
import pymupdf
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
# ---------------------------------------------------------------------------
# 0. Normalisation texte OCR
# ---------------------------------------------------------------------------
def normalize_text(text: str) -> str:
"""Normalise les apostrophes, guillemets et espaces issus de l'OCR."""
text = text.replace("\u2018", "'").replace("\u2019", "'")
text = text.replace("\u201C", '"').replace("\u201D", '"')
text = text.replace("\u00AB", '"').replace("\u00BB", '"')
text = text.replace("''", "'")
text = text.replace("\u00A0", " ").replace("\u202F", " ")
text = re.sub(r"\bF'UCR\b", "l'UCR", text)
text = re.sub(r"\bl''UCR\b", "l'UCR", text)
return text
# ---------------------------------------------------------------------------
# 1. OCR / Extraction texte (docTR — deep learning, GPU)
# ---------------------------------------------------------------------------
_doctr_model = None
def _get_doctr_model():
"""Lazy-init du modèle docTR (chargé une seule fois, GPU si VRAM libre, sinon CPU)."""
global _doctr_model
if _doctr_model is not None:
return _doctr_model
from doctr.models import ocr_predictor
print(" Chargement du modèle docTR (première utilisation)...")
t0 = time.time()
_doctr_model = ocr_predictor(
det_arch="db_resnet50",
reco_arch="crnn_vgg16_bn",
pretrained=True,
)
# Déplacer sur GPU si disponible et assez de VRAM libre
try:
import torch
if torch.cuda.is_available():
free_vram = torch.cuda.mem_get_info()[0] / (1024 ** 3)
if free_vram > 1.0:
try:
_doctr_model = _doctr_model.cuda()
print(f" docTR sur GPU ({torch.cuda.get_device_name(0)}, "
f"{free_vram:.1f} Go libres) — {time.time() - t0:.1f}s")
except torch.cuda.OutOfMemoryError:
_doctr_model = _doctr_model.cpu()
torch.cuda.empty_cache()
print(f" GPU VRAM insuffisante, docTR sur CPU — {time.time() - t0:.1f}s")
else:
print(f" GPU VRAM trop basse ({free_vram:.1f} Go libres, Ollama ?), "
f"docTR sur CPU — {time.time() - t0:.1f}s")
else:
print(f" docTR sur CPU — {time.time() - t0:.1f}s")
except ImportError:
print(f" docTR sur CPU — {time.time() - t0:.1f}s")
return _doctr_model
def ocr_pdf(pdf_path: str, dpi: int = 300) -> str:
"""Extrait le texte du PDF : texte natif si disponible, sinon OCR docTR (GPU)."""
doc = pymupdf.open(pdf_path)
total = len(doc)
# Détection : texte natif vs scanné (sur la première page)
first_page_text = doc[0].get_text() if total > 0 else ""
is_native = len(first_page_text.strip()) > 100
if is_native:
print(" Mode : extraction texte natif (pymupdf)")
full_text = []
for i, page in enumerate(doc):
print(f" Extraction page {i+1}/{total}...", end="\r")
full_text.append(page.get_text())
print(f" Extraction terminée : {total} pages. ")
return normalize_text("\n\n".join(full_text))
# OCR docTR
print(" Mode : OCR docTR (deep learning, GPU)")
from doctr.io import DocumentFile
model = _get_doctr_model()
print(f" Lecture du PDF ({total} pages)...")
doc_pages = DocumentFile.from_pdf(pdf_path)
print(f" OCR en cours sur {len(doc_pages)} pages...")
t0 = time.time()
result = model(doc_pages)
elapsed = time.time() - t0
print(f" OCR terminé : {total} pages en {elapsed:.1f}s "
f"({elapsed/total:.1f}s/page)")
full_text = result.render()
return normalize_text(full_text)
# ---------------------------------------------------------------------------
# 2. Client Ollama
# ---------------------------------------------------------------------------
NO_FORMAT_JSON_PREFIXES = ("qwen3", "qwen2.5")
OLLAMA_URL = "http://localhost:11434"
def parse_json_response(raw: str) -> dict | list | None:
"""Parse une réponse JSON, en gérant les blocs markdown et le texte parasite."""
text = raw.strip()
# Supprimer les blocs ... (Qwen3)
text = re.sub(r".*?", "", text, flags=re.DOTALL).strip()
# Supprimer les blocs markdown ```json ... ```
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl != -1:
text = text[first_nl + 1:]
if text.rstrip().endswith("```"):
text = text.rstrip()[:-3]
text = text.strip()
# Tentative directe
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Extraire le premier objet ou tableau JSON
for start_char, end_char in [("{", "}"), ("[", "]")]:
start = text.find(start_char)
if start == -1:
continue
depth = 0
for i in range(start, len(text)):
if text[i] == start_char:
depth += 1
elif text[i] == end_char:
depth -= 1
if depth == 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
break
return None
def call_ollama(
prompt: str,
model: str,
temperature: float = 0.1,
max_tokens: int = 4000,
timeout: int = 120,
verbose: bool = False,
) -> dict | list | None:
"""Appelle Ollama. Utilise l'API chat avec think=false pour Qwen3."""
is_qwen = any(model.startswith(p) for p in NO_FORMAT_JSON_PREFIXES)
if is_qwen:
# API chat + think:false pour Qwen3 (pas de format JSON natif)
endpoint = f"{OLLAMA_URL}/api/chat"
body = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"think": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens,
},
}
else:
# API generate + format JSON natif pour les autres modèles
endpoint = f"{OLLAMA_URL}/api/generate"
body = {
"model": model,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {
"temperature": temperature,
"num_predict": max_tokens,
},
}
if verbose:
print(f"\n--- PROMPT ({model}) ---")
print(prompt[:500] + ("..." if len(prompt) > 500 else ""))
print("--- FIN PROMPT ---\n")
for attempt in range(2):
try:
t0 = time.time()
response = requests.post(endpoint, json=body, timeout=timeout)
elapsed = time.time() - t0
response.raise_for_status()
data = response.json()
# Extraire le texte de la réponse selon l'API utilisée
if is_qwen:
raw = data.get("message", {}).get("content", "")
else:
raw = data.get("response", "")
if verbose:
print(f"--- RÉPONSE ({elapsed:.1f}s) ---")
print(raw[:500] + ("..." if len(raw) > 500 else ""))
print("--- FIN RÉPONSE ---\n")
result = parse_json_response(raw)
if result is not None:
return result
if attempt == 0:
print(f" [warn] JSON invalide, retry... (raw: {raw[:100]})")
except requests.ConnectionError:
print("[ERREUR] Ollama non disponible sur localhost:11434")
sys.exit(1)
except requests.Timeout:
print(f" [warn] Timeout ({timeout}s) — tentative {attempt + 1}/2")
if attempt == 1:
return None
except requests.RequestException as e:
print(f" [warn] Erreur requête : {e}")
return None
return None
# ---------------------------------------------------------------------------
# 3. Phase 1 — Détection du type de document
# ---------------------------------------------------------------------------
PROMPT_PHASE1 = """\
Tu es un expert en codage PMSI et contrôle T2A. Analyse le début de ce document et identifie sa structure.
TEXTE (début du document) :
---
{text_preview}
---
Réponds UNIQUEMENT en JSON avec ces champs :
{{
"type_document": "decision_ucr | notification_cpam | rapport_controle | autre",
"organisme": "nom de l'organisme (CPAM, UCR, ARS...)",
"date_document": "date au format YYYY-MM-DD si trouvée, sinon vide",
"objet": "résumé en une phrase de l'objet du document",
"separateur_blocs": "regex Python pour séparer les dossiers individuels (ex: OGC \\\\d+ :)",
"colonnes_detectees": ["liste des champs/colonnes détectés dans la structure"]
}}
IMPORTANT :
- Le separateur_blocs doit être un regex Python valide
- Il doit capturer le motif qui sépare chaque dossier/cas individuel
- Si c'est un document UCR, le séparateur est typiquement "OGC \\\\d+ :"
- Si tu ne trouves pas de séparateur clair, mets une chaîne vide ""
"""
def detect_document_type(full_text: str, model: str, timeout: int, verbose: bool) -> dict:
"""Phase 1 : détection du type de document via LLM."""
preview = full_text[:3000]
prompt = PROMPT_PHASE1.format(text_preview=preview)
result = call_ollama(prompt, model=model, timeout=timeout, verbose=verbose)
if result is None:
print(" [warn] Phase 1 : détection échouée, utilisation des valeurs par défaut")
return {
"type_document": "autre",
"organisme": "",
"date_document": "",
"objet": "",
"separateur_blocs": "",
"colonnes_detectees": [],
}
return result
# ---------------------------------------------------------------------------
# 4. Découpage en blocs
# ---------------------------------------------------------------------------
def split_into_blocks(full_text: str, separator_pattern: str) -> list[str]:
"""Découpe le texte en blocs logiques (dossiers individuels)."""
blocks = []
# Tentative avec le séparateur détecté par le LLM
if separator_pattern:
try:
regex = re.compile(separator_pattern, re.MULTILINE | re.IGNORECASE)
parts = regex.split(full_text)
# Recombiner : le séparateur fait partie du bloc suivant
matches = list(regex.finditer(full_text))
if len(matches) >= 3:
for i, match in enumerate(matches):
start = match.start()
end = matches[i + 1].start() if i + 1 < len(matches) else len(full_text)
block = full_text[start:end].strip()
if block:
blocks.append(block)
print(f" Découpage par séparateur : {len(blocks)} blocs trouvés")
return blocks
else:
print(f" [warn] Séparateur '{separator_pattern}' → seulement {len(matches)} blocs, fallback")
except re.error as e:
print(f" [warn] Regex invalide '{separator_pattern}' : {e}, fallback")
# Fallback : découpage par taille (~6000 chars, chevauchement 500)
chunk_size = 6000
overlap = 500
text_len = len(full_text)
if text_len <= chunk_size:
return [full_text]
pos = 0
while pos < text_len:
end = min(pos + chunk_size, text_len)
# Essayer de couper à une fin de ligne
if end < text_len:
newline_pos = full_text.rfind("\n", pos + chunk_size - 200, end + 200)
if newline_pos > pos:
end = newline_pos
blocks.append(full_text[pos:end].strip())
pos = end - overlap if end < text_len else text_len
print(f" Découpage par taille : {len(blocks)} blocs ({chunk_size} chars, chevauchement {overlap})")
return blocks
# ---------------------------------------------------------------------------
# 5. Phase 2 — Extraction bloc par bloc
# ---------------------------------------------------------------------------
SCHEMA_FIELDS = """\
Champs à extraire (JSON) — remplis chaque champ ou laisse une chaîne vide "" si non trouvé :
- "champ": numéro de champ (entier, 0 si non trouvé)
- "ogc": numéro OGC / numéro de dossier (entier, 0 si non trouvé)
- "type_desaccord": type de désaccord — "DP", "DAS", "DP + DAS", ou ""
- "code_etablissement": code(s) CIM-10 de l'établissement (ex: "G40.0 + F10.2")
- "libelle_etablissement": libellé(s) correspondant aux codes établissement
- "code_controleurs": code(s) CIM-10 des contrôleurs (ou "non repris")
- "libelle_controleurs": libellé(s) correspondant aux codes contrôleurs
- "codes_retenus_final": code(s) finalement retenus par l'UCR/la décision
- "decision": classification — "Favorable établissement", "Défavorable établissement", "Mixte", ou "Indéterminé"
* "Favorable établissement" = la décision retient l'avis/le codage de l'établissement
* "Défavorable établissement" = la décision confirme l'avis des contrôleurs
* "Mixte" = partiellement favorable et partiellement défavorable
* "Indéterminé" = impossible à classifier clairement
- "texte_decision_complet": texte intégral de la décision/conclusion
- "resume_motif": résumé en 1-2 phrases du motif de la décision
- "regles_citees": règles de codage citées (ex: "T3, T7")
- "references_guide": références documentaires (guide méthodologique, fascicules ATIH, avis Agora…)
- "ghm_mentionne": tous les GHM mentionnés (ex: "05M09 / 05M092")
- "ghs_mentionne": tous les GHS mentionnés
- "ghm_final": le GHM final retenu
- "ghs_final": le GHS final retenu
- "impact_groupage": impact sur le groupage — "Mieux valorisé", "Pas de changement", ou ""
"""
PROMPT_PHASE2 = """\
Tu es un expert en codage PMSI et contrôle T2A.
CONTEXTE DOCUMENT :
- Type : {type_document}
- Organisme : {organisme}
- Objet : {objet}
BLOC DE TEXTE À ANALYSER :
---
{block_text}
---
CONSIGNES :
1. Extrais les informations de chaque dossier/cas présent dans ce bloc.
2. Si le bloc contient UN SEUL dossier, retourne un objet JSON.
3. Si le bloc contient PLUSIEURS dossiers, retourne une LISTE d'objets JSON.
4. Si le bloc ne contient aucun dossier exploitable (en-tête, pied de page, texte administratif sans cas individuel), retourne : {{"skip": true}}
{schema}
IMPORTANT :
- Sois précis sur les codes CIM-10 (format X00.0)
- Pour la décision, analyse attentivement le texte : "retient l'avis de l'établissement" = Favorable, "confirme l'avis des contrôleurs" = Défavorable
- Ne laisse aucun champ sans clé, utilise "" pour les valeurs inconnues
- Retourne UNIQUEMENT du JSON valide, sans texte avant ou après
"""
def extract_block(
block_text: str,
doc_info: dict,
model: str,
timeout: int,
verbose: bool,
) -> list[dict]:
"""Extrait les données d'un bloc via LLM. Retourne une liste de dossiers."""
prompt = PROMPT_PHASE2.format(
type_document=doc_info.get("type_document", "autre"),
organisme=doc_info.get("organisme", ""),
objet=doc_info.get("objet", ""),
block_text=block_text[:8000], # Limiter la taille
schema=SCHEMA_FIELDS,
)
result = call_ollama(prompt, model=model, max_tokens=4000, timeout=timeout, verbose=verbose)
if result is None:
return []
# Skip
if isinstance(result, dict) and result.get("skip"):
return []
# Normaliser en liste
if isinstance(result, dict):
items = [result]
elif isinstance(result, list):
items = [r for r in result if isinstance(r, dict) and not r.get("skip")]
else:
return []
return items
# ---------------------------------------------------------------------------
# 6. Fusion et dédoublonnage
# ---------------------------------------------------------------------------
# Mapping clés LLM (snake_case) → clés Excel (TitleCase)
KEY_MAP = {
"champ": "Champ",
"ogc": "OGC",
"type_desaccord": "Type_desaccord",
"code_etablissement": "Code_etablissement",
"libelle_etablissement": "Libelle_etablissement",
"code_controleurs": "Code_controleurs",
"libelle_controleurs": "Libelle_controleurs",
"codes_retenus_final": "Codes_retenus_final",
"decision": "Decision",
"texte_decision_complet": "Texte_decision_complet",
"resume_motif": "Resume_motif",
"regles_citees": "Regles_citees",
"references_guide": "References_guide",
"ghm_mentionne": "GHM_mentionne",
"ghs_mentionne": "GHS_mentionne",
"ghm_final": "GHM_final",
"ghs_final": "GHS_final",
"impact_groupage": "Impact_groupage",
}
def normalize_row(raw: dict) -> dict:
"""Convertit les clés LLM en clés Excel et normalise les types."""
row = {}
for llm_key, excel_key in KEY_MAP.items():
val = raw.get(llm_key, raw.get(excel_key, ""))
# Convertir en int pour Champ et OGC
if excel_key in ("Champ", "OGC"):
try:
val = int(val) if val else 0
except (ValueError, TypeError):
val = 0
elif not isinstance(val, str):
val = str(val) if val is not None else ""
row[excel_key] = val
return row
def merge_and_deduplicate(all_items: list[dict]) -> list[dict]:
"""Fusionne, déduplique par OGC, et trie les résultats."""
rows = [normalize_row(item) for item in all_items]
# Filtrer les lignes sans contenu utile
rows = [r for r in rows if r["OGC"] > 0 or r["Code_etablissement"] or r["Decision"]]
# Dédoublonnage par OGC (garder la version la plus complète)
seen: dict[int, dict] = {}
deduped: list[dict] = []
for r in rows:
key = r["OGC"]
if key == 0:
deduped.append(r)
continue
if key in seen:
old = seen[key]
old_score = sum(1 for v in old.values() if v and v != 0)
new_score = sum(1 for v in r.values() if v and v != 0)
if new_score > old_score:
deduped = [x for x in deduped if x["OGC"] != key]
deduped.append(r)
seen[key] = r
else:
seen[key] = r
deduped.append(r)
deduped.sort(key=lambda r: (r["Champ"], r["OGC"]))
return deduped
# ---------------------------------------------------------------------------
# 7. Export Excel
# ---------------------------------------------------------------------------
HEADERS = [
"Champ", "OGC", "Type_desaccord",
"Code_etablissement", "Libelle_etablissement",
"Code_controleurs", "Libelle_controleurs",
"Codes_retenus_final",
"Decision", "Texte_decision_complet", "Resume_motif",
"Regles_citees", "References_guide",
"GHM_mentionne", "GHS_mentionne", "GHM_final", "GHS_final",
"Impact_groupage",
]
HEADER_LABELS = [
"Champ", "N° OGC", "Type désaccord",
"Code(s) Établissement", "Libellé Établissement",
"Code(s) Contrôleurs", "Libellé Contrôleurs",
"Code(s) retenus (final)",
"Décision UCR", "Texte décision complet", "Résumé du motif",
"Règles codage citées", "Références (guide, fascicules, avis)",
"GHM mentionné(s)", "GHS mentionné(s)", "GHM final", "GHS final",
"Impact groupage",
]
def write_excel(rows: list[dict], output_path: str):
"""Écrit les résultats dans un fichier Excel (feuille unique)."""
wb = Workbook()
ws = wb.active
ws.title = "Décisions UCR"
# Styles
header_font = Font(bold=True, color="FFFFFF", size=11)
header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin"), right=Side(style="thin"),
top=Side(style="thin"), bottom=Side(style="thin"),
)
fav_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
defav_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
mixte_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
# En-têtes
for col, label in enumerate(HEADER_LABELS, 1):
cell = ws.cell(row=1, column=col, value=label)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_align
cell.border = thin_border
# Données
for row_idx, data in enumerate(rows, 2):
for col_idx, key in enumerate(HEADERS, 1):
val = data.get(key, "")
cell = ws.cell(row=row_idx, column=col_idx, value=val)
cell.border = thin_border
cell.alignment = Alignment(vertical="top", wrap_text=True)
# Colorer la colonne Décision
dec_col = HEADERS.index("Decision") + 1
decision_cell = ws.cell(row=row_idx, column=dec_col)
dv = str(decision_cell.value or "")
if "Favorable" in dv and "Défavorable" not in dv:
decision_cell.fill = fav_fill
elif "Défavorable" in dv:
decision_cell.fill = defav_fill
elif "Mixte" in dv:
decision_cell.fill = mixte_fill
# Largeurs de colonnes
col_widths = {
"Champ": 8, "OGC": 8, "Type_desaccord": 14,
"Code_etablissement": 22, "Libelle_etablissement": 40,
"Code_controleurs": 22, "Libelle_controleurs": 40,
"Codes_retenus_final": 22,
"Decision": 24, "Texte_decision_complet": 80,
"Resume_motif": 60,
"Regles_citees": 16, "References_guide": 50,
"GHM_mentionne": 16, "GHS_mentionne": 16,
"GHM_final": 12, "GHS_final": 10,
"Impact_groupage": 20,
}
for i, key in enumerate(HEADERS, 1):
ws.column_dimensions[ws.cell(row=1, column=i).column_letter].width = col_widths.get(key, 15)
# Filtre automatique + freeze
last_col_letter = ws.cell(row=1, column=len(HEADERS)).column_letter
ws.auto_filter.ref = f"A1:{last_col_letter}{len(rows)+1}"
ws.freeze_panes = "A2"
wb.save(output_path)
print(f"Excel enregistré : {output_path}")
# ---------------------------------------------------------------------------
# 8. CLI / Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Extracteur T2A généraliste via OCR + LLM (Ollama)",
)
parser.add_argument("pdf", help="Fichier PDF à traiter")
parser.add_argument("--model", default="gemma3:27b-it-qat",
help="Modèle Ollama (défaut: gemma3:27b-it-qat)")
parser.add_argument("--timeout", type=int, default=120,
help="Timeout par appel LLM en secondes (défaut: 120)")
parser.add_argument("--output", default=None,
help="Fichier Excel de sortie (défaut: _llm.xlsx)")
parser.add_argument("--dpi", type=int, default=300,
help="Résolution OCR (défaut: 300)")
parser.add_argument("--no-cache", action="store_true",
help="Désactiver le cache texte OCR")
parser.add_argument("--verbose", action="store_true",
help="Afficher les prompts/réponses LLM")
args = parser.parse_args()
pdf_path = args.pdf
if not Path(pdf_path).exists():
print(f"[ERREUR] Fichier non trouvé : {pdf_path}")
sys.exit(1)
output_path = args.output or str(Path(pdf_path).with_name(
Path(pdf_path).stem + "_llm.xlsx"
))
print(f"Fichier PDF : {pdf_path}")
print(f"Modèle LLM : {args.model}")
print(f"Sortie Excel : {output_path}")
print()
# --- Étape 1 : OCR ---
txt_cache = Path(pdf_path).with_suffix(".txt")
if txt_cache.exists() and not args.no_cache:
print("Étape 1/4 : Chargement du texte depuis le cache...")
full_text = txt_cache.read_text(encoding="utf-8")
full_text = normalize_text(full_text)
print(f" {len(full_text)} caractères chargés depuis {txt_cache}")
else:
print("Étape 1/4 : OCR du document...")
full_text = ocr_pdf(pdf_path, dpi=args.dpi)
if not args.no_cache:
txt_cache.write_text(full_text, encoding="utf-8")
print(f" Cache texte sauvegardé : {txt_cache}")
print(f" Longueur du texte : {len(full_text)} caractères")
print()
# --- Étape 2 : Détection du type de document ---
print("Étape 2/4 : Détection du type de document...")
t0 = time.time()
doc_info = detect_document_type(full_text, model=args.model, timeout=args.timeout, verbose=args.verbose)
print(f" Type : {doc_info.get('type_document', '?')}")
print(f" Organisme : {doc_info.get('organisme', '?')}")
print(f" Objet : {doc_info.get('objet', '?')}")
print(f" Séparateur: {doc_info.get('separateur_blocs', '(aucun)')}")
print(f" Colonnes : {doc_info.get('colonnes_detectees', [])}")
print(f" ({time.time() - t0:.1f}s)")
print()
# --- Étape 3 : Découpage et extraction ---
print("Étape 3/4 : Découpage en blocs et extraction LLM...")
separator = doc_info.get("separateur_blocs", "")
blocks = split_into_blocks(full_text, separator)
print(f" {len(blocks)} blocs à traiter")
all_items = []
t0 = time.time()
for i, block in enumerate(blocks):
print(f" Bloc {i+1}/{len(blocks)}...", end="\r")
items = extract_block(block, doc_info, model=args.model, timeout=args.timeout, verbose=args.verbose)
all_items.extend(items)
# Progression
elapsed = time.time() - t0
avg = elapsed / (i + 1)
remaining = avg * (len(blocks) - i - 1)
print(f" Bloc {i+1}/{len(blocks)} → {len(items)} dossier(s) "
f"[{elapsed:.0f}s écoulé, ~{remaining:.0f}s restant] ")
total_elapsed = time.time() - t0
print(f" Extraction terminée : {len(all_items)} dossiers bruts en {total_elapsed:.0f}s")
print()
# --- Étape 4 : Fusion et export ---
print("Étape 4/4 : Fusion, dédoublonnage et export Excel...")
rows = merge_and_deduplicate(all_items)
print(f" {len(rows)} dossiers après dédoublonnage")
# Statistiques
fav = sum(1 for r in rows if "Favorable" in r.get("Decision", "") and "Défavorable" not in r.get("Decision", ""))
defav = sum(1 for r in rows if "Défavorable" in r.get("Decision", ""))
mixte = sum(1 for r in rows if "Mixte" in r.get("Decision", ""))
indet = sum(1 for r in rows if r.get("Decision", "") in ("Indéterminé", ""))
print(f" Favorable établissement : {fav}")
print(f" Défavorable établissement : {defav}")
print(f" Mixte : {mixte}")
print(f" Indéterminé : {indet}")
write_excel(rows, output_path)
print()
print("Terminé.")
if __name__ == "__main__":
main()