Initial commit with full extraction pipeline: PDF OCR (docTR), text segmentation, LLM extraction (Ollama), deterministic post-processing normalizer, validation, and Excel/CSV export. The normalizer fixes OCR/LLM errors on CIM-10 codes: - OCR digit→letter confusion in position 1 (1→I, 0→O, 5→S, 2→Z, 8→B) - Missing dot separator (F050→F05.0, R410→R41.0) - '+' instead of '.' (B99+1→B99.1, J961+0→J96.10) - Excess decimals (Z04.880→Z04.88) - OCR letter→digit in positions 2-3 (LO2.2→L02.2) - Literal "null" string purge - Auto-fill codes_retenus from decision context Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
280 lines
10 KiB
Python
280 lines
10 KiB
Python
"""
|
|
Extraction structurée des données OGC via VLM (Ollama).
|
|
Envoie chaque bloc de texte au modèle et parse la réponse JSON.
|
|
"""
|
|
import json
|
|
import re
|
|
import logging
|
|
import requests
|
|
from dataclasses import dataclass
|
|
|
|
from config import OLLAMA_BASE_URL, OLLAMA_MODEL, OLLAMA_TIMEOUT, OLLAMA_MAX_RETRIES
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Prompt système pour l'extraction
|
|
SYSTEM_PROMPT = """Tu es un expert en codage PMSI et contrôle T2A.
|
|
Tu extrais des données structurées depuis des rapports de décision UCR (Unité de Coordination Régionale).
|
|
|
|
Tu dois retourner UNIQUEMENT un objet JSON valide, sans aucun texte avant ou après.
|
|
Pas de markdown, pas de commentaires, pas de ```json```.
|
|
|
|
Le JSON doit respecter exactement ce schéma :
|
|
{
|
|
"type_desaccord": "DP" | "DAS" | "DP+DAS" | "Actes" | null,
|
|
"codes_etablissement": "code(s) CIM-10 ou CCAM séparés par des virgules" | null,
|
|
"libelle_etablissement": "libellé/description du codage établissement" | null,
|
|
"codes_controleurs": "code(s) CIM-10 ou CCAM séparés par des virgules" | null,
|
|
"libelle_controleurs": "libellé/description du codage contrôleurs" | null,
|
|
"decision_ucr": "Favorable" | "Défavorable",
|
|
"codes_retenus": "code(s) finalement retenus par l'UCR" | null,
|
|
"ghm_ghs": "GHM et/ou GHS mentionnés (ex: 07C133 / GHS 2349)" | null,
|
|
"texte_decision": "texte intégral de la décision UCR, copié tel quel"
|
|
}
|
|
|
|
Règles importantes :
|
|
- "decision_ucr" : "Favorable" = l'UCR retient/confirme la position de l'établissement. "Défavorable" = l'UCR confirme l'avis des médecins contrôleurs ou rejette la demande de l'établissement.
|
|
- "codes_etablissement" : uniquement les codes (K85.1, T81.0, ZZQK002...), PAS les libellés
|
|
- "libelle_etablissement" : le texte descriptif du codage (ex: "pancréatite aigüe d'origine biliaire")
|
|
- "codes_controleurs" : idem, uniquement les codes. Si "non repris" ou "DAS non repris", mettre null
|
|
- "codes_retenus" : les codes qui résultent de la décision finale de l'UCR. Ce champ ne doit JAMAIS être vide : si Défavorable, retenir les codes contrôleurs ; si Favorable, retenir les codes établissement.
|
|
- "ghm_ghs" : extraire si mentionné dans le texte, sinon null
|
|
- "texte_decision" : le paragraphe complet commençant par "DECISION UCR" ou "PROPOSITION UCR", copié intégralement (minimum 50 caractères). Inclure tout le paragraphe de décision, pas un résumé.
|
|
- Les codes CIM-10 commencent TOUJOURS par une lettre majuscule (A-Z), jamais par un chiffre. Si l'OCR a lu "167.3", c'est "I67.3". Si "085", c'est "O85". Corriger systématiquement.
|
|
- Pour les OGC groupés, la même décision s'applique à tous les OGC du groupe
|
|
"""
|
|
|
|
USER_PROMPT_TEMPLATE = """Extrais les données structurées de ce bloc de rapport UCR.
|
|
|
|
Champ : {champ}
|
|
OGC concerné(s) : {ogc_numbers}
|
|
|
|
--- TEXTE DU BLOC ---
|
|
{block_text}
|
|
--- FIN DU TEXTE ---
|
|
|
|
Retourne UNIQUEMENT le JSON structuré."""
|
|
|
|
|
|
@dataclass
|
|
class OGCExtraction:
|
|
"""Données extraites d'un bloc OGC."""
|
|
champ: int
|
|
num_ogc: int
|
|
type_desaccord: str | None
|
|
codes_etablissement: str | None
|
|
libelle_etablissement: str | None
|
|
codes_controleurs: str | None
|
|
libelle_controleurs: str | None
|
|
decision_ucr: str | None
|
|
codes_retenus: str | None
|
|
ghm_ghs: str | None
|
|
texte_decision: str | None
|
|
extraction_success: bool = True
|
|
error_message: str | None = None
|
|
_raw_block_text: str | None = None
|
|
|
|
|
|
def _call_ollama(system_prompt: str, user_prompt: str) -> str:
|
|
"""Appelle l'API Ollama et retourne la réponse texte."""
|
|
url = f"{OLLAMA_BASE_URL}/api/chat"
|
|
payload = {
|
|
"model": OLLAMA_MODEL,
|
|
"messages": [
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": user_prompt},
|
|
],
|
|
"stream": False,
|
|
"options": {
|
|
"temperature": 0.1, # Extraction factuelle → température basse
|
|
"num_predict": 4096,
|
|
},
|
|
}
|
|
|
|
response = requests.post(url, json=payload, timeout=OLLAMA_TIMEOUT)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["message"]["content"]
|
|
|
|
|
|
def _parse_json_response(response_text: str) -> dict | None:
|
|
"""
|
|
Parse la réponse du VLM en JSON.
|
|
Gère les cas où le modèle entoure le JSON de markdown.
|
|
"""
|
|
text = response_text.strip()
|
|
|
|
# Supprimer les blocs markdown ```json ... ```
|
|
json_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', text, re.DOTALL)
|
|
if json_match:
|
|
text = json_match.group(1).strip()
|
|
|
|
# Essayer de trouver un objet JSON dans le texte
|
|
# Chercher la première { et la dernière }
|
|
first_brace = text.find('{')
|
|
last_brace = text.rfind('}')
|
|
if first_brace != -1 and last_brace != -1:
|
|
text = text[first_brace:last_brace + 1]
|
|
|
|
try:
|
|
return json.loads(text)
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Échec parsing JSON : {e}")
|
|
logger.debug(f"Réponse brute : {response_text[:500]}")
|
|
return None
|
|
|
|
|
|
def _normalize_decision(decision: str | None) -> str | None:
|
|
"""Normalise la décision en Favorable/Défavorable."""
|
|
if not decision:
|
|
return None
|
|
d = decision.strip().lower()
|
|
if d in ("favorable", "favorable établissement", "favorable etab"):
|
|
return "Favorable"
|
|
if d in ("défavorable", "defavorable", "défavorable établissement", "defavorable etab"):
|
|
return "Défavorable"
|
|
# Heuristiques
|
|
if "favorable" in d and "défavorable" not in d and "defavorable" not in d:
|
|
return "Favorable"
|
|
if "défavorable" in d or "defavorable" in d:
|
|
return "Défavorable"
|
|
return decision # Garder tel quel si non reconnu
|
|
|
|
|
|
def _normalize_type_desaccord(type_d: str | None) -> str | None:
|
|
"""Normalise le type de désaccord."""
|
|
if not type_d:
|
|
return None
|
|
t = type_d.strip().upper()
|
|
if t in ("DP", "DAS", "ACTES"):
|
|
return t
|
|
if "DP" in t and "DAS" in t:
|
|
return "DP+DAS"
|
|
if t in ("DP+DAS", "DP ET DAS"):
|
|
return "DP+DAS"
|
|
return type_d
|
|
|
|
|
|
def extract_ogc_block(champ: int, ogc_numbers: list[int], block_text: str) -> list[OGCExtraction]:
|
|
"""
|
|
Extrait les données structurées d'un bloc OGC via le VLM.
|
|
Retourne une extraction par numéro OGC (dédoublonnage pour les groupés).
|
|
"""
|
|
user_prompt = USER_PROMPT_TEMPLATE.format(
|
|
champ=champ,
|
|
ogc_numbers=", ".join(str(n) for n in ogc_numbers),
|
|
block_text=block_text,
|
|
)
|
|
|
|
results = []
|
|
parsed_data = None
|
|
|
|
for attempt in range(1, OLLAMA_MAX_RETRIES + 1):
|
|
try:
|
|
logger.debug(f"OGC {ogc_numbers} — tentative {attempt}")
|
|
response_text = _call_ollama(SYSTEM_PROMPT, user_prompt)
|
|
parsed_data = _parse_json_response(response_text)
|
|
|
|
if parsed_data:
|
|
break
|
|
|
|
logger.warning(f"OGC {ogc_numbers} — réponse non parsable, retry...")
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.warning(f"OGC {ogc_numbers} — timeout (tentative {attempt})")
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"OGC {ogc_numbers} — erreur réseau : {e}")
|
|
break
|
|
|
|
if not parsed_data:
|
|
# Échec total : créer des entrées d'erreur
|
|
for num in ogc_numbers:
|
|
results.append(OGCExtraction(
|
|
champ=champ,
|
|
num_ogc=num,
|
|
type_desaccord=None,
|
|
codes_etablissement=None,
|
|
libelle_etablissement=None,
|
|
codes_controleurs=None,
|
|
libelle_controleurs=None,
|
|
decision_ucr=None,
|
|
codes_retenus=None,
|
|
ghm_ghs=None,
|
|
texte_decision=block_text, # On garde au moins le texte brut
|
|
extraction_success=False,
|
|
error_message="Échec extraction VLM après retries",
|
|
_raw_block_text=block_text,
|
|
))
|
|
return results
|
|
|
|
# Créer une extraction par OGC
|
|
for num in ogc_numbers:
|
|
results.append(OGCExtraction(
|
|
champ=champ,
|
|
num_ogc=num,
|
|
type_desaccord=_normalize_type_desaccord(parsed_data.get("type_desaccord")),
|
|
codes_etablissement=parsed_data.get("codes_etablissement"),
|
|
libelle_etablissement=parsed_data.get("libelle_etablissement"),
|
|
codes_controleurs=parsed_data.get("codes_controleurs"),
|
|
libelle_controleurs=parsed_data.get("libelle_controleurs"),
|
|
decision_ucr=_normalize_decision(parsed_data.get("decision_ucr")),
|
|
codes_retenus=parsed_data.get("codes_retenus"),
|
|
ghm_ghs=parsed_data.get("ghm_ghs"),
|
|
texte_decision=parsed_data.get("texte_decision"),
|
|
extraction_success=True,
|
|
_raw_block_text=block_text,
|
|
))
|
|
|
|
return results
|
|
|
|
|
|
def extract_champ_block(champ: int, block_text: str) -> OGCExtraction:
|
|
"""
|
|
Extrait les données d'un bloc Champ (décision globale sans OGC individuels).
|
|
"""
|
|
extractions = extract_ogc_block(champ, [0], block_text)
|
|
if extractions:
|
|
extraction = extractions[0]
|
|
extraction.num_ogc = None # Pas de numéro OGC pour un champ global
|
|
return extraction
|
|
|
|
return OGCExtraction(
|
|
champ=champ,
|
|
num_ogc=None,
|
|
type_desaccord=None,
|
|
codes_etablissement=None,
|
|
libelle_etablissement=None,
|
|
codes_controleurs=None,
|
|
libelle_controleurs=None,
|
|
decision_ucr=None,
|
|
codes_retenus=None,
|
|
ghm_ghs=None,
|
|
texte_decision=block_text,
|
|
extraction_success=False,
|
|
error_message="Échec extraction VLM pour bloc champ",
|
|
)
|
|
|
|
|
|
def check_ollama_available() -> bool:
|
|
"""Vérifie que Ollama est accessible et que le modèle est chargé."""
|
|
try:
|
|
# Vérifier la connexion
|
|
response = requests.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=5)
|
|
response.raise_for_status()
|
|
models = response.json().get("models", [])
|
|
model_names = [m["name"] for m in models]
|
|
|
|
if OLLAMA_MODEL in model_names or any(OLLAMA_MODEL in name for name in model_names):
|
|
logger.info(f"Ollama OK — modèle {OLLAMA_MODEL} disponible")
|
|
return True
|
|
|
|
logger.error(f"Modèle {OLLAMA_MODEL} non trouvé. Modèles disponibles : {model_names}")
|
|
return False
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
logger.error(f"Ollama non accessible à {OLLAMA_BASE_URL}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Erreur vérification Ollama : {e}")
|
|
return False
|