"""Aide à la décision de facturation urgences T2A/PMSI via LLM local. Décide si un passage aux urgences relève : - du FORFAIT_URGENCE (passage simple, retour à domicile) - de la REQUALIFICATION_HOSPITALISATION (séjour MCO, valorisation 1k-5k€+) Le prompt impose une extraction littérale des faits du DPI (pas d'invention) et une modulation honnête de la confiance. Validé sur 15 DPI synthétiques : qwen2.5:7b atteint 100 % d'accuracy en ~5 s/cas avec 4,7 Go VRAM. Voir docs/clients/ght_sud_95/ et demo/facturation_urgences/RESULTATS.md pour le bench comparatif des 11 LLMs évalués. """ from __future__ import annotations import json import logging import os import time import urllib.error import urllib.request from typing import Any, Dict logger = logging.getLogger(__name__) OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434/api/generate") DEFAULT_MODEL = os.environ.get("T2A_MODEL", "qwen2.5:7b") DEFAULT_TIMEOUT = 60 # secondes PROMPT_TEMPLATE = """Tu es médecin DIM (Département d'Information Médicale), expert en facturation T2A/PMSI aux urgences hospitalières en France. Analyse le dossier patient ci-dessous pour déterminer si le passage relève : - FORFAIT_URGENCE : passage simple, retour à domicile, sans surveillance prolongée ni soins continus - REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les critères PMSI/ATIH INSTRUCTIONS STRICTES : 1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère. 2. Identifie d'abord les éléments en faveur d'une hospitalisation, puis ceux en faveur d'un forfait, puis tranche. 3. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier. 4. Module ta confiance honnêtement : - "elevee" uniquement si tous les indices convergent - "moyenne" si éléments ambivalents - "faible" si information manquante ou très atypique Réponds STRICTEMENT en JSON valide, sans texte avant ni après : {{ "duree_passage_heures": , "elements_pour_hospitalisation": [], "elements_pour_forfait": [], "decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION", "justification": "<2-3 phrases s'appuyant explicitement sur les faits ci-dessus>", "confiance": "elevee" | "moyenne" | "faible" }} DOSSIER PATIENT : {dpi} """ def analyze_dpi( dpi_text: str, model: str = DEFAULT_MODEL, timeout: int = DEFAULT_TIMEOUT, ollama_url: str = OLLAMA_URL, ) -> Dict[str, Any]: """Soumet un DPI urgences à un LLM Ollama et retourne la décision JSON. Args: dpi_text: Texte du dossier patient (concaténation des onglets ou DPI brut). model: Modèle Ollama à utiliser (default qwen2.5:7b — 100% accuracy bench). timeout: Timeout HTTP en secondes. ollama_url: Endpoint Ollama (default localhost:11434/api/generate). Returns: Dict avec : decision: "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION" elements_pour_hospitalisation: List[str] elements_pour_forfait: List[str] duree_passage_heures: float justification: str confiance: "elevee" | "moyenne" | "faible" _elapsed_s: float (latence) _model: str En cas d'erreur : {"_error": str, "_elapsed_s": float} (réseau / Ollama indisponible) {"_parse_error": True, "_raw": str, "_elapsed_s": float} (JSON invalide) """ payload = { "model": model, "prompt": PROMPT_TEMPLATE.format(dpi=dpi_text), "stream": False, "format": "json", "keep_alive": "5m", "options": { "temperature": 0.1, "num_predict": 4000, "num_ctx": 4096, "reasoning_effort": "minimal", }, } data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( ollama_url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) t0 = time.time() try: with urllib.request.urlopen(req, timeout=timeout) as resp: body = json.loads(resp.read().decode("utf-8")) except (urllib.error.URLError, TimeoutError, ConnectionError) as e: elapsed = round(time.time() - t0, 1) logger.warning("analyze_dpi: Ollama indisponible (%s) après %.1fs", e, elapsed) return {"_error": str(e), "_elapsed_s": elapsed, "_model": model} elapsed = time.time() - t0 raw_response = body.get("response", "").strip() raw_thinking = body.get("thinking", "").strip() candidates = [raw_response] if not raw_response and raw_thinking: last_close = raw_thinking.rfind("}") last_open = raw_thinking.rfind("{", 0, last_close) if last_open != -1 and last_close != -1: candidates.append(raw_thinking[last_open:last_close + 1]) parsed = None for cand in candidates: cleaned = cand if cleaned.startswith("```"): cleaned = cleaned.split("\n", 1)[-1] if cleaned.endswith("```"): cleaned = cleaned.rsplit("```", 1)[0] cleaned = cleaned.strip() try: parsed = json.loads(cleaned) break except json.JSONDecodeError: continue if parsed is None: return { "_parse_error": True, "_raw": (raw_response or raw_thinking)[:500], "_elapsed_s": round(elapsed, 1), "_model": model, } parsed["_elapsed_s"] = round(elapsed, 1) parsed["_model"] = model parsed["_eval_count"] = body.get("eval_count") return parsed