Files
rpa_vision_v3/demo/facturation_urgences/run_simulation_v2.py
Dom 5ea4960e65
Some checks failed
tests / Lint (ruff + black) (push) Successful in 1m50s
tests / Tests unitaires (sans GPU) (push) Failing after 1m50s
tests / Tests sécurité (critique) (push) Has been skipped
backup: snapshot post-démo GHT 2026-05-19
Backup état complet après enregistrement vidéo démo de bout en bout.
À utiliser comme point de référence pour la consolidation post-démo.

Changements majeurs de la session 18-19 mai :
- AIVA-URGENCE : page autonome avec preset URL + auto-focus chain
- Workflow Demo_urgence_3_db : merge linux_db + steps AIVA + pause humaine NoMachine
- Bypass LLM (static_result / static_text) dans replay_engine
  pour démos déterministes sans appel Ollama
- Fix api_stream:3013 — replay_paused au premier polling /next
- dag_execute : lift duration_ms vers top-level pour wait runtime
- NPM bypass auth /aiva-urgence/ via location ^~ (proxy_host/10.conf hors git)
- scripts/cancel-replays.sh — workaround Stop VWB qui ne purge pas la queue

Anchors visuels (468) forcés dans le commit pour garantir restorabilité.
DB workflows actuelle + ~12 .bak DB de la journée incluses.

Sujets identifiés pour consolidation post-démo (TODO) :
1. Bug VWB recapture anchor ne régénère pas le PNG
2. Léa client accumule état mémoire (restart périodique requis)
3. Stop VWB ne purge pas la queue serveur (lien manquant vers /replay/cancel)
4. Bug coord client mss tronqué 2560x60 → mapping Y cassé
5. delay_before/delay_after ignorés au runtime (fix partiel duration_ms)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 14:55:06 +02:00

301 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Simulation v2 : prompt durci + comparaison multi-modèles.
Améliorations vs v1 :
- Prompt anti-fuite : pas de liste d'exemples copiable, extraction littérale
exigée depuis le DPI.
- Sortie enrichie : elements_pour_hospitalisation / elements_pour_forfait /
duree_passage_heures, pour surfacer le raisonnement contradictoire.
- Confiance calibrée : règle explicite (élevée si convergence, moyenne si
ambivalence, faible si manque d'info).
- Boucle multi-modèles : medgemma:4b vs concurrents généralistes, avec
unload (keep_alive=0) entre chaque pour éviter l'accumulation VRAM.
- Breakdown par type (simple / complexe / borderline) — la borderline est
la vraie métrique business.
Lancer : python run_simulation_v2.py
"""
import json
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cas_dpi import CAS # noqa: E402
OLLAMA_URL = "http://localhost:11434/api/generate"
# Modèles à comparer. Chacun est unload après son tour (keep_alive=0).
# Note : qwen3:* écarté ici car reasoning mode + format=json renvoie {} vide
# (incompatibilité tokens "thinking" / contrainte JSON stricte).
MODELS = [
"medgemma:4b", # 3.3 GB — médical spécialisé
"qwen2.5:7b", # 4.7 GB — généraliste rapide, bon FR + JSON
"qwen2.5:14b", # 9.0 GB — généraliste large, raisonnement clinique
"gemma4:latest", # 9.6 GB — défaut projet aiva-vision
"gemma3:27b-cloud", # 27B — cible DGX Spark (poids identiques)
"qwen3-next:80b-cloud", # 80B (MoE) — cible DGX Spark
]
PROMPT_TEMPLATE = """Tu es médecin DIM (Département d'Information Médicale), expert en facturation T2A/PMSI aux urgences hospitalières en France.
Analyse le dossier patient ci-dessous pour déterminer si le passage relève :
- FORFAIT_URGENCE : passage simple, retour à domicile, sans surveillance prolongée ni soins continus
- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les critères PMSI/ATIH
INSTRUCTIONS STRICTES :
1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère.
2. Identifie d'abord les éléments en faveur d'une hospitalisation, puis ceux en faveur d'un forfait, puis tranche.
3. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier.
4. Module ta confiance honnêtement :
- "elevee" uniquement si tous les indices convergent
- "moyenne" si éléments ambivalents
- "faible" si information manquante ou très atypique
Réponds STRICTEMENT en JSON valide, sans texte avant ni après :
{{
"duree_passage_heures": <nombre, à calculer depuis les horaires du dossier>,
"elements_pour_hospitalisation": [<faits littéralement extraits du dossier>],
"elements_pour_forfait": [<faits littéralement extraits du dossier>],
"decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION",
"justification": "<2-3 phrases s'appuyant explicitement sur les faits ci-dessus>",
"confiance": "elevee" | "moyenne" | "faible"
}}
DOSSIER PATIENT :
{dpi}
"""
def fmt_decision(d: str) -> str:
return {"FORFAIT_URGENCE": "FORFAIT", "REQUALIFICATION_HOSPITALISATION": "HOSPIT"}.get(d, d or "?")
def query_model(model: str, dpi_text: str, timeout: int = 600) -> dict:
payload = {
"model": model,
"prompt": PROMPT_TEMPLATE.format(dpi=dpi_text),
"stream": False,
"format": "json",
"keep_alive": "5m", # garde le modèle chargé entre les cas du même run
"options": {
"temperature": 0.1,
"num_predict": 4000, # large : qwen3-next consomme ~2500 tokens en thinking
"num_ctx": 4096,
"reasoning_effort": "minimal", # pour les modèles cloud à raisonnement
},
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
OLLAMA_URL, data=data, headers={"Content-Type": "application/json"}, method="POST"
)
t0 = time.time()
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
except (urllib.error.URLError, TimeoutError, ConnectionError) as e:
return {"_error": str(e), "_elapsed_s": round(time.time() - t0, 1)}
elapsed = time.time() - t0
body = json.loads(raw)
raw_response = body.get("response", "").strip()
raw_thinking = body.get("thinking", "").strip()
# Pour les modèles "thinking" (qwen3-next, DeepSeek-R1) si num_predict est consommé
# par le raisonnement, response peut être vide → on tente une extraction JSON depuis thinking.
candidates = [raw_response]
if not raw_response and raw_thinking:
# Cherche le dernier bloc {...} dans thinking
last_brace_close = raw_thinking.rfind("}")
last_brace_open = raw_thinking.rfind("{", 0, last_brace_close)
if last_brace_open != -1 and last_brace_close != -1:
candidates.append(raw_thinking[last_brace_open:last_brace_close + 1])
parsed = None
for cand in candidates:
cleaned = cand
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1]
if cleaned.endswith("```"):
cleaned = cleaned.rsplit("```", 1)[0]
cleaned = cleaned.strip()
try:
parsed = json.loads(cleaned)
break
except json.JSONDecodeError:
continue
if parsed is None:
parsed = {"_raw": (raw_response or raw_thinking)[:400], "_parse_error": True}
parsed["_elapsed_s"] = round(elapsed, 1)
parsed["_eval_count"] = body.get("eval_count")
return parsed
def unload_model(model: str) -> None:
"""Force unload via keep_alive=0."""
payload = {"model": model, "prompt": "", "keep_alive": 0, "stream": False}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
OLLAMA_URL, data=data, headers={"Content-Type": "application/json"}, method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
resp.read()
except Exception:
pass
def run_one_model(model: str) -> list[dict]:
print(f"\n{'#' * 78}")
print(f"# MODÈLE : {model}")
print(f"{'#' * 78}")
results = []
for cas in CAS:
gt = cas["verite_terrain"]
out = query_model(model, cas["dpi"])
if out.get("_error"):
decision = "_ERR_"
match = False
elif out.get("_parse_error"):
decision = "_PARSE_"
match = False
else:
decision = out.get("decision", "?")
match = decision == gt
flag = "OK" if match else "KO"
conf = out.get("confiance", "-") if not out.get("_parse_error") else "-"
duree = out.get("duree_passage_heures", "?") if not out.get("_parse_error") else "?"
elapsed = out.get("_elapsed_s", "?")
print(
f" Cas {cas['id']:>2} [{cas['type'][:4]:<4}] GT={fmt_decision(gt):<7} "
f"Pred={fmt_decision(decision):<7} {flag:<3} "
f"conf={conf:<7} durée={str(duree):<5} {elapsed}s"
)
results.append({"cas": cas, "out": out, "match": match, "decision": decision})
print(f" → Unload {model}...")
unload_model(model)
time.sleep(2)
return results
def stats_for_results(results: list[dict]) -> dict:
n = len(results)
correct = sum(1 for r in results if r["match"])
by_type = {}
for t in ("simple", "complexe", "borderline"):
sub = [r for r in results if r["cas"]["type"] == t]
if sub:
by_type[t] = (sum(1 for r in sub if r["match"]), len(sub))
parse_errors = sum(1 for r in results if r["out"].get("_parse_error"))
api_errors = sum(1 for r in results if r["out"].get("_error"))
latencies = [r["out"].get("_elapsed_s", 0) for r in results if not r["out"].get("_error")]
avg_lat = sum(latencies) / max(1, len(latencies))
# Confiance modulée ?
confs = [r["out"].get("confiance", "?") for r in results if not r["out"].get("_parse_error") and not r["out"].get("_error")]
conf_distribution = {c: confs.count(c) for c in set(confs)}
# Faux positifs / négatifs (positif = HOSPIT)
fp = sum(1 for r in results if not r["match"] and r["cas"]["verite_terrain"] == "FORFAIT_URGENCE" and r.get("decision") == "REQUALIFICATION_HOSPITALISATION")
fn = sum(1 for r in results if not r["match"] and r["cas"]["verite_terrain"] == "REQUALIFICATION_HOSPITALISATION" and r.get("decision") == "FORFAIT_URGENCE")
return {
"n": n,
"correct": correct,
"accuracy": correct / n,
"by_type": by_type,
"parse_errors": parse_errors,
"api_errors": api_errors,
"avg_latency_s": avg_lat,
"confiance_distribution": conf_distribution,
"faux_positifs_hospit": fp, # sur-codage
"faux_negatifs_hospit": fn, # manque à gagner
}
def print_synthesis(all_results: dict[str, list[dict]]) -> None:
print(f"\n{'=' * 78}")
print(f" SYNTHÈSE COMPARATIVE")
print(f"{'=' * 78}")
header = f" {'Modèle':<22} {'Acc':<6} {'Simple':<8} {'Complex':<9} {'Border':<8} {'FP':<3} {'FN':<3} {'Lat.':<7} {'Parse':<6}"
print(header)
print(f" {'-' * 76}")
for model, results in all_results.items():
s = stats_for_results(results)
bt = s["by_type"]
simple_str = f"{bt.get('simple', (0, 0))[0]}/{bt.get('simple', (0, 0))[1]}"
complexe_str = f"{bt.get('complexe', (0, 0))[0]}/{bt.get('complexe', (0, 0))[1]}"
border_str = f"{bt.get('borderline', (0, 0))[0]}/{bt.get('borderline', (0, 0))[1]}"
print(
f" {model:<22} {s['correct']:>2}/{s['n']:<3} {simple_str:<8} {complexe_str:<9} "
f"{border_str:<8} {s['faux_positifs_hospit']:<3} {s['faux_negatifs_hospit']:<3} "
f"{s['avg_latency_s']:<6.1f}s {s['parse_errors']:<6}"
)
# Détail par cas pour la lecture qualitative
print(f"\n Détail par cas (vérité-terrain → prédiction par modèle) :")
header2 = f" {'#':<3} {'Type':<11} {'GT':<8}"
for m in all_results.keys():
header2 += f" {m[:14]:<15}"
print(header2)
print(f" {'-' * (len(header2) - 2)}")
for i, cas in enumerate(CAS):
gt = fmt_decision(cas["verite_terrain"])
line = f" {cas['id']:<3} {cas['type']:<11} {gt:<8}"
for m, results in all_results.items():
r = results[i]
pred = fmt_decision(r["decision"]) if r["decision"] not in ("_ERR_", "_PARSE_") else r["decision"]
mark = "" if r["match"] else ""
line += f" {pred:<7} {mark:<7}"
print(line)
# Distribution confiance par modèle
print(f"\n Calibration de la confiance par modèle :")
for model, results in all_results.items():
s = stats_for_results(results)
print(f" {model:<22}{s['confiance_distribution']}")
print(f"\n{'=' * 78}\n")
def main() -> None:
print(f"\n{'=' * 78}")
print(f" SIMULATION v2 — Facturation urgences (multi-modèles, prompt durci)")
print(f" Cas : {len(CAS)} DPI ({sum(1 for c in CAS if c['type']=='simple')} simples + "
f"{sum(1 for c in CAS if c['type']=='complexe')} complexes + "
f"{sum(1 for c in CAS if c['type']=='borderline')} borderline)")
print(f" Modèles : {', '.join(MODELS)}")
print(f"{'=' * 78}")
all_results = {}
for model in MODELS:
all_results[model] = run_one_model(model)
print_synthesis(all_results)
# Sauvegarde
out_path = Path(__file__).parent / "resultats_v2.json"
serializable = {
model: [
{
"id": r["cas"]["id"],
"titre": r["cas"]["titre"],
"type": r["cas"]["type"],
"verite_terrain": r["cas"]["verite_terrain"],
"criteres_attendus": r["cas"]["criteres_cles"],
"prediction": r["out"],
"decision": r["decision"],
"match": r["match"],
}
for r in results
]
for model, results in all_results.items()
}
with out_path.open("w", encoding="utf-8") as f:
json.dump(serializable, f, ensure_ascii=False, indent=2)
print(f" Détails sauvegardés : {out_path}\n")
if __name__ == "__main__":
main()