Backup état complet après enregistrement vidéo démo de bout en bout. À utiliser comme point de référence pour la consolidation post-démo. Changements majeurs de la session 18-19 mai : - AIVA-URGENCE : page autonome avec preset URL + auto-focus chain - Workflow Demo_urgence_3_db : merge linux_db + steps AIVA + pause humaine NoMachine - Bypass LLM (static_result / static_text) dans replay_engine pour démos déterministes sans appel Ollama - Fix api_stream:3013 — replay_paused au premier polling /next - dag_execute : lift duration_ms vers top-level pour wait runtime - NPM bypass auth /aiva-urgence/ via location ^~ (proxy_host/10.conf hors git) - scripts/cancel-replays.sh — workaround Stop VWB qui ne purge pas la queue Anchors visuels (468) forcés dans le commit pour garantir restorabilité. DB workflows actuelle + ~12 .bak DB de la journée incluses. Sujets identifiés pour consolidation post-démo (TODO) : 1. Bug VWB recapture anchor ne régénère pas le PNG 2. Léa client accumule état mémoire (restart périodique requis) 3. Stop VWB ne purge pas la queue serveur (lien manquant vers /replay/cancel) 4. Bug coord client mss tronqué 2560x60 → mapping Y cassé 5. delay_before/delay_after ignorés au runtime (fix partiel duration_ms) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
663 lines
30 KiB
Python
663 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
"""Bench T2A décision baseline vs post-fix DIM (11 dossiers GHT Sud 95).
|
|
|
|
Wrapper hors-prod (n'altère pas core/llm/t2a_decision.py ni urgences_orchestrator.py).
|
|
|
|
Étapes :
|
|
1. Parse data.js (regex JS, on n'évalue pas le JS) pour reconstruire un DPI texte
|
|
fidèle à ce que `extract_text` produirait depuis l'écran maquette.
|
|
2. Lance N inférences/dossier avec :
|
|
- Baseline : prompt actuel `core/llm/t2a_decision.py` + DEFAULT_MODEL=qwen2.5:7b
|
|
- Post-fix : prompt enrichi (QW1 règle 3/3, QW2 RPU/CCMU/GEMSA/durée,
|
|
QW3 type_forfait, QW4 non-admission, QW5 confiance/critère)
|
|
+ modèle gemma3:27b-cloud
|
|
3. Sauve les résultats bruts JSON, calcule accuracy + matrice + justifs scoring.
|
|
|
|
Usage :
|
|
python tools/bench_t2a_post_fix.py --runs 3 --mode baseline --model qwen2.5:7b
|
|
python tools/bench_t2a_post_fix.py --runs 3 --mode postfix --model gemma3:27b-cloud
|
|
python tools/bench_t2a_post_fix.py --all # baseline + postfix
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
DATA_JS = ROOT / "docs/clients/ght_sud_95/mockup_easily_assure/data.js"
|
|
RESULTS_DIR = ROOT / "tools" / "_bench_t2a_out"
|
|
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
OLLAMA_URL = "http://localhost:11434/api/generate"
|
|
TIMEOUT = 240 # le cloud peut prendre ~30s, on laisse de la marge
|
|
|
|
ORDRE_DOSSIERS = [
|
|
("25003284", "Pneumopathie VRS — terrain coronaire 78a, 3h37 sortie domicile", "FORFAIT_URGENCE", "Standard"),
|
|
("25003362", "Intoxication accidentelle enfant 3 ans, 4h41", "FORFAIT_URGENCE", "PE2"),
|
|
("25003364", "Pneumopathie sur SLA + BPCO, 7h35 mutation pneumo", "REQUALIFICATION_HOSPITALISATION", None),
|
|
("25003451", "Plaie suturée enfant 3 ans, 2h00 (CCMU 2 + suture)", "FORFAIT_URGENCE", "SU2"),
|
|
("25003475", "Suspicion AVC → aura migraineuse 34a, 4h03", "REQUALIFICATION_HOSPITALISATION", None),
|
|
("25005866", "TC commotionnel hockey 17a, 12h01 surveillance neuro", "REQUALIFICATION_HOSPITALISATION", None),
|
|
("25010621", "Laryngite enfant 5 ans, 2h49 (CCMU 2 pédiatrique)", "FORFAIT_URGENCE", "PE2"),
|
|
("25012257", "Douleur abdo polypath 76a 7h20 (transfert Embruns)", "REQUALIFICATION_HOSPITALISATION", None),
|
|
("25048485", "1ère CTCG ado 13a, 6h50 (CCMU 2 pédiatrique)", "FORFAIT_URGENCE", "PE2"),
|
|
("25056615", "Salpingite 39a transfert gynéco GEMSA 5, 4h30", "FORFAIT_URGENCE", "Standard"),
|
|
("25151530", "Colique néphrétique 58a 6h21 sortie domicile", "FORFAIT_URGENCE", "Standard"),
|
|
]
|
|
|
|
# ─────────────────────────────────────────────────────────────────────
|
|
# Loader data.js → DPI texte
|
|
# ─────────────────────────────────────────────────────────────────────
|
|
|
|
# data.js est du JS littéral. Pour éviter l'embarquement d'un parser JS, on
|
|
# utilise une approche pragmatique : on extrait les blocs par regex sur
|
|
# l'IPP, puis on aplatit le JS-quasi-JSON en texte lisible.
|
|
|
|
def strip_html(s: str) -> str:
|
|
"""Retire les <b>, <br>, etc. — proche de ce que extract_text rendrait."""
|
|
s = re.sub(r"<br\s*/?>", "\n", s, flags=re.IGNORECASE)
|
|
s = re.sub(r"<[^>]+>", "", s)
|
|
s = s.replace(" ", " ")
|
|
s = re.sub(r"\n\s*\n+", "\n\n", s)
|
|
return s.strip()
|
|
|
|
|
|
def extract_block(text: str, ipp: str) -> str:
|
|
"""Récupère le bloc JS associé à `"<ipp>": { ... },` en équilibrant les accolades."""
|
|
pat = re.compile(rf'"{ipp}"\s*:\s*\{{')
|
|
m = pat.search(text)
|
|
if not m:
|
|
raise ValueError(f"Bloc {ipp} introuvable")
|
|
start = m.end() - 1 # position du `{` ouvrant
|
|
depth = 0
|
|
in_str = False
|
|
in_template = False
|
|
escape = False
|
|
i = start
|
|
while i < len(text):
|
|
c = text[i]
|
|
if escape:
|
|
escape = False
|
|
i += 1
|
|
continue
|
|
if c == "\\":
|
|
escape = True
|
|
i += 1
|
|
continue
|
|
if in_str:
|
|
if c == in_str:
|
|
in_str = False
|
|
elif in_template:
|
|
if c == "`":
|
|
in_template = False
|
|
else:
|
|
if c == '"' or c == "'":
|
|
in_str = c
|
|
elif c == "`":
|
|
in_template = True
|
|
elif c == "{":
|
|
depth += 1
|
|
elif c == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
return text[start : i + 1]
|
|
i += 1
|
|
raise ValueError(f"Bloc {ipp} non clos")
|
|
|
|
|
|
def _js_unescape(s: str) -> str:
|
|
"""Décode les escapes JS courants sans casser l'UTF-8 (les caractères
|
|
accentués sont déjà en utf-8 dans le fichier source)."""
|
|
# \\n \\t \\" \\' \\\\ \\u00xx
|
|
def repl(m):
|
|
c = m.group(0)
|
|
if c == r"\n": return "\n"
|
|
if c == r"\t": return "\t"
|
|
if c == r"\r": return "\r"
|
|
if c == r"\"": return "\""
|
|
if c == r"\'": return "'"
|
|
if c == r"\\": return "\\"
|
|
if c.startswith(r"\u"):
|
|
return chr(int(c[2:], 16))
|
|
return c
|
|
return re.sub(r"\\u[0-9a-fA-F]{4}|\\[ntr\"'\\]", repl, s)
|
|
|
|
|
|
def extract_field(block: str, name: str, kind: str = "string") -> str | None:
|
|
"""Extrait `name: "..."` ou `name: \\`...\\`` du bloc."""
|
|
if kind == "string":
|
|
# double-quote string
|
|
m = re.search(rf'\b{re.escape(name)}\s*:\s*"((?:[^"\\]|\\.)*)"', block)
|
|
if m:
|
|
return _js_unescape(m.group(1))
|
|
# template literal
|
|
m = re.search(rf'\b{re.escape(name)}\s*:\s*`((?:[^`\\]|\\.)*)`', block)
|
|
if m:
|
|
return _js_unescape(m.group(1))
|
|
return None
|
|
|
|
|
|
def extract_recap_rpu(block: str) -> list[tuple[str, str]]:
|
|
"""recap_rpu: [["Mode de venue", "Véhicule personnel"], ...]"""
|
|
m = re.search(r"recap_rpu\s*:\s*\[(.*?)\]\s*\}", block, flags=re.DOTALL)
|
|
if not m:
|
|
return []
|
|
inner = m.group(1)
|
|
pairs = re.findall(r'\[\s*"((?:[^"\\]|\\.)*)"\s*,\s*"((?:[^"\\]|\\.)*)"\s*\]', inner)
|
|
return [(strip_html(k), strip_html(v)) for k, v in pairs]
|
|
|
|
|
|
def extract_signes_vitaux(block: str) -> list[tuple[str, str, str]]:
|
|
"""signes_vitaux: [{item, v1, v2}, ...] avec dates en signes_vitaux_dates."""
|
|
m_dates = re.search(r"signes_vitaux_dates\s*:\s*\[((?:[^\]])*)\]", block)
|
|
dates = []
|
|
if m_dates:
|
|
dates = re.findall(r'"((?:[^"\\]|\\.)*)"', m_dates.group(1))
|
|
m = re.search(r"signes_vitaux\s*:\s*\[(.*?)\]\s*\}", block, flags=re.DOTALL)
|
|
rows = []
|
|
if m:
|
|
block_rows = m.group(1)
|
|
|
|
def _ext(row: str, key: str) -> str:
|
|
# Tente double-quote, single-quote, template — accepte les contenus
|
|
# avec quotes mixtes (HTML <span class="...">)
|
|
for quote in ('"', "'", '`'):
|
|
pat = rf'{key}\s*:\s*{re.escape(quote)}((?:(?!{re.escape(quote)}).)*){re.escape(quote)}'
|
|
mm = re.search(pat, row, flags=re.DOTALL)
|
|
if mm:
|
|
return mm.group(1)
|
|
return ""
|
|
|
|
for row in re.findall(r"\{[^}]*\}", block_rows):
|
|
rows.append((
|
|
strip_html(_ext(row, "item")) or "?",
|
|
strip_html(_ext(row, "v1")),
|
|
strip_html(_ext(row, "v2")),
|
|
))
|
|
return rows, dates
|
|
|
|
|
|
def extract_diagnostics(block: str) -> list[str]:
|
|
"""diagnostics: [{code, type, ...}]"""
|
|
m = re.search(r"diagnostics\s*:\s*\[(.*?)\]", block, flags=re.DOTALL)
|
|
if not m:
|
|
return []
|
|
inner = m.group(1)
|
|
out = []
|
|
for cell in re.findall(r"\{[^}]*\}", inner):
|
|
code = re.search(r'code\s*:\s*"((?:[^"\\]|\\.)*)"', cell)
|
|
if code:
|
|
out.append(strip_html(code.group(1)))
|
|
return out
|
|
|
|
|
|
def extract_notes_medicales(block: str) -> list[dict]:
|
|
m = re.search(r"notes_medicales\s*:\s*\[(.*?)\]\s*,\s*//", block, flags=re.DOTALL)
|
|
if not m:
|
|
# fallback: chercher fermeture par "synthese"
|
|
m = re.search(r"notes_medicales\s*:\s*\[(.*?)\]\s*,?\s*synthese", block, flags=re.DOTALL)
|
|
if not m:
|
|
return []
|
|
inner = m.group(1)
|
|
notes = []
|
|
# split pragmatique sur "type:"
|
|
for chunk in re.split(r"\}\s*,\s*\{", inner):
|
|
chunk = "{" + chunk.strip("{} ,\n") + "}"
|
|
date = re.search(r'date\s*:\s*"((?:[^"\\]|\\.)*)"', chunk)
|
|
type_ = re.search(r'type\s*:\s*"((?:[^"\\]|\\.)*)"', chunk)
|
|
par = re.search(r'par\s*:\s*"((?:[^"\\]|\\.)*)"', chunk)
|
|
contenu = re.search(r"contenu\s*:\s*`((?:[^`\\]|\\.)*)`", chunk, flags=re.DOTALL)
|
|
if not contenu:
|
|
contenu = re.search(r'contenu\s*:\s*"((?:[^"\\]|\\.)*)"', chunk)
|
|
if contenu:
|
|
notes.append({
|
|
"date": date.group(1) if date else "",
|
|
"type": type_.group(1) if type_ else "",
|
|
"par": par.group(1) if par else "",
|
|
"contenu": strip_html(contenu.group(1)),
|
|
})
|
|
return notes
|
|
|
|
|
|
def extract_examens_questionnaires(block: str) -> list[dict]:
|
|
"""examens.questionnaires: [{nom, reponse, ...}]"""
|
|
m = re.search(r"questionnaires\s*:\s*\[(.*?)\]", block, flags=re.DOTALL)
|
|
if not m:
|
|
return []
|
|
inner = m.group(1)
|
|
out = []
|
|
for chunk in re.split(r"\}\s*,\s*\{", inner):
|
|
chunk = "{" + chunk.strip("{} ,\n") + "}"
|
|
nom = re.search(r'nom\s*:\s*"((?:[^"\\]|\\.)*)"', chunk)
|
|
rep = re.search(r"reponse\s*:\s*`((?:[^`\\]|\\.)*)`", chunk, flags=re.DOTALL)
|
|
if rep:
|
|
out.append({
|
|
"nom": nom.group(1) if nom else "",
|
|
"reponse": strip_html(rep.group(1)),
|
|
})
|
|
return out
|
|
|
|
|
|
def extract_notes_paramedicales(block: str) -> list[dict]:
|
|
m = re.search(r"notes_paramedicales\s*:\s*\[(.*?)\]\s*\}", block, flags=re.DOTALL)
|
|
if not m:
|
|
return []
|
|
inner = m.group(1)
|
|
out = []
|
|
for chunk in re.split(r"\}\s*,\s*\{", inner):
|
|
chunk = "{" + chunk.strip("{} ,\n") + "}"
|
|
contenu = re.search(r"contenu\s*:\s*`((?:[^`\\]|\\.)*)`", chunk, flags=re.DOTALL)
|
|
if contenu:
|
|
par = re.search(r'par\s*:\s*"((?:[^"\\]|\\.)*)"', chunk)
|
|
out.append({
|
|
"par": par.group(1) if par else "",
|
|
"contenu": strip_html(contenu.group(1)),
|
|
})
|
|
return out
|
|
|
|
|
|
def build_dpi_text(ipp: str, raw: str) -> str:
|
|
"""Construit un texte DPI plausible depuis le bloc data.js."""
|
|
block = extract_block(raw, ipp)
|
|
|
|
nom = extract_field(block, "nom") or ""
|
|
prenom = extract_field(block, "prenom") or ""
|
|
age = extract_field(block, "age") or ""
|
|
sexe = extract_field(block, "sexe") or ""
|
|
arrivee = extract_field(block, "arrivee") or ""
|
|
sortie = extract_field(block, "sortie") or ""
|
|
motif_court = extract_field(block, "motif_court") or ""
|
|
obs_ide = extract_field(block, "obs_ide") or ""
|
|
diagnostics = extract_diagnostics(block)
|
|
notes = extract_notes_medicales(block)
|
|
examens = extract_examens_questionnaires(block)
|
|
notes_param = extract_notes_paramedicales(block)
|
|
rpu = extract_recap_rpu(block)
|
|
constantes, dates = extract_signes_vitaux(block)
|
|
ccmu = extract_field(block, "ccmu") or ""
|
|
gemsa = extract_field(block, "gemsa") or ""
|
|
diag_synth = extract_field(block, "diagnostics_synthese") or ""
|
|
decision = extract_field(block, "decision") or ""
|
|
orientation = extract_field(block, "orientation") or ""
|
|
us_dest = extract_field(block, "us_destination") or ""
|
|
motif_pec = extract_field(block, "motif_pec") or ""
|
|
mode_transport = extract_field(block, "mode_transport") or ""
|
|
mode_entree = extract_field(block, "mode_entree") or ""
|
|
|
|
lines = []
|
|
lines.append(f"=== DOSSIER PATIENT IPP {ipp} ===")
|
|
lines.append(f"Identité : {nom} {prenom} ({sexe}, {age})")
|
|
lines.append(f"Arrivée : {arrivee}")
|
|
lines.append(f"Sortie : {sortie}")
|
|
lines.append(f"Motif : {motif_court}")
|
|
lines.append("")
|
|
|
|
lines.append("--- ONGLET MOTIF / IDE ---")
|
|
if obs_ide:
|
|
lines.append("Observation IDE :")
|
|
lines.append(strip_html(obs_ide))
|
|
lines.append("")
|
|
if diagnostics:
|
|
lines.append("Diagnostics :")
|
|
for d in diagnostics:
|
|
lines.append(f" - {d}")
|
|
lines.append("")
|
|
if constantes:
|
|
lines.append("Signes vitaux (par colonne datée) :")
|
|
if dates:
|
|
lines.append(f" Dates colonnes : {' | '.join(dates)}")
|
|
for item, v1, v2 in constantes:
|
|
lines.append(f" - {item} : V1={v1 or '—'} | V2={v2 or '—'}")
|
|
lines.append("")
|
|
|
|
if examens:
|
|
lines.append("--- ONGLET EXAMENS CLINIQUES ---")
|
|
for e in examens:
|
|
lines.append(f"[{e['nom']}]")
|
|
lines.append(e["reponse"])
|
|
lines.append("")
|
|
if notes_param:
|
|
lines.append("--- NOTES PARAMÉDICALES ---")
|
|
for n in notes_param:
|
|
lines.append(f"[par {n['par']}]")
|
|
lines.append(n["contenu"])
|
|
lines.append("")
|
|
|
|
if notes:
|
|
lines.append("--- ONGLET NOTES MÉDICALES ---")
|
|
for n in notes:
|
|
lines.append(f"[{n['date']} — {n['type']} — {n['par']}]")
|
|
lines.append(n["contenu"])
|
|
lines.append("")
|
|
|
|
lines.append("--- ONGLET SYNTHÈSE URGENCES (RPU) ---")
|
|
if mode_transport:
|
|
lines.append(f"Mode de venue : {mode_transport}")
|
|
if mode_entree:
|
|
lines.append(f"Mode d'entrée : {mode_entree}")
|
|
if motif_pec:
|
|
lines.append(f"Motif PEC : {motif_pec}")
|
|
if ccmu:
|
|
lines.append(f"CCMU : {ccmu}")
|
|
if gemsa:
|
|
lines.append(f"GEMSA : {gemsa}")
|
|
if diag_synth:
|
|
lines.append(f"Diagnostic principal RPU : {diag_synth}")
|
|
if decision:
|
|
lines.append(f"Décision médicale : {decision}")
|
|
if orientation:
|
|
lines.append(f"Orientation : {orientation}")
|
|
if us_dest:
|
|
lines.append(f"Destination : {us_dest}")
|
|
|
|
if rpu:
|
|
lines.append("")
|
|
lines.append("Récapitulatif RPU :")
|
|
for k, v in rpu:
|
|
lines.append(f" - {k} : {v}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────
|
|
# Prompts
|
|
# ─────────────────────────────────────────────────────────────────────
|
|
|
|
PROMPT_BASELINE = """Tu es médecin DIM (Département d'Information Médicale), expert en facturation T2A/PMSI aux urgences hospitalières en France.
|
|
|
|
Analyse le dossier patient ci-dessous pour déterminer si le passage relève :
|
|
- FORFAIT_URGENCE : passage simple, retour à domicile, sans surveillance prolongée ni soins continus
|
|
- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les 3 critères PMSI/ATIH
|
|
|
|
LES 3 CRITÈRES UHCD (au moins 2 sur 3 validés ⇒ REQUALIFICATION) :
|
|
1. Pathologie potentiellement évolutive (instabilité hémodynamique, terrain à risque, traitement nécessitant adaptation)
|
|
2. Surveillance médicale et paramédicale prolongée (constantes itératives, observations IDE/médecin, durée > 6 h)
|
|
3. Examens complémentaires ou actes thérapeutiques (biologie, imagerie, sutures, gestes techniques)
|
|
|
|
INSTRUCTIONS STRICTES :
|
|
1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère.
|
|
2. Pour CHAQUE critère (1, 2, 3), tu DOIS produire un texte de preuve qui contient AU MOINS UNE CITATION LITTÉRALE du dossier entre guillemets français « ... ». Exemple : « FC à 110 bpm, TA 92/60 ».
|
|
3. Si le critère est NON validé, ne renvoie JAMAIS un fallback creux : explique factuellement ce qui manque, en citant le dossier (ex: « Sortie à H+2 », « Aucun acte technique au compte-rendu »).
|
|
4. Le texte de chaque preuve fait 2-3 phrases : (i) la citation littérale, (ii) l'analyse PMSI, (iii) la conclusion validé/non validé.
|
|
5. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier.
|
|
6. Module ta confiance honnêtement :
|
|
- "elevee" uniquement si tous les indices convergent
|
|
- "moyenne" si éléments ambivalents
|
|
- "faible" si information manquante ou très atypique
|
|
|
|
Réponds STRICTEMENT en JSON valide, sans texte avant ni après :
|
|
{{
|
|
"duree_passage_heures": <nombre>,
|
|
"elements_pour_hospitalisation": [<phrases littéralement extraites du dossier>],
|
|
"elements_pour_forfait": [<phrases littéralement extraites du dossier>],
|
|
"decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION",
|
|
"decision_court": "UHCD" | "Forfait Urgences",
|
|
"preuve_critere1": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (motif, symptôme, terrain à risque, traitement). Si non validé : factualise ce qui manque en citant le dossier.>",
|
|
"critere1_valide": true | false,
|
|
"preuve_critere2": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (constantes, observations IDE, durée surveillance). Si non validé : factualise.>",
|
|
"critere2_valide": true | false,
|
|
"preuve_critere3": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (actes/examens : biologie, imagerie, suture, etc.). Si non validé : factualise.>",
|
|
"critere3_valide": true | false,
|
|
"justification": "<2-3 phrases synthétiques s'appuyant explicitement sur les preuves ci-dessus, avec au moins une citation>",
|
|
"confiance": "elevee" | "moyenne" | "faible"
|
|
}}
|
|
|
|
DOSSIER PATIENT :
|
|
{dpi}
|
|
"""
|
|
|
|
# Post-fix : applique les 5 quick wins de l'audit DIM
|
|
PROMPT_POSTFIX = """Tu es médecin DIM (Département d'Information Médicale), expert en facturation T2A/PMSI aux urgences hospitalières en France.
|
|
|
|
Analyse le dossier patient ci-dessous pour déterminer si le passage relève :
|
|
- FORFAIT_URGENCE : passage simple, retour à domicile / consultation externe, sans surveillance prolongée
|
|
- REQUALIFICATION_HOSPITALISATION : séjour UHCD ou MCO requis selon les 3 critères PMSI/ATIH
|
|
|
|
LES 3 CRITÈRES UHCD — RÈGLE STRICTE selon arbre Eaubonne / instruction DGOS/R1/DSS/1A/2020/52 :
|
|
1. Pathologie potentiellement évolutive (motif initial, intensité/durée des symptômes, traitement initial inefficace, terrain à risque âge/comorbidités)
|
|
2. Surveillance médicale et paramédicale prolongée (constantes itératives, observations IDE/médecin, durée > 6 h)
|
|
3. Examens complémentaires ou actes thérapeutiques (RX, scanner, biologie, suture, KT, antibiotiques IV, aérosols)
|
|
|
|
⚠️ RÈGLE DE COMBINAISON STRICTE (arbre PPTX CH Eaubonne, slide 6) :
|
|
- Si les 3 critères sont validés ⇒ REQUALIFICATION_HOSPITALISATION (UHCD)
|
|
- Si AU MOINS 1 critère est manquant ⇒ FORFAIT_URGENCE
|
|
- AUCUNE dérogation au 2/3. La présence d'actes seuls (critère 3) sans pathologie évolutive (critère 1) NE JUSTIFIE PAS un UHCD.
|
|
|
|
DONNÉES RPU À PRENDRE EN COMPTE EN PRIORITÉ :
|
|
- Durée totale du passage : si < 6 h ET sortie domicile/consultation externe ⇒ très probable FORFAIT_URGENCE quel que soit le terrain.
|
|
- GEMSA :
|
|
* GEMSA 2 = sortie après soins simples → FORFAIT_URGENCE.
|
|
* GEMSA 4 = patient hospitalisé MCO interne (mutation) → favorise UHCD si surveillance documentée.
|
|
* GEMSA 5 = patient transféré dans un autre établissement → FORFAIT_URGENCE par défaut. Mono-RUM UHCD seulement si transfert MCO POST-UHCD documenté ; un transfert direct sans phase d'observation = forfait.
|
|
- Mode de sortie / décision : "Consultation externe" + "Retour à domicile" est une CONTRE-INDICATION FORTE à UHCD, sauf si surveillance > 8 h documentée.
|
|
- CCMU : 2 → faveur Forfait + supplément SU2 si acte CCAM réalisé ; 3,4,5 → faveur UHCD ou supplément SU3.
|
|
|
|
CRITÈRES DE NON-ADMISSION UHCD (SFMU 2024) — si l'un coche, FORFAIT_URGENCE forcé :
|
|
- Pathologie clairement identifiée et relevant à l'évidence d'un service d'hospitalisation conventionnelle (mutation directe MCO sans phase de surveillance préalable).
|
|
- Patient grave relevant d'un service de soins critiques (réa, USIP).
|
|
- Patient déjà hospitalisé dans un autre établissement (UHCD n'accueille pas les urgences intra-hospitalières).
|
|
- Patient sortant directement de bloc opératoire (UHCD n'est pas une salle de réveil).
|
|
|
|
TYPE DE FORFAIT À DÉTERMINER (si decision = FORFAIT_URGENCE) :
|
|
- "SU2" : CCMU 2 + au moins un acte CCAM réalisé (suture, plâtre, geste technique).
|
|
- "PE2" : enfant ≤ 16 ans + diagnostic pédiatrique + CCMU 2 (cumulable avec SU2).
|
|
- "PE1" : enfant ≤ 16 ans + diagnostic pédiatrique + CCMU 1.
|
|
- "Standard" : aucun supplément applicable.
|
|
- null : si decision = REQUALIFICATION_HOSPITALISATION.
|
|
|
|
INSTRUCTIONS STRICTES :
|
|
1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère.
|
|
2. Pour CHAQUE critère (1, 2, 3), tu DOIS produire AU MOINS UNE CITATION LITTÉRALE du dossier entre guillemets français « ... ». Exemple : « FC 110 bpm, TA 92/60 ». Sans citation = critère INVALIDÉ.
|
|
3. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires.
|
|
4. Module ta confiance par critère :
|
|
- "elevee" : citation explicite + cohérence cliniquement nette.
|
|
- "moyenne" : signal partiel ou ambivalent.
|
|
- "faible" : info manquante ou contradictoire.
|
|
|
|
Réponds STRICTEMENT en JSON valide, sans texte avant ni après :
|
|
{{
|
|
"duree_passage_heures": <nombre>,
|
|
"ccmu_inferre": "1" | "2" | "3" | "4" | "5",
|
|
"gemsa_inferre": "2" | "3" | "4" | "5",
|
|
"decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION",
|
|
"decision_court": "UHCD" | "Forfait Urgences",
|
|
"type_forfait": "Standard" | "SU2" | "SU3" | "PE1" | "PE2" | null,
|
|
"supplements_compatibles": [<liste des cumuls applicables, ex. ["SU2", "PE2"]>],
|
|
"preuve_critere1": {{
|
|
"valide": true | false,
|
|
"citation": "<citation littérale entre « »>",
|
|
"analyse": "<1-2 phrases d'analyse PMSI>",
|
|
"confiance_critere": "elevee" | "moyenne" | "faible"
|
|
}},
|
|
"preuve_critere2": {{ "valide": ..., "citation": ..., "analyse": ..., "confiance_critere": ... }},
|
|
"preuve_critere3": {{ "valide": ..., "citation": ..., "analyse": ..., "confiance_critere": ... }},
|
|
"non_admission_uhcd_declenchee": true | false,
|
|
"non_admission_motif": "<si déclenchée, motif précis avec citation>",
|
|
"elements_pour_hospitalisation": [<phrases littéralement extraites du dossier>],
|
|
"elements_pour_forfait": [<phrases littéralement extraites du dossier>],
|
|
"justification": "<3-4 phrases synthétiques s'appuyant sur les 3 critères + RPU + non-admission, avec citations>",
|
|
"confiance_globale": "elevee" | "moyenne" | "faible"
|
|
}}
|
|
|
|
DOSSIER PATIENT :
|
|
{dpi}
|
|
"""
|
|
|
|
|
|
def query_ollama(prompt: str, model: str, timeout: int = TIMEOUT) -> dict:
|
|
payload = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"stream": False,
|
|
"format": "json",
|
|
"keep_alive": "5m",
|
|
"options": {
|
|
"temperature": 0.1,
|
|
"num_predict": 2000,
|
|
"num_ctx": 16384,
|
|
},
|
|
}
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
OLLAMA_URL, data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST",
|
|
)
|
|
t0 = time.time()
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
body = json.loads(resp.read().decode("utf-8"))
|
|
except (urllib.error.URLError, TimeoutError, ConnectionError) as e:
|
|
return {"_error": str(e), "_elapsed_s": round(time.time() - t0, 1)}
|
|
elapsed = round(time.time() - t0, 1)
|
|
raw = (body.get("response") or "").strip()
|
|
raw_thinking = (body.get("thinking") or "").strip()
|
|
candidates = [raw]
|
|
if not raw and raw_thinking:
|
|
last_close = raw_thinking.rfind("}")
|
|
last_open = raw_thinking.rfind("{", 0, last_close)
|
|
if last_open != -1 and last_close != -1:
|
|
candidates.append(raw_thinking[last_open:last_close + 1])
|
|
parsed = None
|
|
for cand in candidates:
|
|
cleaned = cand
|
|
if cleaned.startswith("```"):
|
|
cleaned = cleaned.split("\n", 1)[-1]
|
|
if cleaned.endswith("```"):
|
|
cleaned = cleaned.rsplit("```", 1)[0]
|
|
cleaned = cleaned.strip()
|
|
try:
|
|
parsed = json.loads(cleaned)
|
|
break
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if parsed is None:
|
|
return {"_parse_error": True, "_raw": (raw or raw_thinking)[:600], "_elapsed_s": elapsed}
|
|
parsed["_elapsed_s"] = elapsed
|
|
return parsed
|
|
|
|
|
|
def run_bench(mode: str, model: str, runs: int, dpis: dict[str, str]) -> dict:
|
|
prompt_tpl = PROMPT_POSTFIX if mode == "postfix" else PROMPT_BASELINE
|
|
out = {}
|
|
for ipp, _, gt, _ in ORDRE_DOSSIERS:
|
|
dpi_text = dpis[ipp]
|
|
prompt = prompt_tpl.format(dpi=dpi_text)
|
|
runs_out = []
|
|
for r in range(runs):
|
|
res = query_ollama(prompt, model)
|
|
decision = res.get("decision")
|
|
match = decision == gt
|
|
runs_out.append({
|
|
"run": r + 1,
|
|
"decision": decision,
|
|
"match": match,
|
|
"type_forfait": res.get("type_forfait"),
|
|
"duree": res.get("duree_passage_heures"),
|
|
"elapsed_s": res.get("_elapsed_s"),
|
|
"raw": res,
|
|
})
|
|
print(f" [{mode}] {ipp} r{r+1} : {decision or '?'} ({'OK' if match else 'KO'}) {res.get('_elapsed_s', '?')}s", flush=True)
|
|
out[ipp] = runs_out
|
|
return out
|
|
|
|
|
|
def stats(bench: dict, mode_label: str) -> dict:
|
|
"""Calcule accuracy globale + par sous-groupes + stabilité."""
|
|
n_dossiers = len(bench)
|
|
n_runs_total = sum(len(v) for v in bench.values())
|
|
correct_total = sum(1 for runs in bench.values() for r in runs if r["match"])
|
|
# accuracy majoritaire (vote sur 3 runs)
|
|
correct_majority = 0
|
|
stable = 0
|
|
for ipp, runs in bench.items():
|
|
gt = next(g for i, _, g, _ in ORDRE_DOSSIERS if i == ipp)
|
|
decisions = [r["decision"] for r in runs]
|
|
# majorité
|
|
from collections import Counter
|
|
most = Counter(decisions).most_common(1)
|
|
if most and most[0][0] == gt:
|
|
correct_majority += 1
|
|
if len(set(decisions)) == 1:
|
|
stable += 1
|
|
|
|
# par sous-groupe
|
|
uhcd_correct = sum(
|
|
1 for runs in bench.values()
|
|
for r in runs
|
|
if r["match"] and any(g == "REQUALIFICATION_HOSPITALISATION" and i == ipp_run for i, _, g, _ in ORDRE_DOSSIERS for ipp_run in [next(ipp for ipp, runs2 in bench.items() if runs2 is runs)])
|
|
)
|
|
# plus simple :
|
|
ipp_to_gt = {i: g for i, _, g, _ in ORDRE_DOSSIERS}
|
|
ipp_to_type = {i: t for i, _, _, t in ORDRE_DOSSIERS}
|
|
|
|
uhcd_dossiers = [i for i, gt in ipp_to_gt.items() if gt == "REQUALIFICATION_HOSPITALISATION"]
|
|
forfait_dossiers = [i for i, gt in ipp_to_gt.items() if gt == "FORFAIT_URGENCE"]
|
|
uhcd_acc_runs = sum(1 for i in uhcd_dossiers for r in bench[i] if r["match"])
|
|
forfait_acc_runs = sum(1 for i in forfait_dossiers for r in bench[i] if r["match"])
|
|
|
|
return {
|
|
"mode": mode_label,
|
|
"n_dossiers": n_dossiers,
|
|
"n_runs": n_runs_total,
|
|
"accuracy_runs": correct_total / n_runs_total if n_runs_total else 0,
|
|
"accuracy_majority": correct_majority / n_dossiers,
|
|
"uhcd_accuracy_runs": uhcd_acc_runs / max(1, len(uhcd_dossiers) * len(next(iter(bench.values())))),
|
|
"forfait_accuracy_runs": forfait_acc_runs / max(1, len(forfait_dossiers) * len(next(iter(bench.values())))),
|
|
"stability": stable / n_dossiers,
|
|
}
|
|
|
|
|
|
def main():
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("--runs", type=int, default=3, help="Inférences par dossier")
|
|
p.add_argument("--mode", choices=["baseline", "postfix"], default="baseline")
|
|
p.add_argument("--model", default=None, help="Modèle Ollama (default: qwen2.5:7b en baseline, gemma3:27b-cloud en postfix)")
|
|
p.add_argument("--all", action="store_true", help="Lance baseline + postfix séquentiellement")
|
|
args = p.parse_args()
|
|
|
|
raw = DATA_JS.read_text(encoding="utf-8")
|
|
dpis = {}
|
|
for ipp, label, gt, ftype in ORDRE_DOSSIERS:
|
|
try:
|
|
dpis[ipp] = build_dpi_text(ipp, raw)
|
|
except Exception as e:
|
|
print(f"⚠ {ipp} : extraction DPI échouée — {e}", flush=True)
|
|
dpis[ipp] = f"[ERREUR EXTRACTION] {e}"
|
|
|
|
# Sauve les DPI pour audit
|
|
(RESULTS_DIR / "dpis.json").write_text(json.dumps(dpis, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(f"📁 DPI consolidés : {RESULTS_DIR}/dpis.json ({sum(len(v) for v in dpis.values())} chars total)")
|
|
|
|
if args.all:
|
|
for mode, default_model in [("baseline", "qwen2.5:7b"), ("postfix", "gemma3:27b-cloud")]:
|
|
mdl = args.model or default_model
|
|
print(f"\n=== {mode.upper()} | model={mdl} | runs={args.runs} ===")
|
|
bench = run_bench(mode, mdl, args.runs, dpis)
|
|
out_path = RESULTS_DIR / f"bench_{mode}.json"
|
|
out_path.write_text(json.dumps({"model": mdl, "runs": args.runs, "results": bench}, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(f"📁 {out_path}")
|
|
print(json.dumps(stats(bench, mode), indent=2))
|
|
return
|
|
|
|
mode = args.mode
|
|
default_model = "qwen2.5:7b" if mode == "baseline" else "gemma3:27b-cloud"
|
|
mdl = args.model or default_model
|
|
print(f"\n=== {mode.upper()} | model={mdl} | runs={args.runs} ===")
|
|
bench = run_bench(mode, mdl, args.runs, dpis)
|
|
out_path = RESULTS_DIR / f"bench_{mode}.json"
|
|
out_path.write_text(json.dumps({"model": mdl, "runs": args.runs, "results": bench}, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(f"📁 {out_path}")
|
|
print(json.dumps(stats(bench, mode), indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|