Files
Aivanov_scan_ogc/extract_ogc.py
Dom 0c0f62fbf1 feat: extraction OGC et génération de PDFs propres
Pipeline complet pour extraire les données structurées des fiches OGC
scannées (recueil praticien conseil + concertation) et générer des PDFs
propres et lisibles à partir des JSON extraits.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:12:21 +01:00

1007 lines
39 KiB
Python

"""
Extraction structurée des fiches OGC (contrôle T2A)
Pipeline : PDF scan → images → docTR OCR par zones → VLM correction → données structurées
"""
import os
import re
import json
import glob
import base64
import io
import fitz # PyMuPDF
import numpy as np
import requests
from PIL import Image
from doctr.models import ocr_predictor
# ============================================================
# Configuration Ollama VLM
# ============================================================
OLLAMA_URL = "http://localhost:11434/api/generate"
VLM_MODEL = "gemma3:27b-cloud"
# ============================================================
# Zones d'extraction — coordonnées relatives (0-1) sur la page
# Format: (x_min, y_min, x_max, y_max)
# ============================================================
ZONES_PAGE1 = {
# --- En-tête ---
"etablissement": (0.020, 0.060, 0.520, 0.085),
"finess": (0.520, 0.060, 0.720, 0.085),
"date_debut_controle": (0.720, 0.060, 0.960, 0.085),
"n_ogc": (0.780, 0.088, 0.960, 0.115),
"n_champ": (0.020, 0.088, 0.120, 0.110),
"dates_sejour": (0.250, 0.110, 0.600, 0.135),
# --- Données du séjour — ligne Établissement ---
"age_etab": (0.120, 0.185, 0.195, 0.208),
"sexe_etab": (0.218, 0.185, 0.260, 0.208),
"duree_sejour_etab": (0.435, 0.185, 0.485, 0.208),
"mode_entree_etab": (0.495, 0.185, 0.540, 0.208),
"provenance_etab": (0.545, 0.185, 0.595, 0.208),
"mode_sortie_etab": (0.600, 0.185, 0.650, 0.208),
"destination_etab": (0.655, 0.185, 0.710, 0.208),
"nb_seances_etab": (0.710, 0.185, 0.760, 0.208),
"nb_rum_etab": (0.760, 0.185, 0.810, 0.208),
# --- Données du séjour — ligne Recodage ---
"age_reco": (0.120, 0.210, 0.195, 0.233),
"sexe_reco": (0.218, 0.210, 0.260, 0.233),
"duree_sejour_reco": (0.435, 0.210, 0.485, 0.233),
"mode_entree_reco": (0.495, 0.210, 0.540, 0.233),
"provenance_reco": (0.545, 0.210, 0.595, 0.233),
"mode_sortie_reco": (0.600, 0.210, 0.650, 0.233),
"destination_reco": (0.655, 0.210, 0.710, 0.233),
# --- Données du RUM ---
"um_etab": (0.370, 0.270, 0.435, 0.293),
"igs_etab": (0.460, 0.270, 0.520, 0.293),
"duree_rum_etab": (0.630, 0.258, 0.690, 0.280),
"dates_rum_etab": (0.545, 0.282, 0.770, 0.302),
"um_reco": (0.370, 0.312, 0.435, 0.335),
"igs_reco": (0.460, 0.312, 0.520, 0.335),
"duree_rum_reco": (0.630, 0.300, 0.690, 0.322),
# --- Codage Établissement (DP, DR, DAS) ---
"dp_code_etab": (0.065, 0.355, 0.170, 0.375),
"dp_libelle": (0.200, 0.355, 0.770, 0.375),
"dr_code_etab": (0.065, 0.375, 0.170, 0.392),
"das_bloc_etab": (0.065, 0.385, 0.770, 0.475),
# --- Recodage (colonne droite) ---
"dp_code_reco": (0.785, 0.355, 0.920, 0.375),
"dr_code_reco": (0.785, 0.375, 0.920, 0.392),
"das_bloc_reco": (0.785, 0.385, 0.960, 0.475),
# --- Actes ---
"actes_bloc_etab": (0.065, 0.680, 0.770, 0.790),
"actes_bloc_reco": (0.785, 0.680, 0.960, 0.790),
# --- GHM / GHS (ligne unique en bas) ---
"ghm_ghs_ligne": (0.010, 0.808, 0.960, 0.830),
# --- Décisions ---
"recodage_impactant": (0.010, 0.838, 0.400, 0.858),
"ghs_injustifie": (0.010, 0.857, 0.180, 0.878),
"praticien_conseil": (0.010, 0.898, 0.500, 0.920),
}
# Zones spéciales pour détection de checkboxes (accord/désaccord page recueil)
# Zone englobant les 2 checkboxes + labels
ZONE_ACCORD_DESACCORD = (0.580, 0.838, 0.850, 0.878)
# Sous-zones des checkboxes individuelles (carrés ~15x15px à 300dpi)
ZONE_CHECKBOX_ACCORD = (0.588, 0.840, 0.610, 0.858)
ZONE_CHECKBOX_DESACCORD = (0.588, 0.860, 0.610, 0.878)
# Page "Concertation 2/2" — Décision finale
# Corrigé : les zones GHS sont dans le bloc decision_bloc (plus bas que prévu)
ZONES_CONCERTATION_2 = {
"ghs_ligne": (0.030, 0.190, 0.960, 0.230),
"accord_concertation": (0.030, 0.270, 0.960, 0.320),
"date_concertation": (0.100, 0.690, 0.350, 0.730),
}
# Page "Concertation 1/2" — Argumentaire
ZONES_CONCERTATION_1 = {
"date_concertation_arg": (0.140, 0.125, 0.400, 0.160),
"argumentaire": (0.030, 0.220, 0.960, 0.560),
}
# ============================================================
# Post-traitement — nettoyage des valeurs extraites
# ============================================================
def clean_value(text, field_type="text"):
"""Nettoie une valeur extraite selon son type"""
if not text:
return ""
# Supprimer les labels courants capturés dans les zones
labels_to_strip = [
r"Etablissement\s*:?\s*",
r"FINESS\s*:?\s*",
r"Date?\s*début\s*contrôle\s*:?\s*",
r"[Nn][o°]\s*OGC\s*:?\s*",
r"[Nn][o°]\s*[Cc]hamp\s*:?\s*",
r"Dates?\s*du\s*séjour\s*:?\s*",
r"^[Dd][Pp]\s+",
r"^[Dd][Rr]\s*$",
r"GHM\s*[ée]tablissement\s*:?\s*",
r"GHS\s*[ée]tablissement\s*:?\s*",
r"GHM\s*apr[eè]s\s*recodage\s*:?\s*",
r"GHS\s*apr[eè]s\s*recodage\s*:?\s*",
r"GHS\s*initial\s*:?\s*",
r"GHS\s*avant\s*concertation\s*:?\s*",
r"GHS\s*final\s*apr[eè]s\s*concertation\s*:?\s*",
r"Recodage\s*impactant\s*la\s*facturation\s*:?\s*",
r"GHS\s*injustifi[ée]\s*:?\s*",
r"^Recoaage\s*", # OCR error fréquent pour "Recodage"
r"^Recodage\s*",
]
cleaned = text
for pattern in labels_to_strip:
cleaned = re.sub(pattern, "", cleaned, flags=re.IGNORECASE).strip()
# Supprimer les artefacts OCR courants
cleaned = re.sub(r"^\s*[\-/|.]+\s*$", "", cleaned) # Lignes de bruit pur
cleaned = re.sub(r"\s*\|\s*", " | ", cleaned) # Normaliser séparateurs
cleaned = cleaned.strip(" |-./")
if field_type == "number":
nums = re.findall(r"\d+", cleaned)
if not nums:
return ""
return max(nums, key=lambda x: len(x))
if field_type == "code_cim":
match = re.search(r"[A-Z]\d{2,4}\.?\d*\s*\*?", cleaned)
return match.group(0).strip() if match else cleaned
if field_type == "date":
match = re.search(r"\d{2}/\d{2}/\d{4}", cleaned)
return match.group(0) if match else cleaned
if field_type == "dates_range":
match = re.search(r"(\d{2}/\d{2}/\d{4})\s*au\s*(\d{2}/\d{2}/\d{4})", cleaned)
if match:
return f"{match.group(1)} au {match.group(2)}"
return cleaned
return cleaned
def clean_dp_libelle(text):
"""Nettoie le libellé du DP en supprimant le bruit des zones adjacentes"""
if not text:
return ""
parts = text.split("|")
best = ""
for part in parts:
cleaned = part.strip(" :-.|/")
# Ignorer les parties courtes, le bruit, et les headers capturés
if len(cleaned) > 3 and re.search(r"[A-Z]{3,}", cleaned):
# Exclure les headers de tableau capturés par erreur
if re.search(r"(?i)codage|etablissement|recodage", cleaned):
continue
if len(cleaned) > len(best):
best = cleaned
return best if best else text.strip(" :-.|/")
def parse_ghm_ghs_line(text):
"""Parse la ligne GHM/GHS qui contient les 4 valeurs"""
result = {
"ghm_etab": "",
"ghs_etab": "",
"ghm_reco": "",
"ghs_reco": "",
}
ghm_etab = re.search(r"GHM\s*[ée]tablissement\s*:?\s*(\w+)", text, re.IGNORECASE)
ghs_etab = re.search(r"GHS\s*[ée]tablissement\s*:?\s*(\d+)", text, re.IGNORECASE)
ghm_reco = re.search(r"GHM\s*apr[eè]s\s*recodage\s*:?\s*(\w+)", text, re.IGNORECASE)
ghs_reco = re.search(r"GHS\s*apr[eè]s\s*recodage\s*:?\s*(\d+)", text, re.IGNORECASE)
if ghm_etab:
result["ghm_etab"] = ghm_etab.group(1)
if ghs_etab:
result["ghs_etab"] = ghs_etab.group(1)
if ghm_reco:
result["ghm_reco"] = ghm_reco.group(1)
if ghs_reco:
result["ghs_reco"] = ghs_reco.group(1)
return result
def parse_ghs_concertation_line(text):
"""Parse la ligne GHS de la page concertation 2/2"""
result = {
"ghs_initial": "",
"ghs_avant_concertation": "",
"ghs_final": "",
}
m_init = re.search(r"GHS\s*initial\s*:?\s*(\d+)", text, re.IGNORECASE)
m_avant = re.search(r"GHS\s*avant\s*concertation\s*:?\s*(\d+)", text, re.IGNORECASE)
m_final = re.search(r"GHS\s*final\s*(?:apr[eè]s\s*concertation)?\s*:?\s*(\d+)", text, re.IGNORECASE)
if m_init:
result["ghs_initial"] = m_init.group(1)
if m_avant:
result["ghs_avant_concertation"] = m_avant.group(1)
if m_final:
result["ghs_final"] = m_final.group(1)
return result
def parse_das_bloc(text):
"""Parse un bloc DAS en liste de (code, position, libellé)"""
entries = []
parts = text.split("|")
i = 0
while i < len(parts):
part = parts[i].strip()
match = re.match(r"([A-Z]\d{2,4}\.?\d*)\s*(\*?)", part)
if match:
code = match.group(1) + (" *" if match.group(2) else "")
position = ""
libelle = ""
if i + 1 < len(parts):
pos_match = re.match(r"\s*(\d+)\s*$", parts[i + 1].strip())
if pos_match:
position = pos_match.group(1)
i += 1
if i + 1 < len(parts):
next_part = parts[i + 1].strip()
if not re.match(r"[A-Z]\d{2,4}", next_part):
libelle = next_part
i += 1
entries.append({"code": code.strip(), "position": position, "libelle": libelle})
i += 1
return entries
def parse_actes_bloc(text):
"""Parse un bloc d'actes CCAM"""
entries = []
parts = text.split("|")
i = 0
while i < len(parts):
part = parts[i].strip()
match = re.match(r"([A-Z]{4}\d{3})", part)
if match:
code = match.group(1)
position = ""
libelle = ""
if i + 1 < len(parts):
pos_match = re.match(r"\s*(\d+)\s*$", parts[i + 1].strip())
if pos_match:
position = pos_match.group(1)
i += 1
if i + 1 < len(parts):
next_part = parts[i + 1].strip()
if not re.match(r"[A-Z]{4}\d{3}", next_part):
libelle = next_part
i += 1
entries.append({"code": code, "position": position, "libelle": libelle})
i += 1
return entries
# ============================================================
# Détection de checkboxes par analyse de densité de pixels
# ============================================================
def detect_checkbox_state(image, zone):
"""Détecte si une checkbox est cochée en analysant la densité de pixels sombres"""
crop = crop_zone(image, zone)
gray = np.array(crop.convert("L"))
# Pixels sombres = < 128 (sur 0-255)
dark_ratio = np.mean(gray < 128)
# Une checkbox cochée (☒) a ~20-40% de pixels sombres
# Une checkbox vide (☐) a ~5-15% de pixels sombres (juste le contour)
return dark_ratio
def detect_accord_checkbox(image, debug_dir=None):
"""Détecte Accord/Désaccord via analyse visuelle des checkboxes"""
ratio_accord = detect_checkbox_state(image, ZONE_CHECKBOX_ACCORD)
ratio_desaccord = detect_checkbox_state(image, ZONE_CHECKBOX_DESACCORD)
if debug_dir:
crop_a = crop_zone(image, ZONE_CHECKBOX_ACCORD)
crop_d = crop_zone(image, ZONE_CHECKBOX_DESACCORD)
crop_a.save(os.path.join(debug_dir, "checkbox_accord.png"))
crop_d.save(os.path.join(debug_dir, "checkbox_desaccord.png"))
# La checkbox cochée a significativement plus de pixels sombres
diff = ratio_accord - ratio_desaccord
if abs(diff) < 0.03:
return "ambigu"
elif diff > 0:
return "accord"
else:
return "désaccord"
def detect_accord_concertation(text):
"""Détecte la décision de concertation depuis le texte OCR"""
text_lower = text.lower()
if re.search(r"maintien.*(?:avis|initial).*(?:médecin|controleur)", text_lower):
return "maintien_avis_controleur"
if "retour groupage" in text_lower:
return "retour_groupage_dim"
if "autre groupage" in text_lower:
return "autre_groupage"
return text
# ============================================================
# Correction VLM (Ollama gemma3)
# ============================================================
def image_to_base64(image, quality=85):
"""Convertit une image PIL en base64 JPEG"""
buf = io.BytesIO()
image.save(buf, format="JPEG", quality=quality)
return base64.b64encode(buf.getvalue()).decode()
PROMPT_CORRECTION_RECUEIL = """Tu es un expert en codage PMSI/T2A. Voici l'extraction OCR brute d'une fiche de recueil OGC. L'OCR a fait des erreurs. Corrige les erreurs en comparant avec l'image de la fiche.
Extraction OCR brute :
{doctr_json}
RÈGLES DE CORRECTION :
1. Corrige les textes garblés (noms, libellés) en lisant l'image
2. Remplis les champs vides si tu peux les lire sur l'image
3. NE MODIFIE PAS les codes GHM/GHS s'ils ont le bon format (2 chiffres + lettre + 3 chiffres, ex: 06C043). NE MODIFIE PAS les codes GHS numériques s'ils sont déjà remplis.
4. Les codes CIM-10 : lettre majuscule + 2-4 chiffres (ex: K650, T814, E8758). Corrige si le format est invalide.
5. Pour accord_desaccord : regarde les cases cochées ☒/☐ sur l'image. Réponds "accord" ou "désaccord".
6. Supprime tout bruit OCR (tirets parasites, caractères aléatoires, séparateurs |)
7. Pour les champs numériques (age, sexe, duree, mode_entree, provenance, mode_sortie, destination) : ne garde que le nombre, sans texte parasite
8. Le praticien_conseil est un nom de médecin (Dr/DR + nom)
Réponds UNIQUEMENT avec le JSON corrigé, même structure, sans commentaire ni markdown."""
PROMPT_CORRECTION_CONCERTATION = """Tu es un expert en codage PMSI/T2A. Voici l'extraction OCR brute d'une page de concertation OGC. Corrige les erreurs en comparant avec l'image.
Extraction OCR brute :
{doctr_json}
RÈGLES :
1. Les GHS sont des nombres à 4 chiffres. Corrige s'ils sont tronqués ou bruités.
2. Pour la décision : "maintien_avis_controleur", "retour_groupage_dim" ou "autre_groupage" selon la case cochée
3. Extrais la date de concertation si visible (format dd/mm/yyyy)
4. Pour l'argumentaire : corrige les erreurs OCR évidentes mais garde le texte complet
5. Supprime tout bruit OCR
Réponds UNIQUEMENT avec le JSON corrigé, même structure, sans commentaire ni markdown."""
def vlm_correct(image, doctr_parsed, prompt_template, page_type="recueil"):
"""Envoie l'image + JSON docTR au VLM pour correction"""
b64 = image_to_base64(image)
prompt = prompt_template.format(doctr_json=json.dumps(doctr_parsed, ensure_ascii=False, indent=2))
try:
resp = requests.post(OLLAMA_URL, json={
"model": VLM_MODEL,
"prompt": prompt,
"images": [b64],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 3000}
}, timeout=120)
text = resp.json().get("response", "")
# Extraire le JSON de la réponse (peut être entouré de ```)
text = re.sub(r"^```(?:json)?\s*", "", text.strip())
text = re.sub(r"\s*```$", "", text.strip())
corrected = json.loads(text)
return corrected
except Exception as e:
print(f" ⚠ VLM correction échouée ({e}), utilisation des données docTR")
return doctr_parsed
def merge_recueil(doctr_parsed, vlm_corrected):
"""Fusionne les résultats docTR et VLM, en préférant chacun pour ses forces"""
merged = json.loads(json.dumps(vlm_corrected)) # deep copy
# Préférer docTR pour les codes structurés quand ils sont valides
code_fields_to_protect = [
("ghm_etab", r"^\d{2}[A-Z]\d{3}$"),
("ghm_reco", r"^\d{2}[A-Z]\d{3}$"),
("ghs_etab", r"^\d{3,5}$"),
("ghs_reco", r"^\d{3,5}$"),
]
for field, pattern in code_fields_to_protect:
doctr_val = doctr_parsed.get(field, "")
vlm_val = merged.get(field, "")
# Si docTR a une valeur valide et VLM l'a changée, garder docTR
if doctr_val and re.match(pattern, doctr_val):
merged[field] = doctr_val
# Protéger les codes CIM valides de docTR dans codage_etab/reco
for section in ["codage_etab", "codage_reco"]:
if section in doctr_parsed and section in merged:
for code_field in ["dp", "dr"]:
doctr_code = doctr_parsed[section].get(code_field, "")
if doctr_code and re.match(r"^[A-Z]\d{2,4}\.?\d*", doctr_code):
# Garder docTR si valide, sauf si VLM a aussi un code valide
vlm_code = merged[section].get(code_field, "")
if not vlm_code or not re.match(r"^[A-Z]\d{2,4}\.?\d*", vlm_code):
merged[section][code_field] = doctr_code
return merged
# ============================================================
# Fonctions OCR
# ============================================================
def pdf_to_images(pdf_path, dpi=300):
"""Convertit un PDF en liste d'images PIL"""
doc = fitz.open(pdf_path)
images = []
mat = fitz.Matrix(dpi / 72, dpi / 72)
for page in doc:
pix = page.get_pixmap(matrix=mat)
img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
images.append(img)
doc.close()
return images
def crop_zone(image, zone):
"""Extrait une zone d'une image PIL (coordonnées relatives 0-1)"""
w, h = image.size
x1, y1, x2, y2 = zone
return image.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
def ocr_image(model, image):
"""OCR sur une image PIL, retourne les lignes avec confiance"""
crop_np = np.array(image)
result = model([crop_np])
lines = []
for page in result.pages:
for block in page.blocks:
for line in block.lines:
text = " ".join(w.value for w in line.words)
conf = sum(w.confidence for w in line.words) / len(line.words) if line.words else 0
lines.append({"text": text, "confidence": conf})
return lines
def ocr_zone(model, image, zone, debug_dir=None, zone_name=None):
"""OCR sur une zone spécifique"""
crop = crop_zone(image, zone)
if debug_dir and zone_name:
crop.save(os.path.join(debug_dir, f"zone_{zone_name}.png"))
return ocr_image(model, crop)
def extract_text(lines):
"""Concatène les lignes en texte simple"""
return " | ".join(l["text"] for l in lines if l["text"].strip())
def detect_page_type(model, image):
"""Détecte le type de page en cherchant des marqueurs textuels"""
header_zone = (0.020, 0.005, 0.950, 0.085)
lines = ocr_zone(model, image, header_zone)
header_text = extract_text(lines).upper()
if "RECUEIL DU PRATICIEN" in header_text:
return "recueil"
elif "CONCERTATION 2/2" in header_text:
return "concertation_2"
elif "CONCERTATION 1/2" in header_text:
return "concertation_1"
elif "HOSPITALISATION" in header_text:
return "commentaires"
elif "PREUVE" in header_text or "ELEMENT" in header_text:
return "elements_preuve"
elif "CONCERTATION" in header_text:
return "concertation_medicale"
else:
return "inconnu"
# ============================================================
# Extraction par type de page
# ============================================================
def extract_page_recueil(model, image, debug_dir=None):
"""Extrait les données de la fiche de recueil (page 1)"""
raw = {}
for zone_name, zone_coords in ZONES_PAGE1.items():
lines = ocr_zone(model, image, zone_coords, debug_dir, zone_name)
raw[zone_name] = extract_text(lines)
# Post-traitement structuré
ghm_ghs = parse_ghm_ghs_line(raw.get("ghm_ghs_ligne", ""))
das_etab = parse_das_bloc(raw.get("das_bloc_etab", ""))
das_reco = parse_das_bloc(raw.get("das_bloc_reco", ""))
actes_etab = parse_actes_bloc(raw.get("actes_bloc_etab", ""))
actes_reco = parse_actes_bloc(raw.get("actes_bloc_reco", ""))
# Détection accord/désaccord par analyse visuelle des checkboxes
accord = detect_accord_checkbox(image, debug_dir)
return {
"raw": raw,
"parsed": {
"etablissement": clean_value(raw["etablissement"]),
"finess": clean_value(raw["finess"]).split("|")[0].strip(),
"date_debut_controle": clean_value(raw["date_debut_controle"], "date"),
"n_ogc": clean_value(raw["n_ogc"], "number"),
"n_champ": clean_value(raw["n_champ"], "number"),
"dates_sejour": clean_value(raw["dates_sejour"], "dates_range"),
"sejour_etab": {
"age": clean_value(raw["age_etab"], "number"),
"sexe": clean_value(raw["sexe_etab"], "number"),
"duree_sejour": clean_value(raw["duree_sejour_etab"], "number"),
"mode_entree": clean_value(raw["mode_entree_etab"], "number"),
"provenance": clean_value(raw["provenance_etab"], "number"),
"mode_sortie": clean_value(raw["mode_sortie_etab"], "number"),
"destination": clean_value(raw["destination_etab"], "number"),
},
"sejour_reco": {
"age": clean_value(raw["age_reco"], "number"),
"sexe": clean_value(raw["sexe_reco"], "number"),
"duree_sejour": clean_value(raw["duree_sejour_reco"], "number"),
"mode_entree": clean_value(raw["mode_entree_reco"], "number"),
"provenance": clean_value(raw["provenance_reco"], "number"),
"mode_sortie": clean_value(raw["mode_sortie_reco"], "number"),
"destination": clean_value(raw["destination_reco"], "number"),
},
"rum_etab": {
"um": clean_value(raw["um_etab"]),
"igs": clean_value(raw["igs_etab"], "number"),
"duree": clean_value(raw["duree_rum_etab"], "number"),
"dates": clean_value(raw["dates_rum_etab"]),
},
"codage_etab": {
"dp": clean_value(raw["dp_code_etab"], "code_cim"),
"dp_libelle": clean_dp_libelle(raw["dp_libelle"]),
"dr": clean_value(raw["dr_code_etab"], "code_cim"),
"das": das_etab,
},
"codage_reco": {
"dp": clean_value(raw["dp_code_reco"], "code_cim"),
"dr": clean_value(raw["dr_code_reco"], "code_cim"),
"das": das_reco,
},
"actes_etab": actes_etab,
"actes_reco": actes_reco,
"ghm_etab": ghm_ghs["ghm_etab"],
"ghs_etab": ghm_ghs["ghs_etab"],
"ghm_reco": ghm_ghs["ghm_reco"],
"ghs_reco": ghm_ghs["ghs_reco"],
"recodage_impactant": clean_value(raw["recodage_impactant"], "number"),
"ghs_injustifie": clean_value(raw["ghs_injustifie"], "number"),
"accord_desaccord": accord,
"praticien_conseil": clean_value(raw["praticien_conseil"]),
},
}
def extract_page_concertation_1(model, image, debug_dir=None):
"""Extrait l'argumentaire du médecin contrôleur"""
raw = {}
for zone_name, zone_coords in ZONES_CONCERTATION_1.items():
lines = ocr_zone(model, image, zone_coords, debug_dir, f"conc1_{zone_name}")
raw[zone_name] = extract_text(lines)
return {
"raw": raw,
"parsed": {
"date_concertation": clean_value(raw.get("date_concertation_arg", ""), "date"),
"argumentaire": raw.get("argumentaire", ""),
},
}
def extract_page_concertation_2(model, image, debug_dir=None):
"""Extrait la décision finale après concertation"""
raw = {}
for zone_name, zone_coords in ZONES_CONCERTATION_2.items():
lines = ocr_zone(model, image, zone_coords, debug_dir, f"conc2_{zone_name}")
raw[zone_name] = extract_text(lines)
# Parser la ligne GHS depuis la zone corrigée
ghs_data = parse_ghs_concertation_line(raw.get("ghs_ligne", ""))
# Détecter la décision de concertation
accord_text = raw.get("accord_concertation", "")
decision = detect_accord_concertation(accord_text)
return {
"raw": raw,
"parsed": {
"ghs_initial": ghs_data["ghs_initial"],
"ghs_avant_concertation": ghs_data["ghs_avant_concertation"],
"ghs_final": ghs_data["ghs_final"],
"decision": decision,
"accord_concertation_raw": accord_text,
"date_concertation": clean_value(raw.get("date_concertation", ""), "date"),
},
}
# ============================================================
# Extraction complète d'un dossier OGC
# ============================================================
def extract_ogc(pdf_path, model, debug=False, use_vlm=True):
"""Extraction complète d'un dossier OGC"""
basename = os.path.splitext(os.path.basename(pdf_path))[0]
debug_dir = None
if debug:
debug_dir = os.path.join(os.path.dirname(pdf_path), "..", "debug_zones", basename)
os.makedirs(debug_dir, exist_ok=True)
print(f"\n{'='*60}")
print(f" Extraction: {basename}")
print(f"{'='*60}")
images = pdf_to_images(pdf_path)
# Auto-détection des types de pages
print(f" Détection des pages...")
page_types = {}
for i, img in enumerate(images):
ptype = detect_page_type(model, img)
page_types[i] = ptype
print(f" Page {i+1}: {ptype}")
result = {"fichier": basename, "recueil": None, "concertation_1": None, "concertation_2": None}
for page_idx, ptype in page_types.items():
if ptype == "recueil":
print(f" Extraction page {page_idx+1} (recueil)...")
recueil = extract_page_recueil(model, images[page_idx], debug_dir)
if use_vlm:
print(f" Correction VLM (recueil)...")
vlm_corrected = vlm_correct(
images[page_idx], recueil["parsed"],
PROMPT_CORRECTION_RECUEIL, "recueil"
)
recueil["parsed"] = merge_recueil(recueil["parsed"], vlm_corrected)
result["recueil"] = recueil
elif ptype == "concertation_1":
print(f" Extraction page {page_idx+1} (argumentaire)...")
conc1 = extract_page_concertation_1(model, images[page_idx], debug_dir)
if use_vlm:
print(f" Correction VLM (argumentaire)...")
vlm_corrected = vlm_correct(
images[page_idx], conc1["parsed"],
PROMPT_CORRECTION_CONCERTATION, "concertation_1"
)
conc1["parsed"] = vlm_corrected
result["concertation_1"] = conc1
elif ptype == "concertation_2":
print(f" Extraction page {page_idx+1} (décision finale)...")
conc2 = extract_page_concertation_2(model, images[page_idx], debug_dir)
if use_vlm:
print(f" Correction VLM (décision finale)...")
vlm_corrected = vlm_correct(
images[page_idx], conc2["parsed"],
PROMPT_CORRECTION_CONCERTATION, "concertation_2"
)
conc2["parsed"] = vlm_corrected
result["concertation_2"] = conc2
return result
def print_results(result):
"""Affiche les résultats de manière lisible"""
print(f"\n{''*60}")
print(f" RÉSULTATS: {result['fichier']}")
print(f"{''*60}")
rec = result.get("recueil", {})
if rec:
p = rec["parsed"]
print(f"\n [EN-TÊTE]")
print(f" Établissement : {p['etablissement']}")
print(f" FINESS : {p['finess']}")
print(f" Date contrôle : {p['date_debut_controle']}")
print(f" N° OGC : {p['n_ogc']}")
print(f" N° Champ : {p['n_champ']}")
print(f" Dates séjour : {p['dates_sejour']}")
se = p["sejour_etab"]
print(f"\n [SÉJOUR ÉTABLISSEMENT]")
print(f" Âge: {se['age']} Sexe: {se['sexe']} Durée: {se['duree_sejour']}")
print(f" Mode entrée: {se['mode_entree']} Prov: {se['provenance']} Mode sortie: {se['mode_sortie']} Dest: {se['destination']}")
sr = p["sejour_reco"]
print(f"\n [SÉJOUR RECODAGE]")
print(f" Âge: {sr['age']} Sexe: {sr['sexe']} Durée: {sr['duree_sejour']}")
print(f" Mode entrée: {sr['mode_entree']} Prov: {sr['provenance']} Mode sortie: {sr['mode_sortie']} Dest: {sr['destination']}")
rum = p["rum_etab"]
print(f"\n [RUM]")
print(f" UM: {rum['um']} IGS: {rum['igs']} Durée: {rum['duree']} Dates: {rum['dates']}")
ce = p.get("codage_etab", {})
print(f"\n [CODAGE ÉTABLISSEMENT]")
print(f" DP: {ce.get('dp','')} ({ce.get('dp_libelle','')})")
print(f" DR: {ce.get('dr','')}")
for das in ce.get("das", []):
print(f" DAS: {das.get('code','')} pos={das.get('position','')} {das.get('libelle','')}")
cr = p.get("codage_reco", {})
print(f"\n [RECODAGE]")
print(f" DP: {cr.get('dp','')}")
print(f" DR: {cr['dr']}")
for das in cr.get("das", []):
print(f" DAS: {das.get('code','')} pos={das.get('position','')} {das.get('libelle','')}")
if p.get("actes_etab"):
print(f"\n [ACTES ÉTABLISSEMENT]")
for a in p["actes_etab"]:
if isinstance(a, dict):
print(f" {a.get('code','')} pos={a.get('position','')} {a.get('libelle','')}")
else:
print(f" {a}")
if p.get("actes_reco"):
print(f"\n [ACTES RECODAGE]")
for a in p["actes_reco"]:
if isinstance(a, dict):
print(f" {a.get('code','')} pos={a.get('position','')} {a.get('libelle','')}")
else:
print(f" {a}")
print(f"\n [GHM/GHS]")
print(f" Établissement : GHM={p['ghm_etab']} GHS={p['ghs_etab']}")
print(f" Recodage : GHM={p['ghm_reco']} GHS={p['ghs_reco']}")
print(f"\n [DÉCISIONS]")
print(f" Recodage impactant : {p['recodage_impactant']}")
print(f" GHS injustifié : {p['ghs_injustifie']}")
print(f" Accord/Désaccord : {p['accord_desaccord']}")
print(f" Praticien : {p['praticien_conseil']}")
c2 = result.get("concertation_2", {})
if c2:
p2 = c2["parsed"]
print(f"\n [CONCERTATION — Décision finale]")
print(f" GHS initial : {p2['ghs_initial']}")
print(f" GHS avant concertation : {p2['ghs_avant_concertation']}")
print(f" GHS final : {p2['ghs_final']}")
print(f" Décision : {p2['decision']}")
print(f" Date : {p2['date_concertation']}")
c1 = result.get("concertation_1", {})
if c1:
p1 = c1["parsed"]
arg = p1.get("argumentaire", "")
print(f"\n [ARGUMENTAIRE]")
print(f" Date : {p1['date_concertation']}")
if arg:
for line in arg.split("|")[:5]:
print(f" {line.strip()}")
# ============================================================
# Export Excel consolidé
# ============================================================
def export_excel(all_results, output_path):
"""Exporte tous les résultats en un fichier Excel consolidé"""
try:
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
except ImportError:
print(" ERREUR: openpyxl requis pour l'export Excel. pip install openpyxl")
return
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "OGC Résultats"
# Styles
header_font = Font(bold=True, size=10, color="FFFFFF")
header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
cell_align = Alignment(vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin"),
right=Side(style="thin"),
top=Side(style="thin"),
bottom=Side(style="thin"),
)
# En-têtes
headers = [
"N° OGC", "N° Champ", "Établissement", "FINESS",
"Date contrôle", "Dates séjour",
# Séjour étab
"Âge étab", "Sexe étab", "Durée séjour étab",
"Mode entrée étab", "Provenance étab", "Mode sortie étab", "Destination étab",
# Séjour reco
"Âge reco", "Sexe reco", "Durée séjour reco",
"Mode entrée reco", "Provenance reco", "Mode sortie reco", "Destination reco",
# RUM
"UM étab", "IGS étab", "Durée RUM étab", "Dates RUM",
# Codage
"DP étab", "DP libellé", "DR étab",
"DAS étab",
"DP reco", "DR reco",
"DAS reco",
# Actes
"Actes étab", "Actes reco",
# GHM/GHS
"GHM étab", "GHS étab", "GHM reco", "GHS reco",
# Décisions
"Recodage impactant", "GHS injustifié",
"Accord/Désaccord", "Praticien",
# Concertation
"GHS initial conc.", "GHS avant conc.", "GHS final conc.",
"Décision conc.", "Date concertation",
"Argumentaire (extrait)",
]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_align
cell.border = thin_border
# Données
for row_idx, result in enumerate(all_results, 2):
rec = result.get("recueil", {})
p = rec.get("parsed", {}) if rec else {}
se = p.get("sejour_etab", {})
sr = p.get("sejour_reco", {})
rum = p.get("rum_etab", {})
ce = p.get("codage_etab", {})
cr = p.get("codage_reco", {})
# Formater DAS/actes comme texte (robuste aux formats VLM variables)
def fmt_entry(d, with_libelle=False):
if isinstance(d, str):
return d
code = d.get("code", "")
pos = d.get("position", "")
lib = d.get("libelle", "") if with_libelle else ""
s = code
if pos:
s += f" (pos {pos})"
if lib:
s += f" {lib}"
return s
das_etab_str = "; ".join(fmt_entry(d, True) for d in ce.get("das", []))
das_reco_str = "; ".join(fmt_entry(d) for d in cr.get("das", []))
actes_etab_str = "; ".join(fmt_entry(a) for a in p.get("actes_etab", []))
actes_reco_str = "; ".join(fmt_entry(a) for a in p.get("actes_reco", []))
# Concertation
c2 = result.get("concertation_2", {})
p2 = c2.get("parsed", {}) if c2 else {}
c1 = result.get("concertation_1", {})
p1 = c1.get("parsed", {}) if c1 else {}
arg = p1.get("argumentaire", "")
# Tronquer l'argumentaire pour Excel
arg_short = arg.replace(" | ", "\n")[:500] if arg else ""
values = [
p.get("n_ogc", ""),
p.get("n_champ", ""),
p.get("etablissement", ""),
p.get("finess", ""),
p.get("date_debut_controle", ""),
p.get("dates_sejour", ""),
se.get("age", ""), se.get("sexe", ""), se.get("duree_sejour", ""),
se.get("mode_entree", ""), se.get("provenance", ""),
se.get("mode_sortie", ""), se.get("destination", ""),
sr.get("age", ""), sr.get("sexe", ""), sr.get("duree_sejour", ""),
sr.get("mode_entree", ""), sr.get("provenance", ""),
sr.get("mode_sortie", ""), sr.get("destination", ""),
rum.get("um", ""), rum.get("igs", ""), rum.get("duree", ""), rum.get("dates", ""),
ce.get("dp", ""), ce.get("dp_libelle", ""), ce.get("dr", ""),
das_etab_str,
cr.get("dp", ""), cr.get("dr", ""),
das_reco_str,
actes_etab_str, actes_reco_str,
p.get("ghm_etab", ""), p.get("ghs_etab", ""),
p.get("ghm_reco", ""), p.get("ghs_reco", ""),
p.get("recodage_impactant", ""), p.get("ghs_injustifie", ""),
p.get("accord_desaccord", ""), p.get("praticien_conseil", ""),
p2.get("ghs_initial", ""), p2.get("ghs_avant_concertation", ""),
p2.get("ghs_final", ""),
p2.get("decision", ""), p2.get("date_concertation", ""),
arg_short,
]
for col, value in enumerate(values, 1):
cell = ws.cell(row=row_idx, column=col, value=value)
cell.alignment = cell_align
cell.border = thin_border
# Largeurs de colonnes automatiques
for col in range(1, len(headers) + 1):
max_len = len(str(ws.cell(row=1, column=col).value))
for row in range(2, ws.max_row + 1):
val = ws.cell(row=row, column=col).value
if val:
max_len = max(max_len, min(len(str(val)), 40))
ws.column_dimensions[openpyxl.utils.get_column_letter(col)].width = max_len + 2
# Figer la première ligne
ws.freeze_panes = "A2"
wb.save(output_path)
print(f" Excel exporté: {output_path}")
# ============================================================
# Main — traitement de tous les PDFs
# ============================================================
if __name__ == "__main__":
pdf_dir = "/home/dom/ai/Aivanov_scan_ogc/2018 CARC"
output_dir = "/home/dom/ai/Aivanov_scan_ogc/output"
os.makedirs(output_dir, exist_ok=True)
print("Chargement du modèle docTR...")
model = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True)
# Tous les PDFs du répertoire
pdf_files = sorted(glob.glob(os.path.join(pdf_dir, "OGC *.pdf")))
print(f" {len(pdf_files)} fichiers PDF trouvés")
all_results = []
for pdf_path in pdf_files:
try:
result = extract_ogc(pdf_path, model, debug=True)
print_results(result)
all_results.append(result)
# JSON par dossier
json_path = os.path.join(output_dir, f"{result['fichier']}.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f" ERREUR sur {os.path.basename(pdf_path)}: {e}")
import traceback
traceback.print_exc()
# Export Excel consolidé
if all_results:
excel_path = os.path.join(output_dir, "OGC_2018_CARC_resultats.xlsx")
export_excel(all_results, excel_path)
print(f"\n{'='*60}")
print(f" {len(all_results)} dossiers traités sur {len(pdf_files)}")
print(f" JSON individuels: {output_dir}/")
print(f" Excel consolidé: {output_dir}/OGC_2018_CARC_resultats.xlsx")
print(f"{'='*60}")