backup: snapshot post-démo GHT 2026-05-19
Some checks failed
tests / Lint (ruff + black) (push) Successful in 1m50s
tests / Tests unitaires (sans GPU) (push) Failing after 1m50s
tests / Tests sécurité (critique) (push) Has been skipped

Backup état complet après enregistrement vidéo démo de bout en bout.
À utiliser comme point de référence pour la consolidation post-démo.

Changements majeurs de la session 18-19 mai :
- AIVA-URGENCE : page autonome avec preset URL + auto-focus chain
- Workflow Demo_urgence_3_db : merge linux_db + steps AIVA + pause humaine NoMachine
- Bypass LLM (static_result / static_text) dans replay_engine
  pour démos déterministes sans appel Ollama
- Fix api_stream:3013 — replay_paused au premier polling /next
- dag_execute : lift duration_ms vers top-level pour wait runtime
- NPM bypass auth /aiva-urgence/ via location ^~ (proxy_host/10.conf hors git)
- scripts/cancel-replays.sh — workaround Stop VWB qui ne purge pas la queue

Anchors visuels (468) forcés dans le commit pour garantir restorabilité.
DB workflows actuelle + ~12 .bak DB de la journée incluses.

Sujets identifiés pour consolidation post-démo (TODO) :
1. Bug VWB recapture anchor ne régénère pas le PNG
2. Léa client accumule état mémoire (restart périodique requis)
3. Stop VWB ne purge pas la queue serveur (lien manquant vers /replay/cancel)
4. Bug coord client mss tronqué 2560x60 → mapping Y cassé
5. delay_before/delay_after ignorés au runtime (fix partiel duration_ms)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-19 14:55:06 +02:00
parent f2212e77e3
commit 5ea4960e65
627 changed files with 211348 additions and 169 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""Analyse les résultats bench_baseline.json + bench_postfix.json.
Calcule :
- accuracy par dossier (3 runs, vote majoritaire)
- accuracy globale, UHCD, Forfait
- stabilité inter-runs
- score qualité justification (présence CCMU, GEMSA, durée, citations,
cohérence type_forfait)
- Δ baseline vs postfix par dossier
Sortie : tables markdown sur stdout + JSON brut sauvegardé.
"""
from __future__ import annotations
import json
import re
import sys
from collections import Counter
from pathlib import Path
# Parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parent.parent
# Directory the benchmark JSON files are read from and the analysis is written to.
RES = ROOT / "tools" / "_bench_t2a_out"
# Ground truth, one tuple per case:
# (ipp, short label, expected decision, expected type_forfait or None)
GT = [
    ("25003284", "Pneumo VRS 78a 3h37", "FORFAIT_URGENCE", "Standard"),
    ("25003362", "Intox enfant 3a 4h41", "FORFAIT_URGENCE", "PE2"),
    ("25003364", "Pneumo SLA 71a 7h35", "REQUALIFICATION_HOSPITALISATION", None),
    ("25003451", "Plaie suturée 3a 2h00", "FORFAIT_URGENCE", "SU2"),
    ("25003475", "Aura migr. 34a 4h03", "REQUALIFICATION_HOSPITALISATION", None),
    ("25005866", "TC hockey 17a 12h01", "REQUALIFICATION_HOSPITALISATION", None),
    ("25010621", "Laryngite 5a 2h49", "FORFAIT_URGENCE", "PE2"),
    ("25012257", "Douleur abdo 76a 7h20", "REQUALIFICATION_HOSPITALISATION", None),
    ("25048485", "CTCG ado 13a 6h50", "FORFAIT_URGENCE", "PE2"),
    ("25056615", "Salpingite 39a 4h30", "FORFAIT_URGENCE", "Standard"),
    ("25151530", "Colique nephr. 58a 6h21", "FORFAIT_URGENCE", "Standard"),
]
# IPPs of the borderline cases (see the DIM audit); reported separately.
LITIGIEUX = {"25003475", "25012257", "25048485", "25056615"}
def short(d: str | None) -> str:
    """Return a compact display label for a decision value.

    ``None`` maps to ``"?"``, the two known decision constants map to their
    abbreviations, and any other value is cut to its first 8 characters.
    """
    if d is None:
        return "?"
    known = (
        ("REQUALIFICATION_HOSPITALISATION", "UHCD"),
        ("FORFAIT_URGENCE", "Forf"),
    )
    for full_name, abbreviation in known:
        if d == full_name:
            return abbreviation
    return d[:8]
def majority(decisions: list[str]) -> str | None:
    """Return the most frequent non-empty decision, or ``None`` if there is none.

    Falsy entries (``None``, ``""``) are ignored. On a tie the value seen
    first wins, because ``Counter.most_common`` keeps insertion order among
    equal counts.
    """
    tally = Counter(d for d in decisions if d)
    if not tally:
        return None
    winner, _count = tally.most_common(1)[0]
    return winner
def quality_score(raw: dict, ipp: str, gt: str, mode: str) -> tuple[int, list[str]]:
    """Score the quality of one LLM justification on a 0-5 scale.

    One point is awarded for each of: a CCMU mention, a GEMSA mention, a
    non-null ``duree_passage_heures`` field, a discharge-mode / medical
    decision keyword, and a literal citation (« ... » or "...").

    Args:
        raw: Parsed LLM output. Keys starting with ``_`` are ignored; string
            values are collected from the top level, from nested dicts, and
            from lists of strings / dicts.
        ipp, gt, mode: Accepted for call-site compatibility; not used.

    Returns:
        ``(score, notes)`` where ``notes`` holds one ``"+name"`` / ``"-name"``
        entry per criterion, in the order listed above.
    """
    # Gather every string the model produced so keywords can be searched.
    texts: list[str] = []
    for key, value in raw.items():
        if key.startswith("_"):
            continue
        if isinstance(value, str):
            texts.append(value)
        elif isinstance(value, dict):
            texts.extend(x for x in value.values() if isinstance(x, str))
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, str):
                    texts.append(item)
                elif isinstance(item, dict):
                    texts.extend(y for y in item.values() if isinstance(y, str))
    joined = " ".join(texts)  # original casing, used for the citation regexes
    blob = joined.lower()

    score = 0
    notes: list[str] = []

    def record(passed: bool, plus: str, minus: str) -> None:
        nonlocal score
        if passed:
            score += 1
            notes.append(plus)
        else:
            notes.append(minus)

    # 1. CCMU mentioned?
    record("ccmu" in blob, "+CCMU", "-CCMU")
    # 2. GEMSA mentioned?
    record("gemsa" in blob, "+GEMSA", "-GEMSA")
    # 3. Duration of stay. The previous unparenthesised `A and B or C` test,
    #    combined with its inner check, only ever awarded the point when the
    #    structured field was present; that rule is now stated directly.
    duree = raw.get("duree_passage_heures")
    record(duree is not None, f"+durée({duree}h)", "-durée")
    # 4. Discharge mode / medical decision mentioned?
    sortie_keywords = (
        "retour à domicile", "domicile", "consultation externe",
        "hospitalisation", "transfert", "mutation",
    )
    record(any(w in blob for w in sortie_keywords), "+mode_sortie", "-mode_sortie")
    # 5. Literal citation with non-trivial content (French or straight quotes)?
    has_citation = bool(
        re.search(r"«\s*[^»]{6,}\s*»", joined) or re.search(r'"[^"]{8,}"', joined)
    )
    record(has_citation, "+citation", "-citation")
    return score, notes
def hallucination_check(raw: dict, dpi: str) -> list[str]:
    """List « ... » citations found in the LLM output but absent from the DPI.

    Args:
        raw: Parsed LLM output. Keys starting with ``_`` are ignored; only
            top-level strings and string values of nested dicts are scanned.
        dpi: Patient record text the citations are checked against.

    Returns:
        The stripped text of every citation (first 20 only) for which no
        12-character window, sampled every 4 characters, occurs in ``dpi``
        (case-insensitive).
    """
    window = 12
    texts: list[str] = []
    for key, value in raw.items():
        if key.startswith("_"):
            continue
        if isinstance(value, str):
            texts.append(value)
        elif isinstance(value, dict):
            texts.extend(x for x in value.values() if isinstance(x, str))
    full = " ".join(texts)

    dpi_lower = dpi.lower()
    missing: list[str] = []
    for citation in re.findall(r"«\s*([^»]{6,80})\s*»", full)[:20]:
        # The greedy capture keeps the whitespace before the closing », so
        # strip first: otherwise a short genuine quote is compared with a
        # trailing space and wrongly reported.
        text = citation.strip()
        needle = text.lower()
        starts = range(0, max(1, len(needle) - window), 4)
        if not any(needle[i:i + window] in dpi_lower for i in starts):
            missing.append(text)
    return missing
def analyze(mode_label: str, path: Path, dpis: dict[str, str]) -> dict:
    """Build the per-case and global report for one benchmark result file.

    Args:
        mode_label: Display name stored in the report under ``"mode"``.
        path: JSON file with ``results``, ``model`` and ``runs`` keys.
        dpis: Patient record text per IPP, used for the citation check.

    Returns:
        The report dict, or ``{}`` (after printing a warning) when ``path``
        is not a file.
    """
    if not path.is_file():
        print(f"⚠ Fichier manquant : {path}")
        return {}

    payload = json.loads(path.read_text(encoding="utf-8"))
    results = payload["results"]
    model = payload["model"]
    n_runs = payload["runs"]

    rows = []
    correct_total = 0
    total_runs = 0
    for ipp, label, gt, ftype in GT:
        runs = results.get(ipp, [])
        decisions = [run.get("decision") for run in runs]
        matched = len([run for run in runs if run.get("match")])
        total_runs += len(runs)
        correct_total += matched

        maj = majority(decisions)
        # Majority type_forfait (only meaningful when a forfait is expected).
        forfait_votes = Counter(
            t for t in (run.get("type_forfait") for run in runs) if t
        )
        top_type = forfait_votes.most_common(1)
        type_maj_str = top_type[0][0] if top_type else ""

        # Justification quality and unsupported citations, per run.
        qscores = []
        all_notes = []
        halluc_total = []
        for run in runs:
            raw = run.get("raw", {})
            run_score, run_notes = quality_score(raw, ipp, gt, mode_label)
            qscores.append(run_score)
            all_notes.append(run_notes)
            halluc_total.extend(hallucination_check(raw, dpis.get(ipp, "")))

        rows.append({
            "ipp": ipp,
            "label": label,
            "gt": gt,
            "gt_short": short(gt),
            "ftype": ftype,
            "decisions": decisions,
            "decisions_short": [short(d) for d in decisions],
            "majority": short(maj),
            "majority_match": maj == gt,
            "type_forfait_maj": type_maj_str,
            "type_forfait_match": (gt == "REQUALIFICATION_HOSPITALISATION") or (type_maj_str == ftype),
            "stable": len(set(decisions)) == 1,
            "match_runs": matched,
            "litigieux": ipp in LITIGIEUX,
            "quality_avg": round(sum(qscores) / max(1, len(qscores)), 1),
            "quality_max": max(qscores) if qscores else 0,
            "quality_notes_first": all_notes[0] if all_notes else [],
            "hallucinations": halluc_total[:5],
        })

    # Global statistics.
    n_dossiers = len(rows)
    uhcd_rows = [r for r in rows if r["gt"] == "REQUALIFICATION_HOSPITALISATION"]
    forf_rows = [r for r in rows if r["gt"] == "FORFAIT_URGENCE"]
    litig_rows = [r for r in rows if r["litigieux"]]

    accuracy_runs = correct_total / max(1, total_runs)
    accuracy_majority = len([r for r in rows if r["majority_match"]]) / n_dossiers
    uhcd_acc_majority = len([r for r in uhcd_rows if r["majority_match"]]) / max(1, len(uhcd_rows))
    forf_acc_majority = len([r for r in forf_rows if r["majority_match"]]) / max(1, len(forf_rows))
    stability = len([r for r in rows if r["stable"]]) / n_dossiers
    litigieux_acc = len([r for r in litig_rows if r["majority_match"]]) / max(1, len(litig_rows))
    type_forfait_acc = len([r for r in forf_rows if r["type_forfait_match"]]) / max(1, len(forf_rows))
    avg_quality = round(sum(r["quality_avg"] for r in rows) / n_dossiers, 2)
    n_halluc = sum(len(r["hallucinations"]) for r in rows)

    return {
        "mode": mode_label,
        "model": model,
        "n_runs": n_runs,
        "rows": rows,
        "accuracy_runs": round(accuracy_runs, 3),
        "accuracy_majority": round(accuracy_majority, 3),
        "uhcd_acc_majority": round(uhcd_acc_majority, 3),
        "forfait_acc_majority": round(forf_acc_majority, 3),
        "type_forfait_acc": round(type_forfait_acc, 3),
        "stability": round(stability, 3),
        "litigieux_acc": round(litigieux_acc, 3),
        "avg_quality": avg_quality,
        "n_hallucinations": n_halluc,
    }
def print_table(report: dict):
    """Print one report (as built by ``analyze``) as markdown on stdout.

    Emits the headline metrics, then one table row per case. Each row now
    carries explicit marks: ``✓``/``✗`` for the majority decision and for the
    forfait type, ``✓``/``⚠`` for inter-run stability. The previous
    conditionals had identical (empty) branches, so no mark was ever shown.
    """
    print(f"\n## {report['mode']} (model={report['model']}, {report['n_runs']} runs/dossier)\n")
    print(f"- Accuracy runs (3×11=33 inférences) : **{report['accuracy_runs']*100:.0f}%**")
    print(f"- Accuracy vote majoritaire (sur 11 dossiers) : **{report['accuracy_majority']*100:.0f}%**")
    print(f"- Accuracy UHCD (majoritaire) : {report['uhcd_acc_majority']*100:.0f}%")
    print(f"- Accuracy Forfait (majoritaire) : {report['forfait_acc_majority']*100:.0f}%")
    print(f"- Type forfait correct (parmi forfaits OK) : {report['type_forfait_acc']*100:.0f}%")
    print(f"- Stabilité inter-runs : {report['stability']*100:.0f}%")
    print(f"- Cas litigieux OK : {report['litigieux_acc']*100:.0f}%")
    print(f"- Qualité justification moyenne : **{report['avg_quality']}/5**")
    print(f"- Hallucinations citations : {report['n_hallucinations']}")
    print()
    print("| IPP | Cas | GT | Run1 | Run2 | Run3 | Maj | Stable | Type | Qual |")
    print("|---|---|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|")
    for r in report["rows"]:
        # Pad to the three run columns of the table.
        runs = r["decisions_short"] + [""] * (3 - len(r["decisions_short"]))
        stable = "✓" if r["stable"] else "⚠"
        flag = "✓" if r["majority_match"] else "✗"
        # The forfait type is only shown and checked when a forfait is expected.
        ftype = r["type_forfait_maj"] if r["gt"] == "FORFAIT_URGENCE" else ""
        if r["gt"] == "REQUALIFICATION_HOSPITALISATION":
            ftype_mark = ""
        else:
            ftype_mark = " ✓" if r["type_forfait_match"] else " ✗"
        litig = " 🔴" if r["litigieux"] else ""
        print(f"| {r['ipp']} | {r['label']}{litig} | {r['gt_short']} | "
              f"{runs[0]} | {runs[1]} | {runs[2]} | {flag} {r['majority']} | {stable} | "
              f"{ftype}{ftype_mark} | {r['quality_avg']}/5 |")
def print_delta(baseline: dict, postfix: dict):
    """Print the per-case and headline comparison of two reports as markdown.

    Rows of both reports are paired positionally. Each majority decision is
    prefixed with ``✓``/``✗`` (the previous conditionals had two identical
    empty branches), and the summary denominators use the actual number of
    rows instead of a hard-coded 11.
    """
    print("\n## Δ Baseline → Post-fix\n")
    print("| IPP | Cas | GT | Baseline | Post-fix | Δ |")
    print("|---|---|:---:|:---:|:---:|:---:|")
    for b, p in zip(baseline["rows"], postfix["rows"]):
        b_ok = b["majority_match"]
        p_ok = p["majority_match"]
        b_flag = "✓" if b_ok else "✗"
        p_flag = "✓" if p_ok else "✗"
        if b_ok and p_ok:
            delta = "= ✓"
        elif not b_ok and p_ok:
            delta = "🟢 +1"
        elif b_ok and not p_ok:
            delta = "🔴 -1"
        else:
            delta = "= ✗"
        litig = " 🔴" if b["litigieux"] else ""
        print(f"| {b['ipp']} | {b['label']}{litig} | {b['gt_short']} | {b_flag} {b['majority']} | {p_flag} {p['majority']} | {delta} |")
    # Headline figures.
    b_hits = sum(1 for r in baseline["rows"] if r["majority_match"])
    p_hits = sum(1 for r in postfix["rows"] if r["majority_match"])
    print()
    print("**Synthèse Δ** :")
    print(f"- Baseline : {b_hits}/{len(baseline['rows'])} → {baseline['accuracy_majority']*100:.0f}%")
    print(f"- Post-fix : {p_hits}/{len(postfix['rows'])} → {postfix['accuracy_majority']*100:.0f}%")
    print(f"- Gain absolu : {(postfix['accuracy_majority'] - baseline['accuracy_majority'])*100:+.0f} points")
    print(f"- Stabilité : {baseline['stability']*100:.0f}% → {postfix['stability']*100:.0f}%")
    print(f"- Qualité justification : {baseline['avg_quality']}/5 → {postfix['avg_quality']}/5")
def main():
    """Analyse both benchmark files, print the tables and save the raw analysis."""
    dpis = json.loads((RES / "dpis.json").read_text(encoding="utf-8"))
    reports = {
        "baseline": analyze("Baseline", RES / "bench_baseline.json", dpis),
        "postfix": analyze("Post-fix", RES / "bench_postfix.json", dpis),
    }
    # A missing input yields an empty report, which is simply not printed.
    for report in reports.values():
        if report:
            print_table(report)
    if reports["baseline"] and reports["postfix"]:
        print_delta(reports["baseline"], reports["postfix"])
    # Save the complete analysis.
    out = RES / "analysis.json"
    serialized = json.dumps(reports, ensure_ascii=False, indent=2)
    out.write_text(serialized, encoding="utf-8")
    print(f"\n📁 {out}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,121 @@
"""Ajoute 10 steps Excel à Demo_urgence_2_interop (ord 15-24).
Usage :
python tools/append_excel_steps_interop.py [--dry-run]
"""
from __future__ import annotations
import argparse
import json
import secrets
import sqlite3
import sys
import time
from pathlib import Path
# SQLite database that holds the workflows and their steps.
DB_PATH = Path(__file__).resolve().parent.parent / "visual_workflow_builder" / "backend" / "instance" / "workflows.db"
WF_ID = "wf_56bf8fa2d332_1778666923"  # Demo_urgence_2_interop
# Steps to append, in order: (label, action_type, parameters_dict)
STEPS = [
    ("Win+D", "keyboard_shortcut", {"keys": ["win", "d"]}),
    ("Ouvre codage_urgence.xlsx", "double_click_anchor", {"by_text": "codage_urgence"}),
    ("Cellule A2", "click_anchor", {"by_text": "A2"}),
    ("IPP patient", "type_text", {"text": "25003284", "paste": False}),
    ("Tab", "keyboard_shortcut", {"keys": ["tab"]}),
    ("Décision T2A", "type_text", {"text": "{{dec.decision_court}}", "paste": False}),
    ("Tab", "keyboard_shortcut", {"keys": ["tab"]}),
    ("Résumé clinique", "type_text", {"text": "{{resume_patient}}", "paste": False}),
    ("Tab", "keyboard_shortcut", {"keys": ["tab"]}),
    ("Justification", "type_text", {"text": "{{justification_t2a}}", "paste": False}),
]
FIRST_ORD = 15  # the target workflow already has ord 0..14
def new_step_id(ts: int) -> str:
    """Return a step id of the form ``step_<12 random hex chars>_<ts>``."""
    random_part = secrets.token_hex(6)
    return "_".join(("step", random_part, f"{ts}"))
def main() -> int:
    """Append the ``STEPS`` rows to workflow ``WF_ID`` in the SQLite database.

    With ``--dry-run`` the planned rows are printed and nothing is written.

    Returns:
        Process exit code: 0 on success or dry run, 1 when the database file
        is missing, 2 when the workflow is missing, 3 when the next free
        ``order`` is not ``FIRST_ORD``, 5 when the insert failed and was
        rolled back.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if not DB_PATH.exists():
        print(f"ERREUR : DB introuvable {DB_PATH}", file=sys.stderr)
        return 1

    conn = sqlite3.connect(DB_PATH)
    # One try/finally around everything that follows, so the connection is
    # also closed on the early returns (missing workflow, wrong order,
    # --dry-run), which previously leaked it.
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()

        # Guards: the workflow exists and FIRST_ORD is the next free order.
        wf = cur.execute("SELECT id FROM workflows WHERE id = ?", (WF_ID,)).fetchone()
        if not wf:
            print(f"ERREUR : workflow {WF_ID} introuvable", file=sys.stderr)
            return 2
        max_ord = cur.execute(
            'SELECT COALESCE(MAX("order"), -1) FROM steps WHERE workflow_id = ?',
            (WF_ID,),
        ).fetchone()[0]
        if max_ord + 1 != FIRST_ORD:
            print(
                f"ERREUR : ord libre attendu {FIRST_ORD}, trouvé max_ord+1={max_ord + 1}",
                file=sys.stderr,
            )
            return 3

        ts = int(time.time())
        now_iso = time.strftime("%Y-%m-%d %H:%M:%S")
        rows = [
            {
                "id": new_step_id(ts + i),
                "workflow_id": WF_ID,
                "action_type": atype,
                "order": FIRST_ORD + i,
                "position_x": None,
                "position_y": None,
                "parameters_json": json.dumps(params, ensure_ascii=False),
                "anchor_id": None,
                "label": label,
                "created_at": now_iso,
                "updated_at": now_iso,
            }
            for i, (label, atype, params) in enumerate(STEPS)
        ]

        print(f"\nWorkflow : {WF_ID}")
        print(f"{'ord':>3} {'action_type':<22} label parameters_json")
        print("-" * 110)
        for r in rows:
            print(f"{r['order']:>3} {r['action_type']:<22} {r['label']:<30} {r['parameters_json']}")
        print()

        if args.dry_run:
            print("--dry-run : aucune modification de la DB.")
            return 0

        try:
            # Single explicit transaction: either every step is inserted or none.
            cur.execute("BEGIN")
            for r in rows:
                cur.execute(
                    """
                    INSERT INTO steps
                    (id, workflow_id, action_type, "order", position_x, position_y,
                    parameters_json, anchor_id, label, created_at, updated_at)
                    VALUES (:id, :workflow_id, :action_type, :order, :position_x, :position_y,
                    :parameters_json, :anchor_id, :label, :created_at, :updated_at)
                    """,
                    r,
                )
            conn.commit()
            print(f"OK — {len(rows)} steps insérés (ord {FIRST_ORD}..{FIRST_ORD + len(rows) - 1})")
            return 0
        except Exception as e:
            conn.rollback()
            print(f"ROLLBACK — {e}", file=sys.stderr)
            return 5
    finally:
        conn.close()


if __name__ == "__main__":
    sys.exit(main())

662
tools/bench_t2a_post_fix.py Normal file
View File

@@ -0,0 +1,662 @@
#!/usr/bin/env python3
"""Bench T2A décision baseline vs post-fix DIM (11 dossiers GHT Sud 95).
Wrapper hors-prod (n'altère pas core/llm/t2a_decision.py ni urgences_orchestrator.py).
Étapes :
1. Parse data.js (regex JS, on n'évalue pas le JS) pour reconstruire un DPI texte
fidèle à ce que `extract_text` produirait depuis l'écran maquette.
2. Lance N inférences/dossier avec :
- Baseline : prompt actuel `core/llm/t2a_decision.py` + DEFAULT_MODEL=qwen2.5:7b
- Post-fix : prompt enrichi (QW1 règle 3/3, QW2 RPU/CCMU/GEMSA/durée,
QW3 type_forfait, QW4 non-admission, QW5 confiance/critère)
+ modèle gemma3:27b-cloud
3. Sauve les résultats bruts JSON, calcule accuracy + matrice + justifs scoring.
Usage :
python tools/bench_t2a_post_fix.py --runs 3 --mode baseline --model qwen2.5:7b
python tools/bench_t2a_post_fix.py --runs 3 --mode postfix --model gemma3:27b-cloud
python tools/bench_t2a_post_fix.py --all # baseline + postfix
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
# Parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parent.parent
# JS data file of the mockup the patient records are parsed from.
DATA_JS = ROOT / "docs/clients/ght_sud_95/mockup_easily_assure/data.js"
# Output directory for benchmark results; created as soon as the module is imported.
RESULTS_DIR = ROOT / "tools" / "_bench_t2a_out"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
OLLAMA_URL = "http://localhost:11434/api/generate"
TIMEOUT = 240  # the cloud model can take ~30 s; leave some headroom
# One tuple per case; presumably (ipp, description, expected decision,
# expected type_forfait or None) — confirm against the code that consumes it.
ORDRE_DOSSIERS = [
    ("25003284", "Pneumopathie VRS — terrain coronaire 78a, 3h37 sortie domicile", "FORFAIT_URGENCE", "Standard"),
    ("25003362", "Intoxication accidentelle enfant 3 ans, 4h41", "FORFAIT_URGENCE", "PE2"),
    ("25003364", "Pneumopathie sur SLA + BPCO, 7h35 mutation pneumo", "REQUALIFICATION_HOSPITALISATION", None),
    ("25003451", "Plaie suturée enfant 3 ans, 2h00 (CCMU 2 + suture)", "FORFAIT_URGENCE", "SU2"),
    ("25003475", "Suspicion AVC → aura migraineuse 34a, 4h03", "REQUALIFICATION_HOSPITALISATION", None),
    ("25005866", "TC commotionnel hockey 17a, 12h01 surveillance neuro", "REQUALIFICATION_HOSPITALISATION", None),
    ("25010621", "Laryngite enfant 5 ans, 2h49 (CCMU 2 pédiatrique)", "FORFAIT_URGENCE", "PE2"),
    ("25012257", "Douleur abdo polypath 76a 7h20 (transfert Embruns)", "REQUALIFICATION_HOSPITALISATION", None),
    ("25048485", "1ère CTCG ado 13a, 6h50 (CCMU 2 pédiatrique)", "FORFAIT_URGENCE", "PE2"),
    ("25056615", "Salpingite 39a transfert gynéco GEMSA 5, 4h30", "FORFAIT_URGENCE", "Standard"),
    ("25151530", "Colique néphrétique 58a 6h21 sortie domicile", "FORFAIT_URGENCE", "Standard"),
]
# ─────────────────────────────────────────────────────────────────────
# Loader data.js → DPI texte
# ─────────────────────────────────────────────────────────────────────
# data.js est du JS littéral. Pour éviter l'embarquement d'un parser JS, on
# utilise une approche pragmatique : on extrait les blocs par regex sur
# l'IPP, puis on aplatit le JS-quasi-JSON en texte lisible.
def strip_html(s: str) -> str:
    """Turn an HTML fragment into plain text.

    ``<br>`` variants become newlines, every other tag is dropped,
    ``&nbsp;`` becomes a space, runs of blank lines collapse to a single
    empty line, and the result is stripped.
    """
    with_breaks = re.sub(r"<br\s*/?>", "\n", s, flags=re.IGNORECASE)
    untagged = re.sub(r"<[^>]+>", "", with_breaks).replace("&nbsp;", " ")
    return re.sub(r"\n\s*\n+", "\n\n", untagged).strip()
def extract_block(text: str, ipp: str) -> str:
    """Return the JS object literal attached to the key ``"<ipp>"``.

    The opening brace after ``"<ipp>":`` is located, then braces are balanced
    while skipping string literals, template literals, backslash escapes and
    JS comments (``// ...`` and ``/* ... */``). Without the comment handling,
    an apostrophe or brace inside a comment derails the scan.

    Args:
        text: Full content of the JS data file.
        ipp: Patient identifier used as object key (matched literally).

    Returns:
        The substring from the opening ``{`` to its matching ``}``, inclusive.

    Raises:
        ValueError: If the key is not found or the block is never closed.
    """
    opening = re.search(rf'"{re.escape(ipp)}"\s*:\s*\{{', text)
    if not opening:
        raise ValueError(f"Bloc {ipp} introuvable")
    start = opening.end() - 1  # index of the opening `{`
    depth = 0
    quote = ""  # active delimiter (", ' or `), empty when outside a literal
    i = start
    end = len(text)
    while i < end:
        c = text[i]
        if c == "\\":
            # Skip the escaped character, whatever it is.
            i += 2
            continue
        if quote:
            if c == quote:
                quote = ""
        elif c in "\"'`":
            quote = c
        elif c == "/" and text[i + 1:i + 2] == "/":
            # Line comment: jump to the end of the line.
            newline = text.find("\n", i)
            if newline == -1:
                break
            i = newline
        elif c == "/" and text[i + 1:i + 2] == "*":
            # Block comment: jump to its closing `*/`.
            closing = text.find("*/", i + 2)
            if closing == -1:
                break
            i = closing + 1
        elif c == "{":
            depth += 1
        elif c == "}":
            depth -= 1
            if depth == 0:
                return text[start:i + 1]
        i += 1
    raise ValueError(f"Bloc {ipp} non clos")
def _js_unescape(s: str) -> str:
"""Décode les escapes JS courants sans casser l'UTF-8 (les caractères
accentués sont déjà en utf-8 dans le fichier source)."""
# \\n \\t \\" \\' \\\\ \\u00xx
def repl(m):
c = m.group(0)
if c == r"\n": return "\n"
if c == r"\t": return "\t"
if c == r"\r": return "\r"
if c == r"\"": return "\""
if c == r"\'": return "'"
if c == r"\\": return "\\"
if c.startswith(r"\u"):
return chr(int(c[2:], 16))
return c
return re.sub(r"\\u[0-9a-fA-F]{4}|\\[ntr\"'\\]", repl, s)
def extract_field(block: str, name: str, kind: str = "string") -> str | None:
    """Return the unescaped value of ``name: "..."`` or ``name: `...` `` in ``block``.

    The double-quoted form is only tried when ``kind`` is ``"string"``; the
    template-literal form is always tried. Returns ``None`` when neither
    matches.
    """
    key = re.escape(name)
    candidates = []
    if kind == "string":
        # Double-quoted string.
        candidates.append(rf'\b{key}\s*:\s*"((?:[^"\\]|\\.)*)"')
    # Template literal.
    candidates.append(rf'\b{key}\s*:\s*`((?:[^`\\]|\\.)*)`')
    for pattern in candidates:
        found = re.search(pattern, block)
        if found:
            return _js_unescape(found.group(1))
    return None
def extract_recap_rpu(block: str) -> list[tuple[str, str]]:
    """Parse ``recap_rpu: [["key", "value"], ...]`` into ``(key, value)`` pairs.

    Both members are passed through ``strip_html``. Returns ``[]`` when the
    ``recap_rpu`` array is not found.
    """
    section = re.search(r"recap_rpu\s*:\s*\[(.*?)\]\s*\}", block, flags=re.DOTALL)
    if section is None:
        return []
    pair_pattern = r'\[\s*"((?:[^"\\]|\\.)*)"\s*,\s*"((?:[^"\\]|\\.)*)"\s*\]'
    return [
        (strip_html(pair.group(1)), strip_html(pair.group(2)))
        for pair in re.finditer(pair_pattern, section.group(1))
    ]
def extract_signes_vitaux(block: str) -> tuple[list[tuple[str, str, str]], list[str]]:
    """Parse the vital-signs table of a patient block.

    Reads ``signes_vitaux: [{item, v1, v2}, ...]`` and the column dates from
    ``signes_vitaux_dates: [...]``.

    Returns:
        ``(rows, dates)``: ``rows`` is a list of ``(item, v1, v2)`` tuples with
        HTML stripped (``item`` falls back to ``"?"`` when empty), and
        ``dates`` is the list of quoted strings found in the dates array.
        Either list is empty when its section is missing. The return
        annotation previously declared only the ``rows`` list although a
        2-tuple has always been returned.
    """
    dates: list[str] = []
    m_dates = re.search(r"signes_vitaux_dates\s*:\s*\[((?:[^\]])*)\]", block)
    if m_dates:
        dates = re.findall(r'"((?:[^"\\]|\\.)*)"', m_dates.group(1))

    def _ext(row: str, key: str) -> str:
        # Try double-quote, single-quote, then template delimiters, so that
        # values containing the other quote kinds (HTML <span class="...">)
        # are still captured.
        for quote in ('"', "'", '`'):
            pat = rf'{key}\s*:\s*{re.escape(quote)}((?:(?!{re.escape(quote)}).)*){re.escape(quote)}'
            mm = re.search(pat, row, flags=re.DOTALL)
            if mm:
                return mm.group(1)
        return ""

    rows: list[tuple[str, str, str]] = []
    m = re.search(r"signes_vitaux\s*:\s*\[(.*?)\]\s*\}", block, flags=re.DOTALL)
    if m:
        for row in re.findall(r"\{[^}]*\}", m.group(1)):
            rows.append((
                strip_html(_ext(row, "item")) or "?",
                strip_html(_ext(row, "v1")),
                strip_html(_ext(row, "v2")),
            ))
    return rows, dates
def extract_diagnostics(block: str) -> list[str]:
    """Return the ``code`` value of each ``{...}`` entry in ``diagnostics: [...]``.

    Codes are passed through ``strip_html``; entries without a double-quoted
    ``code`` are skipped. Returns ``[]`` when the array is not found.
    """
    section = re.search(r"diagnostics\s*:\s*\[(.*?)\]", block, flags=re.DOTALL)
    if section is None:
        return []
    codes = []
    for entry in re.finditer(r"\{[^}]*\}", section.group(1)):
        code = re.search(r'code\s*:\s*"((?:[^"\\]|\\.)*)"', entry.group(0))
        if code is not None:
            codes.append(strip_html(code.group(1)))
    return codes
def extract_notes_medicales(block: str) -> list[dict]:
    """Parse ``notes_medicales: [...]`` into dicts with date, type, par, contenu.

    The array end is detected by a ``],`` followed by a ``//`` comment, or
    failing that by a following ``synthese`` key. Entries without a
    ``contenu`` are dropped; ``contenu`` has its HTML stripped.
    """
    section = re.search(r"notes_medicales\s*:\s*\[(.*?)\]\s*,\s*//", block, flags=re.DOTALL)
    if not section:
        # Fallback: the array is closed right before "synthese".
        section = re.search(r"notes_medicales\s*:\s*\[(.*?)\]\s*,?\s*synthese", block, flags=re.DOTALL)
    if not section:
        return []

    def quoted(key: str, source: str) -> str:
        hit = re.search(rf'{key}\s*:\s*"((?:[^"\\]|\\.)*)"', source)
        return hit.group(1) if hit else ""

    notes = []
    # Pragmatic split on the `}, {` boundary between entries.
    for piece in re.split(r"\}\s*,\s*\{", section.group(1)):
        entry = "{" + piece.strip("{} ,\n") + "}"
        contenu = re.search(r"contenu\s*:\s*`((?:[^`\\]|\\.)*)`", entry, flags=re.DOTALL)
        if not contenu:
            contenu = re.search(r'contenu\s*:\s*"((?:[^"\\]|\\.)*)"', entry)
        if not contenu:
            continue
        notes.append({
            "date": quoted("date", entry),
            "type": quoted("type", entry),
            "par": quoted("par", entry),
            "contenu": strip_html(contenu.group(1)),
        })
    return notes
def extract_examens_questionnaires(block: str) -> list[dict]:
    """Parse ``questionnaires: [{nom, reponse, ...}]`` into ``{nom, reponse}`` dicts.

    Only entries whose ``reponse`` is a template literal are kept; the
    response has its HTML stripped and ``nom`` defaults to ``""``.
    """
    section = re.search(r"questionnaires\s*:\s*\[(.*?)\]", block, flags=re.DOTALL)
    if section is None:
        return []
    found = []
    for piece in re.split(r"\}\s*,\s*\{", section.group(1)):
        entry = "{" + piece.strip("{} ,\n") + "}"
        reponse = re.search(r"reponse\s*:\s*`((?:[^`\\]|\\.)*)`", entry, flags=re.DOTALL)
        if not reponse:
            continue
        nom = re.search(r'nom\s*:\s*"((?:[^"\\]|\\.)*)"', entry)
        found.append({
            "nom": nom.group(1) if nom else "",
            "reponse": strip_html(reponse.group(1)),
        })
    return found
def extract_notes_paramedicales(block: str) -> list[dict]:
    """Parse ``notes_paramedicales: [...]`` into ``{par, contenu}`` dicts.

    Only entries whose ``contenu`` is a template literal are kept; the
    content has its HTML stripped and ``par`` defaults to ``""``.
    """
    section = re.search(r"notes_paramedicales\s*:\s*\[(.*?)\]\s*\}", block, flags=re.DOTALL)
    if section is None:
        return []
    found = []
    for piece in re.split(r"\}\s*,\s*\{", section.group(1)):
        entry = "{" + piece.strip("{} ,\n") + "}"
        contenu = re.search(r"contenu\s*:\s*`((?:[^`\\]|\\.)*)`", entry, flags=re.DOTALL)
        if not contenu:
            continue
        author = re.search(r'par\s*:\s*"((?:[^"\\]|\\.)*)"', entry)
        found.append({
            "par": author.group(1) if author else "",
            "contenu": strip_html(contenu.group(1)),
        })
    return found
def build_dpi_text(ipp: str, raw: str) -> str:
    """Build a plain-text patient record (DPI) from the data.js block of ``ipp``.

    Args:
        ipp: Patient identifier whose block is extracted from ``raw``.
        raw: Full text of the JS data file.

    Returns:
        The record as newline-joined lines: header, IDE observation,
        diagnostics, vital signs, clinical exams, paramedical notes, medical
        notes, then the RPU synthesis. Optional sections are emitted only
        when they have content.

    Raises:
        ValueError: Propagated from ``extract_block`` when the block is
            missing or unterminated.
    """
    block = extract_block(raw, ipp)
    # Scalar fields; a missing field becomes an empty string.
    nom = extract_field(block, "nom") or ""
    prenom = extract_field(block, "prenom") or ""
    age = extract_field(block, "age") or ""
    sexe = extract_field(block, "sexe") or ""
    arrivee = extract_field(block, "arrivee") or ""
    sortie = extract_field(block, "sortie") or ""
    motif_court = extract_field(block, "motif_court") or ""
    obs_ide = extract_field(block, "obs_ide") or ""
    # Structured sections.
    diagnostics = extract_diagnostics(block)
    notes = extract_notes_medicales(block)
    examens = extract_examens_questionnaires(block)
    notes_param = extract_notes_paramedicales(block)
    rpu = extract_recap_rpu(block)
    constantes, dates = extract_signes_vitaux(block)
    # RPU synthesis fields.
    ccmu = extract_field(block, "ccmu") or ""
    gemsa = extract_field(block, "gemsa") or ""
    diag_synth = extract_field(block, "diagnostics_synthese") or ""
    decision = extract_field(block, "decision") or ""
    orientation = extract_field(block, "orientation") or ""
    us_dest = extract_field(block, "us_destination") or ""
    motif_pec = extract_field(block, "motif_pec") or ""
    mode_transport = extract_field(block, "mode_transport") or ""
    mode_entree = extract_field(block, "mode_entree") or ""
    lines = []
    # Header: always emitted, even when the fields are empty.
    lines.append(f"=== DOSSIER PATIENT IPP {ipp} ===")
    lines.append(f"Identité : {nom} {prenom} ({sexe}, {age})")
    lines.append(f"Arrivée : {arrivee}")
    lines.append(f"Sortie : {sortie}")
    lines.append(f"Motif : {motif_court}")
    lines.append("")
    lines.append("--- ONGLET MOTIF / IDE ---")
    if obs_ide:
        lines.append("Observation IDE :")
        lines.append(strip_html(obs_ide))
        lines.append("")
    if diagnostics:
        lines.append("Diagnostics :")
        for d in diagnostics:
            lines.append(f" - {d}")
        lines.append("")
    if constantes:
        lines.append("Signes vitaux (par colonne datée) :")
        if dates:
            lines.append(f" Dates colonnes : {' | '.join(dates)}")
        for item, v1, v2 in constantes:
            lines.append(f" - {item} : V1={v1 or ''} | V2={v2 or ''}")
        lines.append("")
    if examens:
        lines.append("--- ONGLET EXAMENS CLINIQUES ---")
        for e in examens:
            lines.append(f"[{e['nom']}]")
            lines.append(e["reponse"])
            lines.append("")
    if notes_param:
        lines.append("--- NOTES PARAMÉDICALES ---")
        for n in notes_param:
            lines.append(f"[par {n['par']}]")
            lines.append(n["contenu"])
            lines.append("")
    if notes:
        lines.append("--- ONGLET NOTES MÉDICALES ---")
        for n in notes:
            lines.append(f"[{n['date']}{n['type']}{n['par']}]")
            lines.append(n["contenu"])
            lines.append("")
    # RPU synthesis: the section title is always emitted, each field only if set.
    lines.append("--- ONGLET SYNTHÈSE URGENCES (RPU) ---")
    if mode_transport:
        lines.append(f"Mode de venue : {mode_transport}")
    if mode_entree:
        lines.append(f"Mode d'entrée : {mode_entree}")
    if motif_pec:
        lines.append(f"Motif PEC : {motif_pec}")
    if ccmu:
        lines.append(f"CCMU : {ccmu}")
    if gemsa:
        lines.append(f"GEMSA : {gemsa}")
    if diag_synth:
        lines.append(f"Diagnostic principal RPU : {diag_synth}")
    if decision:
        lines.append(f"Décision médicale : {decision}")
    if orientation:
        lines.append(f"Orientation : {orientation}")
    if us_dest:
        lines.append(f"Destination : {us_dest}")
    if rpu:
        lines.append("")
        lines.append("Récapitulatif RPU :")
        for k, v in rpu:
            lines.append(f" - {k} : {v}")
    return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────
# Prompts
# ─────────────────────────────────────────────────────────────────────
# Baseline prompt template. The single `{dpi}` placeholder receives the patient
# record text; the literal JSON braces are doubled (`{{` / `}}`), which suggests
# the template is filled with str.format — TODO confirm at the call site.
PROMPT_BASELINE = """Tu es médecin DIM (Département d'Information Médicale), expert en facturation T2A/PMSI aux urgences hospitalières en France.
Analyse le dossier patient ci-dessous pour déterminer si le passage relève :
- FORFAIT_URGENCE : passage simple, retour à domicile, sans surveillance prolongée ni soins continus
- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les 3 critères PMSI/ATIH
LES 3 CRITÈRES UHCD (au moins 2 sur 3 validés ⇒ REQUALIFICATION) :
1. Pathologie potentiellement évolutive (instabilité hémodynamique, terrain à risque, traitement nécessitant adaptation)
2. Surveillance médicale et paramédicale prolongée (constantes itératives, observations IDE/médecin, durée > 6 h)
3. Examens complémentaires ou actes thérapeutiques (biologie, imagerie, sutures, gestes techniques)
INSTRUCTIONS STRICTES :
1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère.
2. Pour CHAQUE critère (1, 2, 3), tu DOIS produire un texte de preuve qui contient AU MOINS UNE CITATION LITTÉRALE du dossier entre guillemets français « ... ». Exemple : « FC à 110 bpm, TA 92/60 ».
3. Si le critère est NON validé, ne renvoie JAMAIS un fallback creux : explique factuellement ce qui manque, en citant le dossier (ex: « Sortie à H+2 », « Aucun acte technique au compte-rendu »).
4. Le texte de chaque preuve fait 2-3 phrases : (i) la citation littérale, (ii) l'analyse PMSI, (iii) la conclusion validé/non validé.
5. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier.
6. Module ta confiance honnêtement :
- "elevee" uniquement si tous les indices convergent
- "moyenne" si éléments ambivalents
- "faible" si information manquante ou très atypique
Réponds STRICTEMENT en JSON valide, sans texte avant ni après :
{{
"duree_passage_heures": <nombre>,
"elements_pour_hospitalisation": [<phrases littéralement extraites du dossier>],
"elements_pour_forfait": [<phrases littéralement extraites du dossier>],
"decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION",
"decision_court": "UHCD" | "Forfait Urgences",
"preuve_critere1": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (motif, symptôme, terrain à risque, traitement). Si non validé : factualise ce qui manque en citant le dossier.>",
"critere1_valide": true | false,
"preuve_critere2": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (constantes, observations IDE, durée surveillance). Si non validé : factualise.>",
"critere2_valide": true | false,
"preuve_critere3": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (actes/examens : biologie, imagerie, suture, etc.). Si non validé : factualise.>",
"critere3_valide": true | false,
"justification": "<2-3 phrases synthétiques s'appuyant explicitement sur les preuves ci-dessus, avec au moins une citation>",
"confiance": "elevee" | "moyenne" | "faible"
}}
DOSSIER PATIENT :
{dpi}
"""
# Post-fix : applique les 5 quick wins de l'audit DIM
PROMPT_POSTFIX = """Tu es médecin DIM (Département d'Information Médicale), expert en facturation T2A/PMSI aux urgences hospitalières en France.
Analyse le dossier patient ci-dessous pour déterminer si le passage relève :
- FORFAIT_URGENCE : passage simple, retour à domicile / consultation externe, sans surveillance prolongée
- REQUALIFICATION_HOSPITALISATION : séjour UHCD ou MCO requis selon les 3 critères PMSI/ATIH
LES 3 CRITÈRES UHCD — RÈGLE STRICTE selon arbre Eaubonne / instruction DGOS/R1/DSS/1A/2020/52 :
1. Pathologie potentiellement évolutive (motif initial, intensité/durée des symptômes, traitement initial inefficace, terrain à risque âge/comorbidités)
2. Surveillance médicale et paramédicale prolongée (constantes itératives, observations IDE/médecin, durée > 6 h)
3. Examens complémentaires ou actes thérapeutiques (RX, scanner, biologie, suture, KT, antibiotiques IV, aérosols)
⚠️ RÈGLE DE COMBINAISON STRICTE (arbre PPTX CH Eaubonne, slide 6) :
- Si les 3 critères sont validés ⇒ REQUALIFICATION_HOSPITALISATION (UHCD)
- Si AU MOINS 1 critère est manquant ⇒ FORFAIT_URGENCE
- AUCUNE dérogation au 2/3. La présence d'actes seuls (critère 3) sans pathologie évolutive (critère 1) NE JUSTIFIE PAS un UHCD.
DONNÉES RPU À PRENDRE EN COMPTE EN PRIORITÉ :
- Durée totale du passage : si < 6 h ET sortie domicile/consultation externe ⇒ très probable FORFAIT_URGENCE quel que soit le terrain.
- GEMSA :
* GEMSA 2 = sortie après soins simples → FORFAIT_URGENCE.
* GEMSA 4 = patient hospitalisé MCO interne (mutation) → favorise UHCD si surveillance documentée.
* GEMSA 5 = patient transféré dans un autre établissement → FORFAIT_URGENCE par défaut. Mono-RUM UHCD seulement si transfert MCO POST-UHCD documenté ; un transfert direct sans phase d'observation = forfait.
- Mode de sortie / décision : "Consultation externe" + "Retour à domicile" est une CONTRE-INDICATION FORTE à UHCD, sauf si surveillance > 8 h documentée.
- CCMU : 2 → faveur Forfait + supplément SU2 si acte CCAM réalisé ; 3,4,5 → faveur UHCD ou supplément SU3.
CRITÈRES DE NON-ADMISSION UHCD (SFMU 2024) — si l'un coche, FORFAIT_URGENCE forcé :
- Pathologie clairement identifiée et relevant à l'évidence d'un service d'hospitalisation conventionnelle (mutation directe MCO sans phase de surveillance préalable).
- Patient grave relevant d'un service de soins critiques (réa, USIP).
- Patient déjà hospitalisé dans un autre établissement (UHCD n'accueille pas les urgences intra-hospitalières).
- Patient sortant directement de bloc opératoire (UHCD n'est pas une salle de réveil).
TYPE DE FORFAIT À DÉTERMINER (si decision = FORFAIT_URGENCE) :
- "SU2" : CCMU 2 + au moins un acte CCAM réalisé (suture, plâtre, geste technique).
- "PE2" : enfant ≤ 16 ans + diagnostic pédiatrique + CCMU 2 (cumulable avec SU2).
- "PE1" : enfant ≤ 16 ans + diagnostic pédiatrique + CCMU 1.
- "Standard" : aucun supplément applicable.
- null : si decision = REQUALIFICATION_HOSPITALISATION.
INSTRUCTIONS STRICTES :
1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère.
2. Pour CHAQUE critère (1, 2, 3), tu DOIS produire AU MOINS UNE CITATION LITTÉRALE du dossier entre guillemets français « ... ». Exemple : « FC 110 bpm, TA 92/60 ». Sans citation = critère INVALIDÉ.
3. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires.
4. Module ta confiance par critère :
- "elevee" : citation explicite + cohérence cliniquement nette.
- "moyenne" : signal partiel ou ambivalent.
- "faible" : info manquante ou contradictoire.
Réponds STRICTEMENT en JSON valide, sans texte avant ni après :
{{
"duree_passage_heures": <nombre>,
"ccmu_inferre": "1" | "2" | "3" | "4" | "5",
"gemsa_inferre": "2" | "3" | "4" | "5",
"decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION",
"decision_court": "UHCD" | "Forfait Urgences",
"type_forfait": "Standard" | "SU2" | "SU3" | "PE1" | "PE2" | null,
"supplements_compatibles": [<liste des cumuls applicables, ex. ["SU2", "PE2"]>],
"preuve_critere1": {{
"valide": true | false,
"citation": "<citation littérale entre « »>",
"analyse": "<1-2 phrases d'analyse PMSI>",
"confiance_critere": "elevee" | "moyenne" | "faible"
}},
"preuve_critere2": {{ "valide": ..., "citation": ..., "analyse": ..., "confiance_critere": ... }},
"preuve_critere3": {{ "valide": ..., "citation": ..., "analyse": ..., "confiance_critere": ... }},
"non_admission_uhcd_declenchee": true | false,
"non_admission_motif": "<si déclenchée, motif précis avec citation>",
"elements_pour_hospitalisation": [<phrases littéralement extraites du dossier>],
"elements_pour_forfait": [<phrases littéralement extraites du dossier>],
"justification": "<3-4 phrases synthétiques s'appuyant sur les 3 critères + RPU + non-admission, avec citations>",
"confiance_globale": "elevee" | "moyenne" | "faible"
}}
DOSSIER PATIENT :
{dpi}
"""
def query_ollama(prompt: str, model: str, timeout: int = TIMEOUT) -> dict:
    """Send one non-streaming generation request to Ollama and parse the answer.

    Args:
        prompt: Fully formatted prompt text.
        model: Ollama model name.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        * On success: the JSON object produced by the model, plus an
          ``_elapsed_s`` key (wall-clock seconds, one decimal).
        * On transport failure or unreadable HTTP body:
          ``{"_error": <message>, "_elapsed_s": <seconds>}``.
        * When no JSON object can be recovered from the model output:
          ``{"_parse_error": True, "_raw": <first 600 chars>, "_elapsed_s": <seconds>}``.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "format": "json",
        "keep_alive": "5m",
        "options": {
            "temperature": 0.1,
            "num_predict": 2000,
            "num_ctx": 16384,
        },
    }
    req = urllib.request.Request(
        OLLAMA_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    t0 = time.time()
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = json.loads(resp.read().decode("utf-8"))
    # ValueError covers both JSONDecodeError and UnicodeDecodeError: a garbled
    # HTTP body must be reported as an error, not abort the whole benchmark.
    except (urllib.error.URLError, TimeoutError, ConnectionError, ValueError) as e:
        return {"_error": str(e), "_elapsed_s": round(time.time() - t0, 1)}
    elapsed = round(time.time() - t0, 1)
    if not isinstance(body, dict):
        return {"_error": "unexpected non-object HTTP body", "_elapsed_s": elapsed}

    raw = (body.get("response") or "").strip()
    raw_thinking = (body.get("thinking") or "").strip()

    # Primary path: the "response" field, tolerating a surrounding markdown fence.
    cleaned = raw
    if cleaned.startswith("```"):
        cleaned = cleaned.split("\n", 1)[-1]
    if cleaned.endswith("```"):
        cleaned = cleaned.rsplit("```", 1)[0]
    try:
        parsed = json.loads(cleaned.strip())
    except json.JSONDecodeError:
        parsed = None

    # Fallback: when "response" is empty, look for the answer in the "thinking"
    # field, which may hold the JSON surrounded by free text.
    if parsed is None and not raw and raw_thinking:
        parsed = _last_json_object(raw_thinking)

    # Callers use .get() on the result, so anything but an object is a parse error.
    if not isinstance(parsed, dict):
        return {"_parse_error": True, "_raw": (raw or raw_thinking)[:600], "_elapsed_s": elapsed}
    parsed["_elapsed_s"] = elapsed
    return parsed


def _last_json_object(text: str) -> "dict | None":
    """Return the last top-level JSON object embedded in *text*, or None.

    Each ``{`` is tried with a real JSON decoder, so nested objects and braces
    inside string values are handled; spans that do not decode are skipped.
    """
    decoder = json.JSONDecoder()
    found = None
    pos = text.find("{")
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            pos = text.find("{", pos + 1)
            continue
        if isinstance(obj, dict):
            found = obj
        # Resume after the decoded span so nested objects are not re-reported.
        pos = text.find("{", end)
    return found
def run_bench(mode: str, model: str, runs: int, dpis: dict[str, str]) -> dict:
    """Run `runs` inferences per dossier and collect one record per inference.

    Args:
        mode: "postfix" selects PROMPT_POSTFIX; any other value selects PROMPT_BASELINE.
        model: Ollama model name forwarded to query_ollama.
        runs: Number of inferences per dossier.
        dpis: Mapping IPP -> patient-record text injected into the prompt.

    Returns:
        Mapping IPP -> list of run records (run index, decision, match flag,
        forfait type, duration, elapsed seconds and the raw model answer).
    """
    if mode == "postfix":
        template = PROMPT_POSTFIX
    else:
        template = PROMPT_BASELINE

    results = {}
    for ipp, _label, expected, _ftype in ORDRE_DOSSIERS:
        prompt = template.format(dpi=dpis[ipp])
        records = []
        for run_no in range(1, runs + 1):
            answer = query_ollama(prompt, model)
            decision = answer.get("decision")
            is_match = decision == expected
            record = {
                "run": run_no,
                "decision": decision,
                "match": is_match,
                "type_forfait": answer.get("type_forfait"),
                "duree": answer.get("duree_passage_heures"),
                "elapsed_s": answer.get("_elapsed_s"),
                "raw": answer,
            }
            records.append(record)
            print(f" [{mode}] {ipp} r{run_no} : {decision or '?'} ({'OK' if is_match else 'KO'}) {answer.get('_elapsed_s', '?')}s", flush=True)
        results[ipp] = records
    return results
def stats(bench: dict, mode_label: str) -> dict:
    """Compute global accuracy, per-class accuracy and run-to-run stability.

    Args:
        bench: Mapping IPP -> list of run records; each record must carry the
            keys "decision" and "match" (as produced by run_bench).
        mode_label: Label copied verbatim into the "mode" field of the result.

    Returns:
        Dict with keys "mode", "n_dossiers", "n_runs", "accuracy_runs"
        (share of matching runs), "accuracy_majority" (share of dossiers whose
        most frequent decision equals the ground truth), "uhcd_accuracy_runs" /
        "forfait_accuracy_runs" (share of matching runs per ground-truth class)
        and "stability" (share of dossiers whose runs all gave the same
        decision). Every ratio is 0 when its denominator is 0.
    """
    from collections import Counter

    ipp_to_gt = {ipp: gt for ipp, _, gt, _ in ORDRE_DOSSIERS}

    def _ratio(num: int, den: int):
        return num / den if den else 0

    def _class_accuracy(label: str) -> float:
        # Count the runs actually present for this class instead of assuming
        # every dossier has as many runs as the first one.
        runs = [r for ipp, gt in ipp_to_gt.items() if gt == label for r in bench.get(ipp, [])]
        return sum(1 for r in runs if r["match"]) / max(1, len(runs))

    n_dossiers = len(bench)
    n_runs_total = sum(len(runs) for runs in bench.values())
    correct_total = sum(1 for runs in bench.values() for r in runs if r["match"])

    correct_majority = 0
    stable = 0
    for ipp, runs in bench.items():
        decisions = [r["decision"] for r in runs]
        if not decisions:
            continue
        top_decision = Counter(decisions).most_common(1)[0][0]
        # An IPP without ground truth can never count as a correct majority.
        if ipp in ipp_to_gt and top_decision == ipp_to_gt[ipp]:
            correct_majority += 1
        if len(set(decisions)) == 1:
            stable += 1

    return {
        "mode": mode_label,
        "n_dossiers": n_dossiers,
        "n_runs": n_runs_total,
        "accuracy_runs": _ratio(correct_total, n_runs_total),
        "accuracy_majority": _ratio(correct_majority, n_dossiers),
        "uhcd_accuracy_runs": _class_accuracy("REQUALIFICATION_HOSPITALISATION"),
        "forfait_accuracy_runs": _class_accuracy("FORFAIT_URGENCE"),
        "stability": _ratio(stable, n_dossiers),
    }
def main():
    """CLI entry point: build the patient records, then run one or both bench modes.

    For each selected mode the raw results are written to
    RESULTS_DIR/bench_<mode>.json and the aggregated statistics are printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--runs", type=int, default=3, help="Inférences par dossier")
    parser.add_argument("--mode", choices=["baseline", "postfix"], default="baseline")
    parser.add_argument("--model", default=None, help="Modèle Ollama (default: qwen2.5:7b en baseline, gemma3:27b-cloud en postfix)")
    parser.add_argument("--all", action="store_true", help="Lance baseline + postfix séquentiellement")
    args = parser.parse_args()

    raw = DATA_JS.read_text(encoding="utf-8")
    dpis = {}
    for ipp, _label, _gt, _ftype in ORDRE_DOSSIERS:
        try:
            dpis[ipp] = build_dpi_text(ipp, raw)
        except Exception as e:
            # Best effort: a failed extraction is recorded as a placeholder text
            # so the remaining dossiers can still be benchmarked.
            print(f"{ipp} : extraction DPI échouée — {e}", flush=True)
            dpis[ipp] = f"[ERREUR EXTRACTION] {e}"

    # Keep the consolidated records on disk for later audit.
    (RESULTS_DIR / "dpis.json").write_text(json.dumps(dpis, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"📁 DPI consolidés : {RESULTS_DIR}/dpis.json ({sum(len(v) for v in dpis.values())} chars total)")

    # Default model per mode; --model overrides it for every selected mode.
    default_models = {"baseline": "qwen2.5:7b", "postfix": "gemma3:27b-cloud"}
    selected_modes = list(default_models) if args.all else [args.mode]

    for mode in selected_modes:
        mdl = args.model or default_models[mode]
        print(f"\n=== {mode.upper()} | model={mdl} | runs={args.runs} ===")
        bench = run_bench(mode, mdl, args.runs, dpis)
        out_path = RESULTS_DIR / f"bench_{mode}.json"
        report = {"model": mdl, "runs": args.runs, "results": bench}
        out_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"📁 {out_path}")
        print(json.dumps(stats(bench, mode), indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,413 @@
"""
Harness de comparaison medgemma:4b vs baselines internes.
Usage : python3 tools/benchmark_medgemma_demo.py [--models m1,m2,...] [--out report.md]
Tâches évaluées :
1. Codage CIM-10 (5 vignettes, gold connu) — match exact + match famille (3 chars)
2. Résumé de dossier (3 CRH anonymisés) — qualitatif, longueur, latence
3. Extraction structurée JSON (mêmes 3 CRH) — conformité schéma + remplissage
Métriques : latence, longueur sortie, score CIM-10.
Sortie : rapport markdown + JSON brut pour relecture.
"""
from __future__ import annotations
import argparse
import json
import re
import time
from pathlib import Path
from typing import Any
import requests
OLLAMA_URL = "http://localhost:11434/api/generate"
TIMEOUT = 240 # un appel de 4min max sur les gros modèles
DEFAULT_MODELS = [
"medgemma:4b",
"pmsi-coder-v2:latest",
"qwen2.5:7b",
"gemma4:latest",
]
T2A_ANON = Path("/home/dom/ai/t2a_v2/output/anonymized")
# 5 vignettes CIM-10 — gold construit à partir de cas typiques
CIM10_VIGNETTES = [
{
"id": "v1_idm_inferieur",
"text": (
"Patient de 65 ans, douleur thoracique constrictive irradiant dans "
"le bras gauche depuis 2h. ECG : sus-décalage ST en DII, DIII et "
"aVF. Troponine I : 4,8 ng/mL (N<0,04). Coronarographie : "
"occlusion de la coronaire droite proximale, stent posé."
),
"expected_exact": "I21.1",
"expected_family3": "I21",
"label": "Infarctus du myocarde inférieur",
},
{
"id": "v2_pneumopathie",
"text": (
"Femme 72 ans, fièvre 39°C, toux productive, dyspnée. Examen : "
"crépitants base droite. Radio : foyer alvéolaire lobaire moyen "
"droit. Antigénurie pneumocoque positive. Antibiothérapie par "
"amoxicilline IV 6g/j."
),
"expected_exact": "J13",
"expected_family3": "J13",
"label": "Pneumonie à pneumocoque",
},
{
"id": "v3_avc_ischemique",
"text": (
"Homme 78 ans amené aux urgences pour hémiplégie droite et aphasie "
"d'installation brutale 1h auparavant. NIHSS 14. Scanner cérébral "
"sans injection : pas d'hémorragie. IRM diffusion : restriction "
"sylvienne gauche. Thrombolyse IV par altéplase."
),
"expected_exact": "I63.5",
"expected_family3": "I63",
"label": "AVC ischémique sylvien gauche",
},
{
"id": "v4_decompensation_cardiaque",
"text": (
"Patiente 84 ans, antécédents d'HTA et de cardiopathie ischémique. "
"Dyspnée d'aggravation progressive sur 48h, orthopnée, OMI. "
"Auscultation : crépitants bilatéraux. BNP 2400 pg/mL. Radio : "
"syndrome alvéolo-interstitiel bilatéral, cardiomégalie. "
"Diurétiques IV."
),
"expected_exact": "I50.1",
"expected_family3": "I50",
"label": "Insuffisance cardiaque gauche décompensée",
},
{
"id": "v5_dyspnee_symptome",
"text": (
"Patient 56 ans aux urgences pour dyspnée aiguë sans étiologie "
"retrouvée après bilan complet (D-dimères négatifs, scanner "
"thoracique sans embolie ni foyer, ECG normal, BNP normal). "
"Évolution favorable spontanément. Sortie après 48h."
),
"expected_exact": "R06.0",
"expected_family3": "R06",
"label": "Dyspnée (symptôme isolé, étiologie non retrouvée)",
},
]
# 3 CRH anonymisés réels pour résumé + extraction
CRH_FILES = [
T2A_ANON / "67_23001636/crh_67_23108642_anonymized.txt",
T2A_ANON / "103_23056749/CRH 23056749_anonymized.txt",
T2A_ANON / "407_23116460/407_crh_anonymized.txt",
]
CIM10_PROMPT = """Tu es un médecin codeur PMSI expert en CIM-10.
Vignette clinique :
{text}
Donne UNIQUEMENT le diagnostic principal en CIM-10 au format JSON strict :
{{"code": "X00.0", "label": "libellé court"}}
Aucun texte autour, juste le JSON."""
SUMMARY_PROMPT = """Tu es un médecin résumant un compte-rendu d'hospitalisation pour passage de relais.
Compte-rendu :
{text}
Résume en 5 puces concises (un point par ligne, format `- ...`) :
1. Motif d'admission
2. Antécédents pertinents
3. Diagnostic(s) retenu(s)
4. Traitements engagés
5. Évolution / orientation
Pas de phrases d'introduction. Juste les 5 puces."""
EXTRACTION_PROMPT = """Extrait les informations structurées du compte-rendu suivant.
Compte-rendu :
{text}
Réponds UNIQUEMENT par un JSON strict de ce schéma :
{{
"motif_admission": "string court",
"diagnostics": ["liste de diagnostics retenus"],
"antecedents": ["liste d'antécédents notables"],
"traitements": ["traitements engagés pendant le séjour"],
"date_admission": "JJ/MM/AAAA ou null",
"date_sortie": "JJ/MM/AAAA ou null",
"duree_sejour_jours": null
}}
Si une info est absente, mets null ou liste vide. Aucun texte autour du JSON."""
def call_ollama(model: str, prompt: str) -> tuple[str, float, dict[str, Any]]:
    """Call the Ollama generate endpoint once and time the call.

    Returns:
        ``(output, latency_s, meta)``. On success ``meta`` carries the
        ``eval_count``, ``eval_duration`` and ``load_duration`` values reported
        by the server. On any failure ``output`` is ``"[ERROR: ...]"`` and
        ``meta`` is ``{"error": <message>}``; the function never raises.
    """
    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1, "num_ctx": 8192},
    }
    started = time.time()
    try:
        response = requests.post(OLLAMA_URL, json=request_body, timeout=TIMEOUT)
        response.raise_for_status()
        data = response.json()
        elapsed = time.time() - started
        meta = {
            "eval_count": data.get("eval_count"),
            "eval_duration_ns": data.get("eval_duration"),
            "load_duration_ns": data.get("load_duration"),
        }
        return data.get("response", ""), elapsed, meta
    except Exception as exc:
        # Deliberate catch-all: one failing model must not stop the benchmark.
        return f"[ERROR: {exc}]", time.time() - started, {"error": str(exc)}
def extract_json(text: str) -> dict | None:
    """Return the first JSON object found in *text*, or None.

    Markdown code fences are removed first. Every ``{`` is then tried in turn
    with a real JSON decoder, so braces inside string values do not confuse the
    extraction, and a leading non-JSON ``{...}`` span does not prevent a later
    valid object from being found.

    Args:
        text: Raw model output, possibly with prose or fences around the JSON.

    Returns:
        The decoded object, or None when *text* is empty or holds no valid
        JSON object.
    """
    if not text:
        return None
    # Strip ```json ... ``` fences.
    cleaned = re.sub(r"```(?:json)?\s*", "", text).replace("```", "")

    decoder = json.JSONDecoder()
    start = cleaned.find("{")
    while start >= 0:
        try:
            obj, _end = decoder.raw_decode(cleaned, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        start = cleaned.find("{", start + 1)
    return None
def score_cim10(predicted_code: str | None, gold_exact: str, gold_family: str) -> str:
    """Score a predicted ICD-10 code against the gold standard.

    Args:
        predicted_code: Code taken from the model's JSON. It comes from
            untrusted model output, so it may be missing or not a string.
        gold_exact: Expected full code (e.g. "I21.1").
        gold_family: Expected 3-character family (e.g. "I21").

    Returns:
        "exact" when the normalized code equals *gold_exact*, "family" when
        only its first three characters equal *gold_family*, "wrong" otherwise,
        and "parse_error" when the prediction is missing, blank or not a
        string.
    """
    if not isinstance(predicted_code, str):
        return "parse_error"
    code = predicted_code.upper().strip().replace(" ", "")
    if not code:
        return "parse_error"
    if code == gold_exact:
        return "exact"
    if code[:3] == gold_family:
        return "family"
    return "wrong"
def run_cim10_task(models: list[str]) -> list[dict]:
    """Ask every model to code every ICD-10 vignette and score the answers.

    Returns one record per (vignette, model) pair, vignette-major, holding the
    expected codes, the predicted code, its score, the latency and the first
    500 characters of the raw output.
    """
    records = []
    for vignette in CIM10_VIGNETTES:
        prompt = CIM10_PROMPT.format(text=vignette["text"])
        gold_exact = vignette["expected_exact"]
        gold_family = vignette["expected_family3"]
        for model in models:
            print(f" [CIM-10] {vignette['id']:30s} {model:35s}", end=" ", flush=True)
            output, latency, _meta = call_ollama(model, prompt)
            parsed = extract_json(output)
            if parsed:
                pred_code = parsed.get("code")
            else:
                pred_code = None
            score = score_cim10(pred_code, gold_exact, gold_family)
            print(f"{pred_code or '?'} ({score}) {latency:.1f}s")
            record = {
                "task": "cim10",
                "case_id": vignette["id"],
                "model": model,
                "expected_exact": gold_exact,
                "expected_family": gold_family,
                "predicted": pred_code,
                "score": score,
                "latency_s": round(latency, 2),
                "raw_output": output[:500],
            }
            records.append(record)
    return records
def run_summary_task(models: list[str], crh_texts: list[tuple[str, str]]) -> list[dict]:
    """Ask every model to summarize every discharge report.

    Args:
        models: Ollama model names.
        crh_texts: ``(case_id, report_text)`` pairs.

    Returns:
        One record per (report, model) pair with the bullet count, output
        length in characters, latency and the full output.
    """
    # Lines starting with one of these markers count as bullets. The marker
    # list must not contain an empty string: str.startswith("") is always
    # True, which would count every line (blank ones included) as a bullet.
    bullet_markers = ("-", "•", "*")
    results = []
    for crh_id, crh_text in crh_texts:
        for model in models:
            print(f" [SUMMARY] {crh_id:30s} {model:35s}", end=" ", flush=True)
            output, latency, _meta = call_ollama(model, SUMMARY_PROMPT.format(text=crh_text))
            n_bullets = sum(
                1 for line in output.splitlines()
                if line.strip().startswith(bullet_markers)
            )
            print(f"{n_bullets} puces, {len(output)} car., {latency:.1f}s")
            results.append({
                "task": "summary",
                "case_id": crh_id,
                "model": model,
                "n_bullets": n_bullets,
                "n_chars": len(output),
                "latency_s": round(latency, 2),
                "output": output,
            })
    return results
def run_extraction_task(models: list[str], crh_texts: list[tuple[str, str]]) -> list[dict]:
    """Ask every model for a structured JSON extraction of every report.

    Each record states whether the parsed JSON has all the expected keys
    ("conforme", "manque:<keys>" or "parse_error") and how many of the
    expected fields hold a non-empty value.
    """
    schema_keys = {"motif_admission", "diagnostics", "antecedents", "traitements",
                   "date_admission", "date_sortie", "duree_sejour_jours"}
    # Values treated as "not filled in".
    empty_values = (None, "", [], "null")

    records = []
    for crh_id, crh_text in crh_texts:
        prompt = EXTRACTION_PROMPT.format(text=crh_text)
        for model in models:
            print(f" [EXTRACT] {crh_id:30s} {model:35s}", end=" ", flush=True)
            output, latency, _meta = call_ollama(model, prompt)
            parsed = extract_json(output)
            if parsed is not None:
                absent = schema_keys - set(parsed)
                if absent:
                    conformity = f"manque:{','.join(sorted(absent))}"
                else:
                    conformity = "conforme"
                filled = len([key for key in schema_keys
                              if parsed.get(key) not in empty_values])
            else:
                conformity = "parse_error"
                filled = 0
            print(f"{conformity}, {filled}/7 rempli, {latency:.1f}s")
            record = {
                "task": "extraction",
                "case_id": crh_id,
                "model": model,
                "conformity": conformity,
                "filled_fields": filled,
                "parsed": parsed,
                "latency_s": round(latency, 2),
                "raw_output": output[:800],
            }
            records.append(record)
    return records
def render_report(all_results: list[dict], out_path: Path) -> str:
    """Render the benchmark results as a markdown report and write it to disk.

    Args:
        all_results: Records produced by the three task runners; each carries a
            "task" key ("cim10", "summary" or "extraction") and a "model" key.
        out_path: Destination of the markdown file (written as UTF-8).

    Returns:
        The markdown text that was written.
    """
    lines = ["# Benchmark medgemma:4b — démo médicale", ""]
    lines.append(f"_Généré le {time.strftime('%Y-%m-%d %H:%M:%S')}_")
    lines.append("")

    # Collect models across every task: when the CIM-10 task is skipped, the
    # summary and extraction tables must still list the models that ran.
    models = sorted({r["model"] for r in all_results})

    # ---- ICD-10 coding ----
    lines.append("## 1. Codage CIM-10 (5 vignettes)")
    lines.append("")
    cim_rows = [r for r in all_results if r["task"] == "cim10"]
    lines.append("| Modèle | Exact | Famille | Faux | Parse error | Latence moy. |")
    lines.append("|---|---:|---:|---:|---:|---:|")
    for m in models:
        rows = [r for r in cim_rows if r["model"] == m]
        if not rows:
            continue
        total = len(rows)
        n_exact = sum(1 for r in rows if r["score"] == "exact")
        n_fam = sum(1 for r in rows if r["score"] == "family")
        n_wrong = sum(1 for r in rows if r["score"] == "wrong")
        n_perr = sum(1 for r in rows if r["score"] == "parse_error")
        avg_lat = sum(r["latency_s"] for r in rows) / total
        lines.append(
            f"| `{m}` | {n_exact}/{total} | {n_fam}/{total} | {n_wrong}/{total} "
            f"| {n_perr}/{total} | {avg_lat:.1f}s |"
        )
    lines.append("")
    lines.append("### Détail par vignette")
    for vig in CIM10_VIGNETTES:
        lines.append(f"\n**{vig['id']}** — attendu `{vig['expected_exact']}` ({vig['label']})")
        lines.append("")
        lines.append("| Modèle | Prédit | Score | Latence |")
        lines.append("|---|---|---|---:|")
        for r in [x for x in cim_rows if x["case_id"] == vig["id"]]:
            lines.append(f"| `{r['model']}` | `{r['predicted'] or ''}` | {r['score']} | {r['latency_s']}s |")

    # ---- Summaries ----
    lines.append("\n## 2. Résumé de CRH (3 dossiers anonymisés)")
    lines.append("")
    sum_rows = [r for r in all_results if r["task"] == "summary"]
    lines.append("| Modèle | Latence moy. | Longueur moy. | Puces moy. |")
    lines.append("|---|---:|---:|---:|")
    for m in models:
        rows = [r for r in sum_rows if r["model"] == m]
        if not rows:
            continue
        avg_lat = sum(r["latency_s"] for r in rows) / len(rows)
        avg_len = sum(r["n_chars"] for r in rows) / len(rows)
        avg_bul = sum(r["n_bullets"] for r in rows) / len(rows)
        lines.append(f"| `{m}` | {avg_lat:.1f}s | {avg_len:.0f} car. | {avg_bul:.1f} |")
    lines.append("")
    lines.append("### Sortie complète par modèle (à juger qualitativement)")
    for r in sum_rows:
        lines.append(f"\n#### {r['case_id']} — `{r['model']}` ({r['latency_s']}s)")
        lines.append("```")
        lines.append(r["output"][:1500])
        lines.append("```")

    # ---- Structured extraction ----
    lines.append("\n## 3. Extraction structurée JSON")
    lines.append("")
    ext_rows = [r for r in all_results if r["task"] == "extraction"]
    lines.append("| Modèle | Conformes | Champs remplis moy. | Latence moy. |")
    lines.append("|---|---:|---:|---:|")
    for m in models:
        rows = [r for r in ext_rows if r["model"] == m]
        if not rows:
            continue
        n_conforme = sum(1 for r in rows if r["conformity"] == "conforme")
        avg_filled = sum(r["filled_fields"] for r in rows) / len(rows)
        avg_lat = sum(r["latency_s"] for r in rows) / len(rows)
        lines.append(f"| `{m}` | {n_conforme}/{len(rows)} | {avg_filled:.1f}/7 | {avg_lat:.1f}s |")
    lines.append("")
    lines.append("### Détail JSON parsé par cas")
    for r in ext_rows:
        lines.append(f"\n#### {r['case_id']} — `{r['model']}` ({r['conformity']}, {r['latency_s']}s)")
        if r["parsed"]:
            lines.append("```json")
            lines.append(json.dumps(r["parsed"], indent=2, ensure_ascii=False)[:1500])
            lines.append("```")
        else:
            lines.append(f"_Parse error._ Brut : `{r['raw_output'][:300]}`")

    report = "\n".join(lines)
    out_path.write_text(report, encoding="utf-8")
    return report
def main():
    """CLI entry point: run the selected tasks, then write the JSON and markdown outputs."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--models", default=",".join(DEFAULT_MODELS),
                        help="Liste de modèles séparés par virgule")
    parser.add_argument("--out", default="docs/BENCH_MEDGEMMA.md")
    parser.add_argument("--skip-summary", action="store_true")
    parser.add_argument("--skip-extraction", action="store_true")
    parser.add_argument("--skip-cim10", action="store_true")
    args = parser.parse_args()

    models = [name.strip() for name in args.models.split(",") if name.strip()]
    print(f"Modèles testés : {models}")

    # Load the discharge reports; missing files are reported and skipped.
    crh_texts = []
    for path in CRH_FILES:
        if not path.exists():
            print(f" [WARN] CRH absent : {path}")
            continue
        crh_texts.append((path.parent.name, path.read_text(encoding="utf-8")))

    # (enabled, banner, runner) — the report-based tasks need at least one report.
    have_reports = bool(crh_texts)
    tasks = [
        (not args.skip_cim10,
         "\n=== Tâche 1 : Codage CIM-10 ===",
         lambda: run_cim10_task(models)),
        (not args.skip_summary and have_reports,
         "\n=== Tâche 2 : Résumé de CRH ===",
         lambda: run_summary_task(models, crh_texts)),
        (not args.skip_extraction and have_reports,
         "\n=== Tâche 3 : Extraction structurée ===",
         lambda: run_extraction_task(models, crh_texts)),
    ]
    all_results = []
    for enabled, banner, runner in tasks:
        if enabled:
            print(banner)
            all_results.extend(runner())

    # Save the raw JSON next to the markdown report.
    out_md = Path(args.out)
    out_md.parent.mkdir(parents=True, exist_ok=True)
    out_json = out_md.with_suffix(".json")
    out_json.write_text(json.dumps(all_results, indent=2, ensure_ascii=False), encoding="utf-8")
    render_report(all_results, out_md)
    print(f"\n✅ Rapport : {out_md}")
    print(f"✅ Résultats bruts : {out_json}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,164 @@
"""Duplique le workflow Demo_urgence_2 en Demo_urgence_2_interop.
- Source : wf_d04d2dc7c118_1778493082
- Exclus : ord 13, 15, 16, 18, 19 (steps UI Codage Easily)
- Conservés : ord 0-12, 14, 17 → renumérotés 0..14
- Anchors partagés (pas de duplication de visual_anchors)
- Transaction SQLite : commit unique en fin.
Usage :
python tools/duplicate_demo_urgence_2_interop.py [--dry-run]
"""
from __future__ import annotations
import argparse
import secrets
import sqlite3
import sys
import time
from pathlib import Path
DB_PATH = Path(__file__).resolve().parent.parent / "visual_workflow_builder" / "backend" / "instance" / "workflows.db"
SOURCE_WF_ID = "wf_d04d2dc7c118_1778493082"
NEW_WF_NAME = "Demo_urgence_2_interop"
ORDS_TO_EXCLUDE = {13, 15, 16, 18, 19}
def new_id(prefix: str, ts: int) -> str:
    """Build an identifier of the form ``<prefix>_<12 random hex chars>_<ts>``."""
    random_part = secrets.token_hex(6)
    return "_".join((prefix, random_part, str(ts)))
def main() -> int:
    """Duplicate the source workflow under NEW_WF_NAME without the excluded steps.

    The kept steps are renumbered contiguously from 0 and keep their
    ``anchor_id`` values. With ``--dry-run`` the plan is printed and nothing
    is written.

    Returns:
        Process exit code: 0 on success or dry-run, 1 if the DB file is
        missing, 2 if the target name already exists, 3 if the source workflow
        is missing, 4 if the number of kept steps is not the expected 15, and
        5 if the inserts failed (transaction rolled back).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Pas de COMMIT, juste afficher.")
    args = parser.parse_args()

    if not DB_PATH.exists():
        print(f"ERREUR : DB introuvable {DB_PATH}", file=sys.stderr)
        return 1

    ts = int(time.time())
    new_wf_id = new_id("wf", ts)

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    # One try/finally around everything so the connection is closed on every
    # exit path, including the early error returns and the dry-run.
    try:
        cur = conn.cursor()

        # 1. Refuse to run if the target name is already taken.
        row = cur.execute("SELECT id FROM workflows WHERE name = ?", (NEW_WF_NAME,)).fetchone()
        if row:
            print(f"ERREUR : un workflow nommé '{NEW_WF_NAME}' existe déjà (id={row['id']})", file=sys.stderr)
            return 2

        # 2. Read the source workflow row.
        src_wf = cur.execute("SELECT * FROM workflows WHERE id = ?", (SOURCE_WF_ID,)).fetchone()
        if not src_wf:
            print(f"ERREUR : workflow source {SOURCE_WF_ID} introuvable", file=sys.stderr)
            return 3

        # 3. Read the steps to keep, in order.
        src_steps = cur.execute(
            'SELECT * FROM steps WHERE workflow_id = ? ORDER BY "order"',
            (SOURCE_WF_ID,),
        ).fetchall()
        kept_steps = [s for s in src_steps if s["order"] not in ORDS_TO_EXCLUDE]
        if len(kept_steps) != 15:
            print(f"ERREUR : attendu 15 steps conservés, obtenu {len(kept_steps)}", file=sys.stderr)
            return 4

        # 4. Build the mapping (renumbering 0..14).
        mapping = []
        for new_order, s in enumerate(kept_steps):
            new_step_id = new_id("step", ts + new_order)  # distinct ts suffix per step
            mapping.append({
                "old_id": s["id"],
                "new_id": new_step_id,
                "old_order": s["order"],
                "new_order": new_order,
                "action_type": s["action_type"],
                "label": s["label"],
                "position_x": s["position_x"],
                "position_y": s["position_y"],
                "parameters_json": s["parameters_json"],
                "anchor_id": s["anchor_id"],
            })

        # 5. Print the before/after table.
        print(f"\nWorkflow source : {SOURCE_WF_ID} (name={src_wf['name']})")
        print(f"Workflow cible : {new_wf_id} (name={NEW_WF_NAME})")
        print(f"Steps conservés : {len(mapping)} / {len(src_steps)}")
        print(f"\n{'old_ord':>7}{'new_ord':>7} {'action_type':<20} label")
        print("-" * 80)
        for m in mapping:
            print(f"{m['old_order']:>7}{m['new_order']:>7} {m['action_type']:<20} {m['label']}")
        print()

        if args.dry_run:
            print("--dry-run : aucune modification de la DB.")
            return 0

        # 6. Transactional write: a single commit at the end, rollback on any error.
        now_iso = time.strftime("%Y-%m-%d %H:%M:%S")
        try:
            cur.execute("BEGIN")
            cur.execute(
                """
                INSERT INTO workflows
                (id, name, description, tags_json, trigger_examples_json,
                created_at, updated_at, is_active, source,
                review_status, review_feedback, reviewed_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    new_wf_id,
                    NEW_WF_NAME,
                    src_wf["description"],
                    src_wf["tags_json"],
                    src_wf["trigger_examples_json"],
                    now_iso,
                    now_iso,
                    src_wf["is_active"],
                    src_wf["source"],
                    src_wf["review_status"],
                    src_wf["review_feedback"],
                    src_wf["reviewed_at"],
                ),
            )
            for m in mapping:
                cur.execute(
                    """
                    INSERT INTO steps
                    (id, workflow_id, action_type, "order",
                    position_x, position_y, parameters_json, anchor_id, label,
                    created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        m["new_id"],
                        new_wf_id,
                        m["action_type"],
                        m["new_order"],
                        m["position_x"],
                        m["position_y"],
                        m["parameters_json"],
                        m["anchor_id"],
                        m["label"],
                        now_iso,
                        now_iso,
                    ),
                )
            conn.commit()
            print(f"OK — workflow {NEW_WF_NAME} créé ({len(mapping)} steps), id={new_wf_id}")
            return 0
        except Exception as e:
            conn.rollback()
            print(f"ROLLBACK — exception : {e}", file=sys.stderr)
            return 5
    finally:
        conn.close()
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,164 @@
"""Duplique Demo_urgence_2_interop en Demo_urgence_3_db.
- Source : wf_56bf8fa2d332_1778666923 (Demo_urgence_2_interop)
- Conservés : DB ord 0..15 (16 steps = DPI + t2a_decision + 2 llm_generate + Win+D)
- Supprimés : DB ord 16..24 (9 steps Excel — ouverture + remplissage codage_urgence.xlsx)
- Ord conservés tels quels (0..15 reste contigu)
- Anchors partagés (pas de duplication de visual_anchors)
- Transaction SQLite : commit unique en fin.
Usage :
python tools/duplicate_demo_urgence_3_db.py [--dry-run]
"""
from __future__ import annotations
import argparse
import secrets
import sqlite3
import sys
import time
from pathlib import Path
DB_PATH = Path(__file__).resolve().parent.parent / "visual_workflow_builder" / "backend" / "instance" / "workflows.db"
SOURCE_WF_ID = "wf_56bf8fa2d332_1778666923"
NEW_WF_NAME = "Demo_urgence_3_db"
KEEP_ORDS_MAX = 15 # garde 0..15 inclus
EXPECTED_KEPT = 16
def new_id(prefix: str, ts: int) -> str:
    """Return ``<prefix>_<random hex>_<ts>``, the random part being 6 bytes (12 hex chars)."""
    token = secrets.token_hex(6)
    return "{}_{}_{}".format(prefix, token, ts)
def main() -> int:
    """Duplicate the source workflow under NEW_WF_NAME, keeping steps 0..KEEP_ORDS_MAX.

    Kept steps retain their original ``order`` and ``anchor_id`` values. With
    ``--dry-run`` the plan is printed and nothing is written.

    Returns:
        Process exit code: 0 on success or dry-run, 1 if the DB file is
        missing, 2 if the target name already exists, 3 if the source workflow
        is missing, 4 if the number of kept steps differs from EXPECTED_KEPT,
        and 5 if the inserts failed (transaction rolled back).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Pas de COMMIT, juste afficher.")
    args = parser.parse_args()

    if not DB_PATH.exists():
        print(f"ERREUR : DB introuvable {DB_PATH}", file=sys.stderr)
        return 1

    ts = int(time.time())
    new_wf_id = new_id("wf", ts)

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    # One try/finally around everything so the connection is closed on every
    # exit path, including the early error returns and the dry-run.
    try:
        cur = conn.cursor()

        # Refuse to run if the target name is already taken.
        row = cur.execute("SELECT id FROM workflows WHERE name = ?", (NEW_WF_NAME,)).fetchone()
        if row:
            print(f"ERREUR : un workflow nommé '{NEW_WF_NAME}' existe déjà (id={row['id']})", file=sys.stderr)
            return 2

        src_wf = cur.execute("SELECT * FROM workflows WHERE id = ?", (SOURCE_WF_ID,)).fetchone()
        if not src_wf:
            print(f"ERREUR : workflow source {SOURCE_WF_ID} introuvable", file=sys.stderr)
            return 3

        src_steps = cur.execute(
            'SELECT * FROM steps WHERE workflow_id = ? ORDER BY "order"',
            (SOURCE_WF_ID,),
        ).fetchall()
        kept_steps = [s for s in src_steps if s["order"] <= KEEP_ORDS_MAX]
        if len(kept_steps) != EXPECTED_KEPT:
            print(
                f"ERREUR : attendu {EXPECTED_KEPT} steps conservés, obtenu {len(kept_steps)} "
                f"(total source={len(src_steps)})",
                file=sys.stderr,
            )
            return 4

        mapping = []
        for s in kept_steps:
            new_step_id = new_id("step", ts + s["order"])
            mapping.append({
                "old_id": s["id"],
                "new_id": new_step_id,
                "order": s["order"],
                "action_type": s["action_type"],
                "label": s["label"],
                "position_x": s["position_x"],
                "position_y": s["position_y"],
                "parameters_json": s["parameters_json"],
                "anchor_id": s["anchor_id"],
            })

        print(f"\nWorkflow source : {SOURCE_WF_ID} (name={src_wf['name']})")
        print(f"Workflow cible : {new_wf_id} (name={NEW_WF_NAME})")
        print(f"Steps conservés : {len(mapping)} / {len(src_steps)}")
        print(f"\n{'db_ord':>6} {'action_type':<22} {'anchor_id':<32} label")
        print("-" * 100)
        for m in mapping:
            anchor = m["anchor_id"] or "-"
            print(f"{m['order']:>6} {m['action_type']:<22} {anchor:<32} {m['label']}")
        print()

        if args.dry_run:
            print("--dry-run : aucune modification de la DB.")
            return 0

        # Transactional write: a single commit at the end, rollback on any error.
        now_iso = time.strftime("%Y-%m-%d %H:%M:%S")
        try:
            cur.execute("BEGIN")
            cur.execute(
                """
                INSERT INTO workflows
                (id, name, description, tags_json, trigger_examples_json,
                created_at, updated_at, is_active, source,
                review_status, review_feedback, reviewed_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    new_wf_id,
                    NEW_WF_NAME,
                    src_wf["description"],
                    src_wf["tags_json"],
                    src_wf["trigger_examples_json"],
                    now_iso,
                    now_iso,
                    src_wf["is_active"],
                    src_wf["source"],
                    src_wf["review_status"],
                    src_wf["review_feedback"],
                    src_wf["reviewed_at"],
                ),
            )
            for m in mapping:
                cur.execute(
                    """
                    INSERT INTO steps
                    (id, workflow_id, action_type, "order",
                    position_x, position_y, parameters_json, anchor_id, label,
                    created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        m["new_id"],
                        new_wf_id,
                        m["action_type"],
                        m["order"],
                        m["position_x"],
                        m["position_y"],
                        m["parameters_json"],
                        m["anchor_id"],
                        m["label"],
                        now_iso,
                        now_iso,
                    ),
                )
            conn.commit()
            print(f"OK — workflow {NEW_WF_NAME} créé ({len(mapping)} steps), id={new_wf_id}")
            return 0
        except Exception as e:
            conn.rollback()
            print(f"ROLLBACK — exception : {e}", file=sys.stderr)
            return 5
    finally:
        conn.close()
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,80 @@
"""
Dump runtime des attributs et comportement effectif du processor
Qwen3-VL-8B-Instruct. Script jetable, à supprimer après usage.
Usage : python tools/probe_qwen3vl_processor.py
"""
from transformers import AutoProcessor
from PIL import Image
import torch
MODEL_ID = "Qwen/Qwen3-VL-8B-Instruct"
FIXTURE = "data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png"

print("=" * 70)
print("DUMP PROCESSOR :", MODEL_ID)
print("=" * 70)

proc = AutoProcessor.from_pretrained(MODEL_ID)
ip = proc.image_processor

# Section 1 — raw attributes
print("\n--- ATTRIBUTS BRUTS ---")
print("class:", type(ip).__name__)
print("size:", ip.size)
print("patch_size:", ip.patch_size)
print("merge_size:", ip.merge_size)
for attr in ['min_pixels', 'max_pixels', 'temporal_patch_size',
             'image_mean', 'image_std', 'do_resize', 'do_rescale',
             'rescale_factor', 'do_normalize', 'do_convert_rgb']:
    # Not every processor version exposes every attribute; show a marker
    # instead of raising AttributeError.
    print(f"{attr}:", getattr(ip, attr, '<absent>'))

# Units used by every section below.
# `image_grid_thw` counts patches of `patch_size` pixels, so a resized edge
# is `grid * patch_size`. `patch_size * merge_size` is only the multiple the
# resized edges are aligned to; multiplying the grid by it overstates each
# dimension by `merge_size`.
patch_size = ip.patch_size
factor = ip.patch_size * ip.merge_size

# Section 2 — effective behaviour on the fixture
print("\n--- COMPORTEMENT EFFECTIF SUR FIXTURE ---")
# Context manager so the fixture's file handle is released once processed.
with Image.open(FIXTURE) as img:
    src_size = img.size  # (width, height)
    print(f"Image source : {src_size} (W×H)")
    out = ip(images=img, return_tensors='pt')
print(f"Keys retournées : {list(out.keys())}")
print(f"pixel_values shape : {out['pixel_values'].shape}")
print(f"image_grid_thw : {out.get('image_grid_thw')}")

# Section 3 — reconstruct the resized dimensions
print("\n--- RECONSTRUCTION DIMS RESIZE ---")
grid = out.get('image_grid_thw')
if grid is not None:
    grid = grid[0].tolist()  # [t, h, w], counted in patches
    H_resized = grid[1] * patch_size
    W_resized = grid[2] * patch_size
    print(f"grid_thw : t={grid[0]}, h={grid[1]}, w={grid[2]}")
    print(f"factor calculé (patch_size × merge_size) : {factor}")
    print(f"Dims resize reconstruites : {W_resized}×{H_resized} (W×H)")
    print(f"Dims source : {src_size}")
    print(f"Ratio resize : {W_resized / src_size[0]:.4f} (W), "
          f"{H_resized / src_size[1]:.4f} (H)")

# Section 4 — upper-bound test (large image) to understand min/max_pixels
print("\n--- TEST BORNE HAUTE (image grande) ---")
big_img = Image.new('RGB', (4096, 2560), color='white')
big_out = ip(images=big_img, return_tensors='pt')
big_grid = big_out['image_grid_thw'][0].tolist()
big_w = big_grid[2] * patch_size
big_h = big_grid[1] * patch_size
print(f"Image source : {big_img.size}")
print(f"grid_thw : {big_grid}")
print(f"Dims resize : {big_w}×{big_h}")
print(f"Pixels totaux après resize : "
      f"{big_h * big_w}")

# Section 5 — lower-bound test (small image)
print("\n--- TEST BORNE BASSE (image petite) ---")
small_img = Image.new('RGB', (128, 64), color='white')
small_out = ip(images=small_img, return_tensors='pt')
small_grid = small_out['image_grid_thw'][0].tolist()
print(f"Image source : {small_img.size}")
print(f"grid_thw : {small_grid}")
print(f"Dims resize : {small_grid[2] * patch_size}×{small_grid[1] * patch_size}")

print("\n" + "=" * 70)
print("FIN DUMP")
print("=" * 70)