739 lines
27 KiB
Python
739 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
extract_t2a_llm.py — Extracteur T2A généraliste via OCR + LLM (Ollama)
|
|
|
|
Entrée : PDF (scanné ou natif) de document T2A (décision UCR, notification CPAM, rapport ARS…)
|
|
Sortie : Fichier Excel (.xlsx) avec les données structurées
|
|
|
|
Architecture :
|
|
PDF → OCR/texte natif → Détection type (1 appel LLM) → Extraction bloc par bloc (N appels LLM) → Excel
|
|
|
|
Usage :
|
|
python extract_t2a_llm.py FICHIER.pdf [--model gemma3:27b-it-qat] [--output out.xlsx] [--verbose]
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pymupdf
|
|
import requests
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 0. Normalisation texte OCR
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def normalize_text(text: str) -> str:
|
|
"""Normalise les apostrophes, guillemets et espaces issus de l'OCR."""
|
|
text = text.replace("\u2018", "'").replace("\u2019", "'")
|
|
text = text.replace("\u201C", '"').replace("\u201D", '"')
|
|
text = text.replace("\u00AB", '"').replace("\u00BB", '"')
|
|
text = text.replace("''", "'")
|
|
text = text.replace("\u00A0", " ").replace("\u202F", " ")
|
|
text = re.sub(r"\bF'UCR\b", "l'UCR", text)
|
|
text = re.sub(r"\bl''UCR\b", "l'UCR", text)
|
|
return text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. OCR / Extraction texte (docTR — deep learning, GPU)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_doctr_model = None
|
|
|
|
|
|
def _get_doctr_model():
|
|
"""Lazy-init du modèle docTR (chargé une seule fois, GPU si VRAM libre, sinon CPU)."""
|
|
global _doctr_model
|
|
if _doctr_model is not None:
|
|
return _doctr_model
|
|
|
|
from doctr.models import ocr_predictor
|
|
|
|
print(" Chargement du modèle docTR (première utilisation)...")
|
|
t0 = time.time()
|
|
_doctr_model = ocr_predictor(
|
|
det_arch="db_resnet50",
|
|
reco_arch="crnn_vgg16_bn",
|
|
pretrained=True,
|
|
)
|
|
|
|
# Déplacer sur GPU si disponible et assez de VRAM libre
|
|
try:
|
|
import torch
|
|
if torch.cuda.is_available():
|
|
free_vram = torch.cuda.mem_get_info()[0] / (1024 ** 3)
|
|
if free_vram > 1.0:
|
|
try:
|
|
_doctr_model = _doctr_model.cuda()
|
|
print(f" docTR sur GPU ({torch.cuda.get_device_name(0)}, "
|
|
f"{free_vram:.1f} Go libres) — {time.time() - t0:.1f}s")
|
|
except torch.cuda.OutOfMemoryError:
|
|
_doctr_model = _doctr_model.cpu()
|
|
torch.cuda.empty_cache()
|
|
print(f" GPU VRAM insuffisante, docTR sur CPU — {time.time() - t0:.1f}s")
|
|
else:
|
|
print(f" GPU VRAM trop basse ({free_vram:.1f} Go libres, Ollama ?), "
|
|
f"docTR sur CPU — {time.time() - t0:.1f}s")
|
|
else:
|
|
print(f" docTR sur CPU — {time.time() - t0:.1f}s")
|
|
except ImportError:
|
|
print(f" docTR sur CPU — {time.time() - t0:.1f}s")
|
|
|
|
return _doctr_model
|
|
|
|
|
|
def ocr_pdf(pdf_path: str, dpi: int = 300) -> str:
|
|
"""Extrait le texte du PDF : texte natif si disponible, sinon OCR docTR (GPU)."""
|
|
doc = pymupdf.open(pdf_path)
|
|
total = len(doc)
|
|
|
|
# Détection : texte natif vs scanné (sur la première page)
|
|
first_page_text = doc[0].get_text() if total > 0 else ""
|
|
is_native = len(first_page_text.strip()) > 100
|
|
|
|
if is_native:
|
|
print(" Mode : extraction texte natif (pymupdf)")
|
|
full_text = []
|
|
for i, page in enumerate(doc):
|
|
print(f" Extraction page {i+1}/{total}...", end="\r")
|
|
full_text.append(page.get_text())
|
|
print(f" Extraction terminée : {total} pages. ")
|
|
return normalize_text("\n\n".join(full_text))
|
|
|
|
# OCR docTR
|
|
print(" Mode : OCR docTR (deep learning, GPU)")
|
|
from doctr.io import DocumentFile
|
|
|
|
model = _get_doctr_model()
|
|
|
|
print(f" Lecture du PDF ({total} pages)...")
|
|
doc_pages = DocumentFile.from_pdf(pdf_path)
|
|
print(f" OCR en cours sur {len(doc_pages)} pages...")
|
|
|
|
t0 = time.time()
|
|
result = model(doc_pages)
|
|
elapsed = time.time() - t0
|
|
print(f" OCR terminé : {total} pages en {elapsed:.1f}s "
|
|
f"({elapsed/total:.1f}s/page)")
|
|
|
|
full_text = result.render()
|
|
return normalize_text(full_text)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Client Ollama
|
|
# ---------------------------------------------------------------------------
|
|
|
|
NO_FORMAT_JSON_PREFIXES = ("qwen3", "qwen2.5")
|
|
|
|
OLLAMA_URL = "http://localhost:11434"
|
|
|
|
|
|
def parse_json_response(raw: str) -> dict | list | None:
|
|
"""Parse une réponse JSON, en gérant les blocs markdown et le texte parasite."""
|
|
text = raw.strip()
|
|
|
|
# Supprimer les blocs <think>...</think> (Qwen3)
|
|
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
|
|
|
|
# Supprimer les blocs markdown ```json ... ```
|
|
if text.startswith("```"):
|
|
first_nl = text.find("\n")
|
|
if first_nl != -1:
|
|
text = text[first_nl + 1:]
|
|
if text.rstrip().endswith("```"):
|
|
text = text.rstrip()[:-3]
|
|
text = text.strip()
|
|
|
|
# Tentative directe
|
|
try:
|
|
return json.loads(text)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Extraire le premier objet ou tableau JSON
|
|
for start_char, end_char in [("{", "}"), ("[", "]")]:
|
|
start = text.find(start_char)
|
|
if start == -1:
|
|
continue
|
|
depth = 0
|
|
for i in range(start, len(text)):
|
|
if text[i] == start_char:
|
|
depth += 1
|
|
elif text[i] == end_char:
|
|
depth -= 1
|
|
if depth == 0:
|
|
try:
|
|
return json.loads(text[start:i + 1])
|
|
except json.JSONDecodeError:
|
|
break
|
|
|
|
return None
|
|
|
|
|
|
def call_ollama(
|
|
prompt: str,
|
|
model: str,
|
|
temperature: float = 0.1,
|
|
max_tokens: int = 4000,
|
|
timeout: int = 120,
|
|
verbose: bool = False,
|
|
) -> dict | list | None:
|
|
"""Appelle Ollama. Utilise l'API chat avec think=false pour Qwen3."""
|
|
is_qwen = any(model.startswith(p) for p in NO_FORMAT_JSON_PREFIXES)
|
|
|
|
if is_qwen:
|
|
# API chat + think:false pour Qwen3 (pas de format JSON natif)
|
|
endpoint = f"{OLLAMA_URL}/api/chat"
|
|
body = {
|
|
"model": model,
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"stream": False,
|
|
"think": False,
|
|
"options": {
|
|
"temperature": temperature,
|
|
"num_predict": max_tokens,
|
|
},
|
|
}
|
|
else:
|
|
# API generate + format JSON natif pour les autres modèles
|
|
endpoint = f"{OLLAMA_URL}/api/generate"
|
|
body = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"stream": False,
|
|
"format": "json",
|
|
"options": {
|
|
"temperature": temperature,
|
|
"num_predict": max_tokens,
|
|
},
|
|
}
|
|
|
|
if verbose:
|
|
print(f"\n--- PROMPT ({model}) ---")
|
|
print(prompt[:500] + ("..." if len(prompt) > 500 else ""))
|
|
print("--- FIN PROMPT ---\n")
|
|
|
|
for attempt in range(2):
|
|
try:
|
|
t0 = time.time()
|
|
response = requests.post(endpoint, json=body, timeout=timeout)
|
|
elapsed = time.time() - t0
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
# Extraire le texte de la réponse selon l'API utilisée
|
|
if is_qwen:
|
|
raw = data.get("message", {}).get("content", "")
|
|
else:
|
|
raw = data.get("response", "")
|
|
|
|
if verbose:
|
|
print(f"--- RÉPONSE ({elapsed:.1f}s) ---")
|
|
print(raw[:500] + ("..." if len(raw) > 500 else ""))
|
|
print("--- FIN RÉPONSE ---\n")
|
|
|
|
result = parse_json_response(raw)
|
|
if result is not None:
|
|
return result
|
|
if attempt == 0:
|
|
print(f" [warn] JSON invalide, retry... (raw: {raw[:100]})")
|
|
except requests.ConnectionError:
|
|
print("[ERREUR] Ollama non disponible sur localhost:11434")
|
|
sys.exit(1)
|
|
except requests.Timeout:
|
|
print(f" [warn] Timeout ({timeout}s) — tentative {attempt + 1}/2")
|
|
if attempt == 1:
|
|
return None
|
|
except requests.RequestException as e:
|
|
print(f" [warn] Erreur requête : {e}")
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Phase 1 — Détection du type de document
|
|
# ---------------------------------------------------------------------------
|
|
|
|
PROMPT_PHASE1 = """\
|
|
Tu es un expert en codage PMSI et contrôle T2A. Analyse le début de ce document et identifie sa structure.
|
|
|
|
TEXTE (début du document) :
|
|
---
|
|
{text_preview}
|
|
---
|
|
|
|
Réponds UNIQUEMENT en JSON avec ces champs :
|
|
{{
|
|
"type_document": "decision_ucr | notification_cpam | rapport_controle | autre",
|
|
"organisme": "nom de l'organisme (CPAM, UCR, ARS...)",
|
|
"date_document": "date au format YYYY-MM-DD si trouvée, sinon vide",
|
|
"objet": "résumé en une phrase de l'objet du document",
|
|
"separateur_blocs": "regex Python pour séparer les dossiers individuels (ex: OGC \\\\d+ :)",
|
|
"colonnes_detectees": ["liste des champs/colonnes détectés dans la structure"]
|
|
}}
|
|
|
|
IMPORTANT :
|
|
- Le separateur_blocs doit être un regex Python valide
|
|
- Il doit capturer le motif qui sépare chaque dossier/cas individuel
|
|
- Si c'est un document UCR, le séparateur est typiquement "OGC \\\\d+ :"
|
|
- Si tu ne trouves pas de séparateur clair, mets une chaîne vide ""
|
|
"""
|
|
|
|
|
|
def detect_document_type(full_text: str, model: str, timeout: int, verbose: bool) -> dict:
|
|
"""Phase 1 : détection du type de document via LLM."""
|
|
preview = full_text[:3000]
|
|
prompt = PROMPT_PHASE1.format(text_preview=preview)
|
|
result = call_ollama(prompt, model=model, timeout=timeout, verbose=verbose)
|
|
if result is None:
|
|
print(" [warn] Phase 1 : détection échouée, utilisation des valeurs par défaut")
|
|
return {
|
|
"type_document": "autre",
|
|
"organisme": "",
|
|
"date_document": "",
|
|
"objet": "",
|
|
"separateur_blocs": "",
|
|
"colonnes_detectees": [],
|
|
}
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Découpage en blocs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def split_into_blocks(full_text: str, separator_pattern: str) -> list[str]:
|
|
"""Découpe le texte en blocs logiques (dossiers individuels)."""
|
|
blocks = []
|
|
|
|
# Tentative avec le séparateur détecté par le LLM
|
|
if separator_pattern:
|
|
try:
|
|
regex = re.compile(separator_pattern, re.MULTILINE | re.IGNORECASE)
|
|
parts = regex.split(full_text)
|
|
# Recombiner : le séparateur fait partie du bloc suivant
|
|
matches = list(regex.finditer(full_text))
|
|
if len(matches) >= 3:
|
|
for i, match in enumerate(matches):
|
|
start = match.start()
|
|
end = matches[i + 1].start() if i + 1 < len(matches) else len(full_text)
|
|
block = full_text[start:end].strip()
|
|
if block:
|
|
blocks.append(block)
|
|
print(f" Découpage par séparateur : {len(blocks)} blocs trouvés")
|
|
return blocks
|
|
else:
|
|
print(f" [warn] Séparateur '{separator_pattern}' → seulement {len(matches)} blocs, fallback")
|
|
except re.error as e:
|
|
print(f" [warn] Regex invalide '{separator_pattern}' : {e}, fallback")
|
|
|
|
# Fallback : découpage par taille (~6000 chars, chevauchement 500)
|
|
chunk_size = 6000
|
|
overlap = 500
|
|
text_len = len(full_text)
|
|
if text_len <= chunk_size:
|
|
return [full_text]
|
|
|
|
pos = 0
|
|
while pos < text_len:
|
|
end = min(pos + chunk_size, text_len)
|
|
# Essayer de couper à une fin de ligne
|
|
if end < text_len:
|
|
newline_pos = full_text.rfind("\n", pos + chunk_size - 200, end + 200)
|
|
if newline_pos > pos:
|
|
end = newline_pos
|
|
blocks.append(full_text[pos:end].strip())
|
|
pos = end - overlap if end < text_len else text_len
|
|
|
|
print(f" Découpage par taille : {len(blocks)} blocs ({chunk_size} chars, chevauchement {overlap})")
|
|
return blocks
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. Phase 2 — Extraction bloc par bloc
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SCHEMA_FIELDS = """\
|
|
Champs à extraire (JSON) — remplis chaque champ ou laisse une chaîne vide "" si non trouvé :
|
|
- "champ": numéro de champ (entier, 0 si non trouvé)
|
|
- "ogc": numéro OGC / numéro de dossier (entier, 0 si non trouvé)
|
|
- "type_desaccord": type de désaccord — "DP", "DAS", "DP + DAS", ou ""
|
|
- "code_etablissement": code(s) CIM-10 de l'établissement (ex: "G40.0 + F10.2")
|
|
- "libelle_etablissement": libellé(s) correspondant aux codes établissement
|
|
- "code_controleurs": code(s) CIM-10 des contrôleurs (ou "non repris")
|
|
- "libelle_controleurs": libellé(s) correspondant aux codes contrôleurs
|
|
- "codes_retenus_final": code(s) finalement retenus par l'UCR/la décision
|
|
- "decision": classification — "Favorable établissement", "Défavorable établissement", "Mixte", ou "Indéterminé"
|
|
* "Favorable établissement" = la décision retient l'avis/le codage de l'établissement
|
|
* "Défavorable établissement" = la décision confirme l'avis des contrôleurs
|
|
* "Mixte" = partiellement favorable et partiellement défavorable
|
|
* "Indéterminé" = impossible à classifier clairement
|
|
- "texte_decision_complet": texte intégral de la décision/conclusion
|
|
- "resume_motif": résumé en 1-2 phrases du motif de la décision
|
|
- "regles_citees": règles de codage citées (ex: "T3, T7")
|
|
- "references_guide": références documentaires (guide méthodologique, fascicules ATIH, avis Agora…)
|
|
- "ghm_mentionne": tous les GHM mentionnés (ex: "05M09 / 05M092")
|
|
- "ghs_mentionne": tous les GHS mentionnés
|
|
- "ghm_final": le GHM final retenu
|
|
- "ghs_final": le GHS final retenu
|
|
- "impact_groupage": impact sur le groupage — "Mieux valorisé", "Pas de changement", ou ""
|
|
"""
|
|
|
|
PROMPT_PHASE2 = """\
|
|
Tu es un expert en codage PMSI et contrôle T2A.
|
|
|
|
CONTEXTE DOCUMENT :
|
|
- Type : {type_document}
|
|
- Organisme : {organisme}
|
|
- Objet : {objet}
|
|
|
|
BLOC DE TEXTE À ANALYSER :
|
|
---
|
|
{block_text}
|
|
---
|
|
|
|
CONSIGNES :
|
|
1. Extrais les informations de chaque dossier/cas présent dans ce bloc.
|
|
2. Si le bloc contient UN SEUL dossier, retourne un objet JSON.
|
|
3. Si le bloc contient PLUSIEURS dossiers, retourne une LISTE d'objets JSON.
|
|
4. Si le bloc ne contient aucun dossier exploitable (en-tête, pied de page, texte administratif sans cas individuel), retourne : {{"skip": true}}
|
|
|
|
{schema}
|
|
|
|
IMPORTANT :
|
|
- Sois précis sur les codes CIM-10 (format X00.0)
|
|
- Pour la décision, analyse attentivement le texte : "retient l'avis de l'établissement" = Favorable, "confirme l'avis des contrôleurs" = Défavorable
|
|
- Ne laisse aucun champ sans clé, utilise "" pour les valeurs inconnues
|
|
- Retourne UNIQUEMENT du JSON valide, sans texte avant ou après
|
|
"""
|
|
|
|
|
|
def extract_block(
|
|
block_text: str,
|
|
doc_info: dict,
|
|
model: str,
|
|
timeout: int,
|
|
verbose: bool,
|
|
) -> list[dict]:
|
|
"""Extrait les données d'un bloc via LLM. Retourne une liste de dossiers."""
|
|
prompt = PROMPT_PHASE2.format(
|
|
type_document=doc_info.get("type_document", "autre"),
|
|
organisme=doc_info.get("organisme", ""),
|
|
objet=doc_info.get("objet", ""),
|
|
block_text=block_text[:8000], # Limiter la taille
|
|
schema=SCHEMA_FIELDS,
|
|
)
|
|
result = call_ollama(prompt, model=model, max_tokens=4000, timeout=timeout, verbose=verbose)
|
|
if result is None:
|
|
return []
|
|
|
|
# Skip
|
|
if isinstance(result, dict) and result.get("skip"):
|
|
return []
|
|
|
|
# Normaliser en liste
|
|
if isinstance(result, dict):
|
|
items = [result]
|
|
elif isinstance(result, list):
|
|
items = [r for r in result if isinstance(r, dict) and not r.get("skip")]
|
|
else:
|
|
return []
|
|
|
|
return items
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. Fusion et dédoublonnage
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Mapping clés LLM (snake_case) → clés Excel (TitleCase)
|
|
KEY_MAP = {
|
|
"champ": "Champ",
|
|
"ogc": "OGC",
|
|
"type_desaccord": "Type_desaccord",
|
|
"code_etablissement": "Code_etablissement",
|
|
"libelle_etablissement": "Libelle_etablissement",
|
|
"code_controleurs": "Code_controleurs",
|
|
"libelle_controleurs": "Libelle_controleurs",
|
|
"codes_retenus_final": "Codes_retenus_final",
|
|
"decision": "Decision",
|
|
"texte_decision_complet": "Texte_decision_complet",
|
|
"resume_motif": "Resume_motif",
|
|
"regles_citees": "Regles_citees",
|
|
"references_guide": "References_guide",
|
|
"ghm_mentionne": "GHM_mentionne",
|
|
"ghs_mentionne": "GHS_mentionne",
|
|
"ghm_final": "GHM_final",
|
|
"ghs_final": "GHS_final",
|
|
"impact_groupage": "Impact_groupage",
|
|
}
|
|
|
|
|
|
def normalize_row(raw: dict) -> dict:
|
|
"""Convertit les clés LLM en clés Excel et normalise les types."""
|
|
row = {}
|
|
for llm_key, excel_key in KEY_MAP.items():
|
|
val = raw.get(llm_key, raw.get(excel_key, ""))
|
|
# Convertir en int pour Champ et OGC
|
|
if excel_key in ("Champ", "OGC"):
|
|
try:
|
|
val = int(val) if val else 0
|
|
except (ValueError, TypeError):
|
|
val = 0
|
|
elif not isinstance(val, str):
|
|
val = str(val) if val is not None else ""
|
|
row[excel_key] = val
|
|
return row
|
|
|
|
|
|
def merge_and_deduplicate(all_items: list[dict]) -> list[dict]:
|
|
"""Fusionne, déduplique par OGC, et trie les résultats."""
|
|
rows = [normalize_row(item) for item in all_items]
|
|
|
|
# Filtrer les lignes sans contenu utile
|
|
rows = [r for r in rows if r["OGC"] > 0 or r["Code_etablissement"] or r["Decision"]]
|
|
|
|
# Dédoublonnage par OGC (garder la version la plus complète)
|
|
seen: dict[int, dict] = {}
|
|
deduped: list[dict] = []
|
|
for r in rows:
|
|
key = r["OGC"]
|
|
if key == 0:
|
|
deduped.append(r)
|
|
continue
|
|
if key in seen:
|
|
old = seen[key]
|
|
old_score = sum(1 for v in old.values() if v and v != 0)
|
|
new_score = sum(1 for v in r.values() if v and v != 0)
|
|
if new_score > old_score:
|
|
deduped = [x for x in deduped if x["OGC"] != key]
|
|
deduped.append(r)
|
|
seen[key] = r
|
|
else:
|
|
seen[key] = r
|
|
deduped.append(r)
|
|
|
|
deduped.sort(key=lambda r: (r["Champ"], r["OGC"]))
|
|
return deduped
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 7. Export Excel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
HEADERS = [
|
|
"Champ", "OGC", "Type_desaccord",
|
|
"Code_etablissement", "Libelle_etablissement",
|
|
"Code_controleurs", "Libelle_controleurs",
|
|
"Codes_retenus_final",
|
|
"Decision", "Texte_decision_complet", "Resume_motif",
|
|
"Regles_citees", "References_guide",
|
|
"GHM_mentionne", "GHS_mentionne", "GHM_final", "GHS_final",
|
|
"Impact_groupage",
|
|
]
|
|
|
|
HEADER_LABELS = [
|
|
"Champ", "N° OGC", "Type désaccord",
|
|
"Code(s) Établissement", "Libellé Établissement",
|
|
"Code(s) Contrôleurs", "Libellé Contrôleurs",
|
|
"Code(s) retenus (final)",
|
|
"Décision UCR", "Texte décision complet", "Résumé du motif",
|
|
"Règles codage citées", "Références (guide, fascicules, avis)",
|
|
"GHM mentionné(s)", "GHS mentionné(s)", "GHM final", "GHS final",
|
|
"Impact groupage",
|
|
]
|
|
|
|
|
|
def write_excel(rows: list[dict], output_path: str):
|
|
"""Écrit les résultats dans un fichier Excel (feuille unique)."""
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "Décisions UCR"
|
|
|
|
# Styles
|
|
header_font = Font(bold=True, color="FFFFFF", size=11)
|
|
header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
|
|
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
|
|
thin_border = Border(
|
|
left=Side(style="thin"), right=Side(style="thin"),
|
|
top=Side(style="thin"), bottom=Side(style="thin"),
|
|
)
|
|
|
|
fav_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
|
|
defav_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
|
|
mixte_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
|
|
|
|
# En-têtes
|
|
for col, label in enumerate(HEADER_LABELS, 1):
|
|
cell = ws.cell(row=1, column=col, value=label)
|
|
cell.font = header_font
|
|
cell.fill = header_fill
|
|
cell.alignment = header_align
|
|
cell.border = thin_border
|
|
|
|
# Données
|
|
for row_idx, data in enumerate(rows, 2):
|
|
for col_idx, key in enumerate(HEADERS, 1):
|
|
val = data.get(key, "")
|
|
cell = ws.cell(row=row_idx, column=col_idx, value=val)
|
|
cell.border = thin_border
|
|
cell.alignment = Alignment(vertical="top", wrap_text=True)
|
|
|
|
# Colorer la colonne Décision
|
|
dec_col = HEADERS.index("Decision") + 1
|
|
decision_cell = ws.cell(row=row_idx, column=dec_col)
|
|
dv = str(decision_cell.value or "")
|
|
if "Favorable" in dv and "Défavorable" not in dv:
|
|
decision_cell.fill = fav_fill
|
|
elif "Défavorable" in dv:
|
|
decision_cell.fill = defav_fill
|
|
elif "Mixte" in dv:
|
|
decision_cell.fill = mixte_fill
|
|
|
|
# Largeurs de colonnes
|
|
col_widths = {
|
|
"Champ": 8, "OGC": 8, "Type_desaccord": 14,
|
|
"Code_etablissement": 22, "Libelle_etablissement": 40,
|
|
"Code_controleurs": 22, "Libelle_controleurs": 40,
|
|
"Codes_retenus_final": 22,
|
|
"Decision": 24, "Texte_decision_complet": 80,
|
|
"Resume_motif": 60,
|
|
"Regles_citees": 16, "References_guide": 50,
|
|
"GHM_mentionne": 16, "GHS_mentionne": 16,
|
|
"GHM_final": 12, "GHS_final": 10,
|
|
"Impact_groupage": 20,
|
|
}
|
|
for i, key in enumerate(HEADERS, 1):
|
|
ws.column_dimensions[ws.cell(row=1, column=i).column_letter].width = col_widths.get(key, 15)
|
|
|
|
# Filtre automatique + freeze
|
|
last_col_letter = ws.cell(row=1, column=len(HEADERS)).column_letter
|
|
ws.auto_filter.ref = f"A1:{last_col_letter}{len(rows)+1}"
|
|
ws.freeze_panes = "A2"
|
|
|
|
wb.save(output_path)
|
|
print(f"Excel enregistré : {output_path}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 8. CLI / Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Extracteur T2A généraliste via OCR + LLM (Ollama)",
|
|
)
|
|
parser.add_argument("pdf", help="Fichier PDF à traiter")
|
|
parser.add_argument("--model", default="gemma3:27b-it-qat",
|
|
help="Modèle Ollama (défaut: gemma3:27b-it-qat)")
|
|
parser.add_argument("--timeout", type=int, default=120,
|
|
help="Timeout par appel LLM en secondes (défaut: 120)")
|
|
parser.add_argument("--output", default=None,
|
|
help="Fichier Excel de sortie (défaut: <nom>_llm.xlsx)")
|
|
parser.add_argument("--dpi", type=int, default=300,
|
|
help="Résolution OCR (défaut: 300)")
|
|
parser.add_argument("--no-cache", action="store_true",
|
|
help="Désactiver le cache texte OCR")
|
|
parser.add_argument("--verbose", action="store_true",
|
|
help="Afficher les prompts/réponses LLM")
|
|
|
|
args = parser.parse_args()
|
|
|
|
pdf_path = args.pdf
|
|
if not Path(pdf_path).exists():
|
|
print(f"[ERREUR] Fichier non trouvé : {pdf_path}")
|
|
sys.exit(1)
|
|
|
|
output_path = args.output or str(Path(pdf_path).with_name(
|
|
Path(pdf_path).stem + "_llm.xlsx"
|
|
))
|
|
|
|
print(f"Fichier PDF : {pdf_path}")
|
|
print(f"Modèle LLM : {args.model}")
|
|
print(f"Sortie Excel : {output_path}")
|
|
print()
|
|
|
|
# --- Étape 1 : OCR ---
|
|
txt_cache = Path(pdf_path).with_suffix(".txt")
|
|
if txt_cache.exists() and not args.no_cache:
|
|
print("Étape 1/4 : Chargement du texte depuis le cache...")
|
|
full_text = txt_cache.read_text(encoding="utf-8")
|
|
full_text = normalize_text(full_text)
|
|
print(f" {len(full_text)} caractères chargés depuis {txt_cache}")
|
|
else:
|
|
print("Étape 1/4 : OCR du document...")
|
|
full_text = ocr_pdf(pdf_path, dpi=args.dpi)
|
|
if not args.no_cache:
|
|
txt_cache.write_text(full_text, encoding="utf-8")
|
|
print(f" Cache texte sauvegardé : {txt_cache}")
|
|
print(f" Longueur du texte : {len(full_text)} caractères")
|
|
print()
|
|
|
|
# --- Étape 2 : Détection du type de document ---
|
|
print("Étape 2/4 : Détection du type de document...")
|
|
t0 = time.time()
|
|
doc_info = detect_document_type(full_text, model=args.model, timeout=args.timeout, verbose=args.verbose)
|
|
print(f" Type : {doc_info.get('type_document', '?')}")
|
|
print(f" Organisme : {doc_info.get('organisme', '?')}")
|
|
print(f" Objet : {doc_info.get('objet', '?')}")
|
|
print(f" Séparateur: {doc_info.get('separateur_blocs', '(aucun)')}")
|
|
print(f" Colonnes : {doc_info.get('colonnes_detectees', [])}")
|
|
print(f" ({time.time() - t0:.1f}s)")
|
|
print()
|
|
|
|
# --- Étape 3 : Découpage et extraction ---
|
|
print("Étape 3/4 : Découpage en blocs et extraction LLM...")
|
|
separator = doc_info.get("separateur_blocs", "")
|
|
blocks = split_into_blocks(full_text, separator)
|
|
print(f" {len(blocks)} blocs à traiter")
|
|
|
|
all_items = []
|
|
t0 = time.time()
|
|
for i, block in enumerate(blocks):
|
|
print(f" Bloc {i+1}/{len(blocks)}...", end="\r")
|
|
items = extract_block(block, doc_info, model=args.model, timeout=args.timeout, verbose=args.verbose)
|
|
all_items.extend(items)
|
|
# Progression
|
|
elapsed = time.time() - t0
|
|
avg = elapsed / (i + 1)
|
|
remaining = avg * (len(blocks) - i - 1)
|
|
print(f" Bloc {i+1}/{len(blocks)} → {len(items)} dossier(s) "
|
|
f"[{elapsed:.0f}s écoulé, ~{remaining:.0f}s restant] ")
|
|
|
|
total_elapsed = time.time() - t0
|
|
print(f" Extraction terminée : {len(all_items)} dossiers bruts en {total_elapsed:.0f}s")
|
|
print()
|
|
|
|
# --- Étape 4 : Fusion et export ---
|
|
print("Étape 4/4 : Fusion, dédoublonnage et export Excel...")
|
|
rows = merge_and_deduplicate(all_items)
|
|
print(f" {len(rows)} dossiers après dédoublonnage")
|
|
|
|
# Statistiques
|
|
fav = sum(1 for r in rows if "Favorable" in r.get("Decision", "") and "Défavorable" not in r.get("Decision", ""))
|
|
defav = sum(1 for r in rows if "Défavorable" in r.get("Decision", ""))
|
|
mixte = sum(1 for r in rows if "Mixte" in r.get("Decision", ""))
|
|
indet = sum(1 for r in rows if r.get("Decision", "") in ("Indéterminé", ""))
|
|
print(f" Favorable établissement : {fav}")
|
|
print(f" Défavorable établissement : {defav}")
|
|
print(f" Mixte : {mixte}")
|
|
print(f" Indéterminé : {indet}")
|
|
|
|
write_excel(rows, output_path)
|
|
print()
|
|
print("Terminé.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|