feat(t2a): build_dpi_enriched - extraction déterministe horaires + classifications cliniques

Préprocesseur Python qui injecte un bloc FAITS_CALCULÉS en tête du DPI
avant l'appel LLM, pour neutraliser l'hallucination de durée (bug "23h"
sur cas MOREL, confusion avec "depuis 23h" de l'Observ. IDE Urg).

Extrait depuis le bandeau Easily Assure et la Synthèse Urgences :
- âge (dateutil.relativedelta)
- date admission / sortie + durée passage (format humain + décimal)
- CCMU / GEMSA libellé complet (parser multi-ligne)
- priorité IAO, mode de venue / médicalisation / mode d'entrée
- diagnostic principal
- decision_terrain + orientation_terrain (metadata only, jamais injectés
  dans le prompt pour ne pas biaiser le LLM)

Retour tuple (dpi_enriched, metadata) pour permettre les garde-fous
serveur Python ↔ LLM au commit 2.

Robustesse :
- re.search 1re occurrence + WARNING si bandeau divergent multi-occurrences
- Synthèse Urgences priorité sur bandeau pour dates
- Valeur exigée sur même ligne que label (évite capture de section title)
- Cas négatif (horaires absents) → "NON CALCULABLE" + parsing_warnings
- Jamais de crash, retour tuple toujours valide

Tests : 4/4 verts (golden MOREL string + metadata, négatif sortie absente,
DPI vide). Pas de régression sur tests/integration/test_t2a_extract.py.

Brief complet : docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-12 18:49:49 +02:00
parent 2eeaa806bb
commit 9872f4510c
4 changed files with 937 additions and 9 deletions

View File

@@ -15,6 +15,7 @@ Extrait de api_stream.py pour clarifier l'architecture.
import json
import logging
import os
import re
import threading
import time
@@ -35,13 +36,27 @@ _ALLOWED_ACTION_TYPES = {
"verify_screen", # Replay hybride : vérification visuelle entre groupes
"pause_for_human", # Pause supervisée explicite (interceptée par /replay/next)
"extract_text", # OCR serveur sur dernier heartbeat → variable workflow
"extract_table", # OCR serveur + filtre regex → liste structurée (boucle)
"extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions
"_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll)
"t2a_decision", # Analyse LLM facturation T2A → variable workflow
"llm_generate", # Génération texte libre côté serveur → variable workflow
}
# Types d'actions exécutées CÔTÉ SERVEUR (jamais transmises à l'Agent V1).
# Le pipeline /replay/next les traite en boucle interne et passe à l'action
# suivante jusqu'à trouver une action visuelle (à transmettre au client).
_SERVER_SIDE_ACTION_TYPES = {"extract_text", "t2a_decision"}
_SERVER_SIDE_ACTION_TYPES = {
"extract_text",
"extract_table",
"t2a_decision",
"llm_generate",
"_concat_text_vars",
}
# Pause par défaut entre Ctrl+End/Home et la capture suivante (ms).
# Configurable par step via parameters.scroll_pause_ms ; default ici.
SCROLL_PAUSE_MS = 500
_MAX_ACTION_TEXT_LENGTH = 10000
_MAX_KEYS_PER_COMBO = 10
# Touches autorisées dans les key_combo (modificateurs + touches spéciales + caractères simples)
@@ -875,6 +890,42 @@ def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str,
}
return [normalized]
elif action_type == "extract_table":
normalized["type"] = "extract_table"
normalized["parameters"] = {
"output_var": action_params.get("output_var", "table_rows"),
"pattern": action_params.get("pattern"),
"limit": action_params.get("limit"),
"region": action_params.get("region"),
}
return [normalized]
elif action_type == "extract_text_scroll":
# Expansion en séquence : OCR(top) → Ctrl+End → wait → OCR(bottom)
# → concat(top, bottom → final) → Ctrl+Home.
# variable_name (préféré) ou output_var (compat extract_text).
final_var = (
action_params.get("variable_name")
or action_params.get("output_var")
or "extracted_text"
)
paragraph = bool(action_params.get("paragraph", True))
# Pause après scroll Ctrl+End — configurable au step.
# Default 500ms (Wikipedia) ; cible 1500-2000ms pour DPI Citrix lent.
try:
scroll_pause = int(action_params.get("scroll_pause_ms", SCROLL_PAUSE_MS))
except (TypeError, ValueError):
scroll_pause = SCROLL_PAUSE_MS
# Variables internes nommées par préfixe : invisibles à l'utilisateur.
# Préfixe `_` pour signaler "interne" et éviter collision.
top_var = f"__{final_var}_top"
bottom_var = f"__{final_var}_bottom"
return _expand_extract_text_scroll(
base, final_var, top_var, bottom_var, paragraph,
scroll_pause_ms=scroll_pause,
)
elif action_type == "t2a_decision":
normalized["type"] = "t2a_decision"
normalized["parameters"] = {
@@ -884,6 +935,22 @@ def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str,
}
return [normalized]
elif action_type == "llm_generate":
normalized["type"] = "llm_generate"
normalized["parameters"] = {
"prompt": action_params.get("prompt", ""),
"context": action_params.get("context", ""),
"output_var": (
action_params.get("output_var")
or action_params.get("variable_name")
or "generated_text"
),
"model": action_params.get("model"),
}
if action_params.get("temperature") is not None:
normalized["parameters"]["temperature"] = action_params.get("temperature")
return [normalized]
else:
logger.warning(f"Type d'action inconnu : {action_type}")
return []
@@ -966,6 +1033,18 @@ def _resolve_runtime_vars(value: Any, variables: Dict[str, Any]) -> Any:
# Handlers pour les actions exécutées côté serveur (extract_text, t2a_decision)
# =========================================================================
def _normalize_ollama_endpoint(raw_url: str) -> str:
"""Normalise une URL Ollama pour les clients qui attendent l'endpoint racine.
`OLLAMA_URL` est parfois configuré vers `/api/generate` alors que
`LLMActionHandler` attend la racine `http://host:port`.
"""
endpoint = (raw_url or "http://localhost:11434").strip().rstrip("/")
for suffix in ("/api/generate", "/api/chat"):
if endpoint.endswith(suffix):
return endpoint[: -len(suffix)]
return endpoint
def _handle_extract_text_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
@@ -979,11 +1058,40 @@ def _handle_extract_text_action(
False (le pipeline continue, pas de blocage).
"""
params = action.get("parameters") or {}
output_var = (params.get("output_var") or "extracted_text").strip()
# Compatibilité VWB : "variable_name" (VWB) et "output_var" (agent libre)
output_var = (params.get("output_var") or params.get("variable_name") or "extracted_text").strip()
paragraph = bool(params.get("paragraph", True))
heartbeat = last_heartbeat.get(session_id) or {}
path = heartbeat.get("path")
# Source prioritaire : screenshot envoyé par l'agent après la dernière action.
# Si c'est du base64, on le sauvegarde dans un fichier temp pour l'OCR.
# Fallback : heartbeat de fond (vrai chemin serveur, via "bg_{machine_id}").
path = None
raw_screenshot = replay_state.get("last_screenshot") or ""
if raw_screenshot:
if raw_screenshot.startswith("data:"):
# base64 → fichier temp
try:
import base64, tempfile
header, b64data = raw_screenshot.split(",", 1)
suffix = ".jpg" if "jpeg" in header else ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp.write(base64.b64decode(b64data))
tmp.close()
path = tmp.name
except Exception as e:
logger.warning("extract_text: décodage base64 screenshot échoué: %s", e)
elif os.path.isfile(raw_screenshot):
path = raw_screenshot
if not path:
machine_id = replay_state.get("machine_id", "")
bg_session = f"bg_{machine_id}" if machine_id and machine_id != "default" else None
heartbeat = (
last_heartbeat.get(session_id)
or (last_heartbeat.get(bg_session) if bg_session else None)
or {}
)
path = heartbeat.get("path")
text = ""
if path:
@@ -1006,6 +1114,93 @@ def _handle_extract_text_action(
return bool(text)
def _handle_extract_table_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
session_id: str,
last_heartbeat: Dict[str, Dict[str, Any]],
) -> bool:
"""Traite une action extract_table côté serveur. OCR + filtre regex pour
retourner une liste structurée (ex : IPP d'un tableau de patients) qui
pourra être bouclée par le templating ${patients[i]}.
Paramètres reconnus :
output_var : nom de variable runtime (default "table_rows")
pattern : regex à matcher sur chaque token OCR (ex : r"^25\\d{6}$")
limit : nb max d'entrées à retourner
region : (x, y, w, h) en pixels pour cropper avant OCR
(None = image entière)
Robuste aux échecs : si pas de heartbeat ou OCR raté, stocke [] et
retourne False — le pipeline continue.
"""
params = action.get("parameters") or {}
output_var = (params.get("output_var") or params.get("variable_name") or "table_rows").strip()
pattern = params.get("pattern") or None
limit = params.get("limit")
region = params.get("region") or None
if isinstance(limit, str):
try:
limit = int(limit)
except ValueError:
limit = None
# Source : screenshot du heartbeat (idem extract_text)
path = None
raw_screenshot = replay_state.get("last_screenshot") or ""
if raw_screenshot:
if raw_screenshot.startswith("data:"):
try:
import base64, tempfile
header, b64data = raw_screenshot.split(",", 1)
suffix = ".jpg" if "jpeg" in header else ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp.write(base64.b64decode(b64data))
tmp.close()
path = tmp.name
except Exception as e:
logger.warning("extract_table: décodage base64 screenshot échoué: %s", e)
elif os.path.isfile(raw_screenshot):
path = raw_screenshot
if not path:
machine_id = replay_state.get("machine_id", "")
bg_session = f"bg_{machine_id}" if machine_id and machine_id != "default" else None
heartbeat = (
last_heartbeat.get(session_id)
or (last_heartbeat.get(bg_session) if bg_session else None)
or {}
)
path = heartbeat.get("path")
rows: list = []
if path:
try:
from core.llm import extract_table_from_image
rows = extract_table_from_image(
path,
region=tuple(region) if region else None,
pattern=pattern,
limit=limit,
)
except Exception as e:
logger.warning(
"extract_table OCR échoué (%s) — variable '%s' = []", e, output_var,
)
else:
logger.warning(
"extract_table : pas de heartbeat pour session %s — variable '%s' = []",
session_id, output_var,
)
replay_state.setdefault("variables", {})[output_var] = rows
logger.info(
"extract_table → variable '%s' (%d entrées, pattern=%r, limit=%s) replay %s",
output_var, len(rows), pattern, limit, replay_state.get("replay_id", "?"),
)
return bool(rows)
def _handle_t2a_decision_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
@@ -1034,8 +1229,19 @@ def _handle_t2a_decision_action(
return False
try:
from core.llm import analyze_dpi, DEFAULT_MODEL
result = analyze_dpi(dpi_text, model=model or DEFAULT_MODEL)
from core.llm import analyze_dpi, DEFAULT_MODEL, build_dpi_enriched
# Enrichissement déterministe avant LLM : injection FAITS_CALCULÉS en
# tête (durée, âge, CCMU/GEMSA/priorité…) pour neutraliser les
# hallucinations de durée (cf. bug "23h" MOREL). La metadata est
# capturée pour les garde-fous serveur (commit 2 — Python ↔ LLM).
dpi_enriched, metadata = build_dpi_enriched(dpi_text)
logger.info(
"[build_dpi_enriched] duree_python=%sh decision_terrain=%r warnings=%d",
metadata.get("duree_heures_decimales"),
metadata.get("decision_terrain"),
len(metadata.get("parsing_warnings", [])),
)
result = analyze_dpi(dpi_enriched, model=model or DEFAULT_MODEL)
except Exception as e:
logger.warning("t2a_decision : analyze_dpi exception %s", e)
result = {
@@ -1045,6 +1251,18 @@ def _handle_t2a_decision_action(
"_error": str(e),
}
# Si parse_error, injecter des valeurs de fallback pour que les templates restent lisibles
if result.get("_parse_error"):
raw_preview = result.get("_raw", "")[:200]
logger.warning("t2a_decision parse_error — raw: %s", raw_preview)
result.setdefault("decision", "INDETERMINE")
result.setdefault("decision_court", "À vérifier")
result.setdefault("preuve_critere1", "Analyse non disponible (erreur LLM)")
result.setdefault("preuve_critere2", "Analyse non disponible (erreur LLM)")
result.setdefault("preuve_critere3", "Analyse non disponible (erreur LLM)")
result.setdefault("justification", f"Réponse LLM non parsable : {raw_preview}")
result.setdefault("confiance", "faible")
replay_state.setdefault("variables", {})[output_var] = result
decision = result.get("decision", "?")
elapsed = result.get("_elapsed_s", "?")
@@ -1052,7 +1270,176 @@ def _handle_t2a_decision_action(
"t2a_decision → variable '%s' decision=%s (%ss) replay %s",
output_var, decision, elapsed, replay_state.get("replay_id", "?"),
)
return "_error" not in result
return "_error" not in result and not result.get("_parse_error")
def _handle_llm_generate_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
) -> bool:
"""Traite une action llm_generate côté serveur.
Stocke le texte généré dans replay_state["variables"][output_var].
Les paramètres `prompt` et `context` sont déjà résolus via le templating
runtime avant d'arriver ici.
"""
params = action.get("parameters") or {}
output_var = (
params.get("output_var")
or params.get("variable_name")
or "generated_text"
).strip()
prompt = str(params.get("prompt") or "").strip()
context = str(params.get("context") or "")
model = params.get("model") or None
temperature = None
if params.get("temperature") is not None:
try:
temperature = float(params.get("temperature"))
except (TypeError, ValueError):
logger.warning(
"llm_generate : temperature invalide %r — fallback valeur par défaut",
params.get("temperature"),
)
if not prompt:
logger.warning(
"llm_generate : prompt vide — variable '%s' = ''",
output_var,
)
replay_state.setdefault("variables", {})[output_var] = ""
return False
generated = ""
try:
from core.execution.llm_actions import LLMActionHandler
handler = LLMActionHandler(
ollama_endpoint=_normalize_ollama_endpoint(
os.environ.get("OLLAMA_URL", "http://localhost:11434")
),
timeout=180,
)
generated = handler.generate_text(
prompt=prompt,
context=context,
model=model,
temperature=temperature,
).strip()
except Exception as e:
logger.warning("llm_generate : génération échouée (%s) — variable '%s' = ''", e, output_var)
replay_state.setdefault("variables", {})[output_var] = generated
logger.info(
"llm_generate → variable '%s' (%d chars, model=%s) replay %s",
output_var,
len(generated),
model or "default",
replay_state.get("replay_id", "?"),
)
return bool(generated)
def _handle_concat_text_vars_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
) -> bool:
"""Traite une action serveur interne `_concat_text_vars`.
Concatène deux variables runtime existantes (top_var + separator + bottom_var)
et écrit le résultat dans output_var. Variables manquantes traitées comme "".
Action générée par l'expansion de `extract_text_scroll` ; pas exposée à
l'utilisateur final. Robuste aux échecs OCR amont (l'une ou l'autre var vide).
"""
params = action.get("parameters") or {}
top_var = (params.get("top_var") or "").strip()
bottom_var = (params.get("bottom_var") or "").strip()
output_var = (params.get("output_var") or "extracted_text").strip()
separator = params.get("separator", "\n\n")
variables = replay_state.setdefault("variables", {})
top_text = str(variables.get(top_var, "") or "")
bottom_text = str(variables.get(bottom_var, "") or "")
# Si les deux sont vides, output reste "" (cohérent avec _handle_extract_text_action).
# Si un seul est vide, on évite un séparateur inutile en début/fin.
if top_text and bottom_text:
merged = top_text + separator + bottom_text
else:
merged = top_text or bottom_text
variables[output_var] = merged
# Nettoyage des variables internes pour ne pas polluer l'état.
if top_var.startswith("__"):
variables.pop(top_var, None)
if bottom_var.startswith("__"):
variables.pop(bottom_var, None)
logger.info(
"extract_text_scroll concat → variable '%s' (%d chars) replay %s",
output_var, len(merged), replay_state.get("replay_id", "?"),
)
return bool(merged)
def _expand_extract_text_scroll(
base: Dict[str, Any],
final_var: str,
top_var: str,
bottom_var: str,
paragraph: bool,
scroll_pause_ms: int = SCROLL_PAUSE_MS,
) -> List[Dict[str, Any]]:
"""Expanse un step extract_text_scroll en séquence d'actions atomiques.
Séquence générée :
1. extract_text(top_var) — OCR zone visible (haut de page)
2. key_combo(ctrl+end) — scroll bas (côté client Léa V1)
3. wait(scroll_pause_ms) — laisse DOM/UI se redessiner
4. extract_text(bottom_var) — OCR zone visible (bas de page)
5. _concat_text_vars(top, bottom→final) — action serveur interne
6. key_combo(ctrl+home) — remet en haut
Toutes les sous-actions héritent de `base` (edge_id, from_node, to_node)
pour la traçabilité. Chaque action obtient un action_id unique.
`scroll_pause_ms` : configurable au step (défaut SCROLL_PAUSE_MS=500ms).
"""
def _new_action() -> Dict[str, Any]:
return {**base, "action_id": f"act_{uuid.uuid4().hex[:8]}"}
a1 = _new_action()
a1["type"] = "extract_text"
a1["parameters"] = {"output_var": top_var, "paragraph": paragraph}
a2 = _new_action()
a2["type"] = "key_combo"
a2["keys"] = ["ctrl", "end"]
a3 = _new_action()
a3["type"] = "wait"
a3["duration_ms"] = scroll_pause_ms
a4 = _new_action()
a4["type"] = "extract_text"
a4["parameters"] = {"output_var": bottom_var, "paragraph": paragraph}
a5 = _new_action()
a5["type"] = "_concat_text_vars"
a5["parameters"] = {
"top_var": top_var,
"bottom_var": bottom_var,
"output_var": final_var,
"separator": "\n\n",
}
a6 = _new_action()
a6["type"] = "key_combo"
a6["keys"] = ["ctrl", "home"]
return [a1, a2, a3, a4, a5, a6]
def _expand_compound_steps(

View File

@@ -4,12 +4,15 @@ from .t2a_decision import (
PROMPT_TEMPLATE,
DEFAULT_MODEL,
analyze_dpi,
build_dpi_enriched,
)
from .ocr_extractor import extract_text_from_image
from .ocr_extractor import extract_table_from_image, extract_text_from_image
__all__ = [
"PROMPT_TEMPLATE",
"DEFAULT_MODEL",
"analyze_dpi",
"build_dpi_enriched",
"extract_text_from_image",
"extract_table_from_image",
]

View File

@@ -17,10 +17,14 @@ from __future__ import annotations
import json
import logging
import os
import re
import time
import urllib.error
import urllib.request
from typing import Any, Dict
from datetime import datetime
from typing import Any, Dict, Optional, Tuple
from dateutil.relativedelta import relativedelta
logger = logging.getLogger(__name__)
@@ -166,3 +170,287 @@ def analyze_dpi(
parsed["_model"] = model
parsed["_eval_count"] = body.get("eval_count")
return parsed
# ---------------------------------------------------------------------------
# build_dpi_enriched — extraction déterministe horaires + classifications
# ---------------------------------------------------------------------------
#
# Voir docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md pour le contexte
# complet (bug hallucination durée "23h" sur cas MOREL, plan P0 [S1]/[S2]).
# Libellés connus de la section "Synthèse Urgences" (Easily Assure).
# Servent d'ancres pour le parsing : la valeur d'un champ s'étend depuis son
# libellé jusqu'au prochain libellé connu (gère le wrap multi-ligne CCMU/GEMSA).
# Trier par longueur décroissante côté regex pour éviter qu'un préfixe partiel
# (ex: "Décision médicale" matche "Médecin de la décision médicale").
_LIBELLES_SYNTHESE = [
"Episode - Date",
"Mode de transport à l'arrivée",
"Médicalisation du transport",
"Mode d'entrée",
"Origine du transfert",
"Date d'orientation",
"IAO",
"Priorité",
"Episode - Sous-type",
"Circonstances",
"Motif de prise en charge",
"Observ. IDE Urg",
"Médecin de la prise en charge médicale",
"Date de la prise en charge médicale",
"CCMU",
"GEMSA",
"Diagnostics",
"Médecin de la décision médicale",
"Date de décision médicale",
"Décision médicale",
"Orientation du patient",
"US de destination",
]
def _extract_synthese_field(text: str, label: str) -> Optional[str]:
"""Extrait la valeur du champ `label` dans la Synthèse Urgences.
Stratégie : trouve la 1re ligne qui commence par `label`, capture la suite
jusqu'au prochain libellé connu (en début de ligne) ou fin de texte.
Gère le wrap multi-ligne (CCMU 3 + libellé long sur plusieurs lignes).
Normalise les espaces multiples internes en un seul espace.
"""
other_labels = sorted(
(l for l in _LIBELLES_SYNTHESE if l != label),
key=len,
reverse=True,
)
other_alt = "|".join(re.escape(l) for l in other_labels)
# Exigence : la valeur doit commencer sur la même ligne que le label
# ([^\n]). Cela évite qu'une ligne titre de section ("Décision médicale\n"
# seule) capture la ligne suivante (qui serait un autre champ).
# Le wrap multi-ligne CCMU/GEMSA reste supporté : 1er char sur la même
# ligne que le label, suite via DOTALL jusqu'au prochain libellé connu.
pattern = (
rf"^[\s\|]*{re.escape(label)}[ \t]*[:\|]?[ \t]+"
rf"([^\n].*?)(?=\n[\s\|]*(?:{other_alt})\b|\Z)"
)
m = re.search(pattern, text, re.DOTALL | re.MULTILINE)
if not m:
return None
value = re.sub(r"\s+", " ", m.group(1)).strip()
return value or None
def _parse_dt(date_str: str, time_str: str) -> Optional[datetime]:
"""Parse 'DD/MM/YYYY' + 'HH:MM' → datetime. Retourne None si échec."""
try:
return datetime.strptime(f"{date_str} {time_str}", "%d/%m/%Y %H:%M")
except ValueError:
return None
def _parse_synthese_datetime(value: Optional[str]) -> Optional[datetime]:
"""Parse une valeur de type 'JJ/MM/AAAA à HH:MM' (format Synthèse Urgences).
Tolère absence du séparateur ' à ' (OCR peut linéariser autrement).
"""
if not value:
return None
m = re.search(r"(\d{2}/\d{2}/\d{4})\s*(?:à\s*)?(\d{2}:\d{2})", value)
if not m:
return None
return _parse_dt(m.group(1), m.group(2))
def build_dpi_enriched(dpi_raw: str) -> Tuple[str, Dict[str, Any]]:
"""Enrichit le DPI brut avec un bloc FAITS_CALCULÉS en tête.
Extrait de manière déterministe (Python, pas LLM) :
- âge du patient (depuis date de naissance bandeau + date d'admission)
- durée totale du passage (depuis horaires admission/sortie)
- CCMU, GEMSA, priorité IAO, mode de venue, diagnostic principal
- décision médicale terrain + orientation (metadata uniquement, NON injectés
dans le bloc FAITS_CALCULÉS pour ne pas biaiser le LLM)
Le bloc FAITS_CALCULÉS est concaténé en tête du DPI retourné.
Le metadata dict permet au handler serveur d'effectuer les garde-fous
Python ↔ LLM au commit 2.
Args:
dpi_raw: DPI brut concaténé (5 onglets OCR scroll auto + bandeau répété).
Returns:
Tuple (dpi_enriched, metadata).
dpi_enriched: str — FAITS_CALCULÉS + "\\n\\n" + dpi_raw.
metadata: dict — toutes les valeurs Python extraites (None si parsing
échoué pour un champ), + parsing_warnings: list[str].
"""
metadata: Dict[str, Any] = {
"age_ans": None,
"date_admission": None,
"date_sortie": None,
"duree_heures_decimales": None,
"ccmu": None,
"gemsa": None,
"priorite_iao": None,
"mode_venue": None,
"mode_medicalisation": None,
"mode_entree": None,
"diagnostic_principal": None,
"decision_terrain": None,
"orientation_terrain": None,
"parsing_warnings": [],
}
# ── 1. Bandeau Easily Assure (re.search = 1re occurrence) ────────────────
# Le bandeau est répété ~5 fois dans dpi_raw (un par extract_text_scroll
# de chaque onglet). On prend la 1re occurrence et on signale une éventuelle
# divergence inter-occurrences (symptôme d'OCR instable).
date_naissance: Optional[datetime] = None
date_adm_bandeau: Optional[datetime] = None
date_sortie_bandeau: Optional[datetime] = None
m_naissance = re.search(r"\(e\)\s+le\s+(\d{2}/\d{2}/\d{4})", dpi_raw)
if m_naissance:
try:
date_naissance = datetime.strptime(m_naissance.group(1), "%d/%m/%Y")
except ValueError:
metadata["parsing_warnings"].append(
f"date_naissance non parsable : {m_naissance.group(1)!r}"
)
arrivees = re.findall(
r"Arriv[ée]e\s*:\s*(\d{2}/\d{2}/\d{4})\s+(\d{2}:\d{2})", dpi_raw
)
if arrivees:
date_adm_bandeau = _parse_dt(*arrivees[0])
if len({a for a in arrivees}) > 1:
logger.warning(
"[build_dpi_enriched] Bandeau détecté %d fois avec divergences "
"sur Arrivée — prise de la 1re occurrence : %s",
len(arrivees), arrivees[0],
)
sorties = re.findall(
r"Sortie\s*:\s*(\d{2}/\d{2}/\d{4})\s+(\d{2}:\d{2})", dpi_raw
)
if sorties:
date_sortie_bandeau = _parse_dt(*sorties[0])
if len({s for s in sorties}) > 1:
logger.warning(
"[build_dpi_enriched] Bandeau détecté %d fois avec divergences "
"sur Sortie — prise de la 1re occurrence : %s",
len(sorties), sorties[0],
)
# ── 2. Synthèse Urgences (priorité sur le bandeau) ───────────────────────
syn_episode_date = _parse_synthese_datetime(
_extract_synthese_field(dpi_raw, "Episode - Date")
)
syn_pec_medicale = _parse_synthese_datetime(
_extract_synthese_field(dpi_raw, "Date de la prise en charge médicale")
)
syn_decision_medicale = _parse_synthese_datetime(
_extract_synthese_field(dpi_raw, "Date de décision médicale")
)
# Priorité d'admission : Episode-Date > PEC médicale > bandeau Arrivée
metadata["date_admission"] = (
syn_episode_date or syn_pec_medicale or date_adm_bandeau
)
# Priorité de sortie : Date décision médicale > bandeau Sortie
metadata["date_sortie"] = syn_decision_medicale or date_sortie_bandeau
# Champs structurés Synthèse Urgences (libellé complet, multi-ligne ok)
metadata["ccmu"] = _extract_synthese_field(dpi_raw, "CCMU")
metadata["gemsa"] = _extract_synthese_field(dpi_raw, "GEMSA")
metadata["priorite_iao"] = _extract_synthese_field(dpi_raw, "Priorité")
metadata["mode_venue"] = _extract_synthese_field(
dpi_raw, "Mode de transport à l'arrivée"
)
metadata["mode_medicalisation"] = _extract_synthese_field(
dpi_raw, "Médicalisation du transport"
)
metadata["mode_entree"] = _extract_synthese_field(dpi_raw, "Mode d'entrée")
metadata["diagnostic_principal"] = _extract_synthese_field(dpi_raw, "Diagnostics")
metadata["decision_terrain"] = _extract_synthese_field(dpi_raw, "Décision médicale")
metadata["orientation_terrain"] = _extract_synthese_field(
dpi_raw, "US de destination"
)
# ── 3. Calculs dérivés ───────────────────────────────────────────────────
if date_naissance and metadata["date_admission"]:
metadata["age_ans"] = relativedelta(
metadata["date_admission"], date_naissance
).years
elif date_naissance is None:
metadata["parsing_warnings"].append("date_naissance non détectée dans bandeau")
duree_format_humain: Optional[str] = None
if metadata["date_admission"] and metadata["date_sortie"]:
delta = metadata["date_sortie"] - metadata["date_admission"]
total_seconds = delta.total_seconds()
if total_seconds <= 0:
metadata["parsing_warnings"].append(
f"durée invalide : sortie {metadata['date_sortie']} <= "
f"admission {metadata['date_admission']}"
)
else:
metadata["duree_heures_decimales"] = round(total_seconds / 3600, 2)
heures = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
duree_format_humain = f"{heures} heures et {minutes} minutes"
else:
if metadata["date_admission"] is None:
metadata["parsing_warnings"].append("date_admission non détectée")
if metadata["date_sortie"] is None:
metadata["parsing_warnings"].append("date_sortie non détectée")
logger.warning(
"[build_dpi_enriched] Durée non calculable — admission=%s sortie=%s",
metadata["date_admission"], metadata["date_sortie"],
)
# ── 4. Construction du bloc FAITS_CALCULÉS ───────────────────────────────
lignes = ["FAITS_CALCULÉS (déterministes, ne pas recalculer) :"]
if metadata["age_ans"] is not None:
lignes.append(f"- Âge du patient : {metadata['age_ans']} ans")
if metadata["date_admission"]:
lignes.append(
f"- Date admission : "
f"{metadata['date_admission'].strftime('%d/%m/%Y à %H:%M')}"
)
if metadata["date_sortie"]:
lignes.append(
f"- Date sortie : "
f"{metadata['date_sortie'].strftime('%d/%m/%Y à %H:%M')}"
)
if duree_format_humain and metadata["duree_heures_decimales"] is not None:
lignes.append(
f"- Durée totale du passage : {duree_format_humain} "
f"(soit {metadata['duree_heures_decimales']} heures décimales)"
)
elif metadata["date_admission"] is None or metadata["date_sortie"] is None:
lignes.append(
"- Durée totale du passage : NON CALCULABLE (horaires non détectés)"
)
if metadata["ccmu"]:
lignes.append(f"- CCMU : {metadata['ccmu']}")
if metadata["gemsa"]:
lignes.append(f"- GEMSA : {metadata['gemsa']}")
if metadata["priorite_iao"]:
lignes.append(f"- Priorité IAO : {metadata['priorite_iao']}")
if metadata["mode_venue"] or metadata["mode_medicalisation"]:
mode_parts = [
p for p in (metadata["mode_venue"], metadata["mode_medicalisation"]) if p
]
lignes.append(f"- Mode de venue : {', '.join(mode_parts)}")
if metadata["mode_entree"]:
lignes.append(f"- Mode d'entrée : {metadata['mode_entree']}")
if metadata["diagnostic_principal"]:
lignes.append(f"- Diagnostic principal : {metadata['diagnostic_principal']}")
bloc_faits = "\n".join(lignes)
dpi_enriched = f"{bloc_faits}\n\n{dpi_raw}"
return dpi_enriched, metadata

View File

@@ -0,0 +1,250 @@
"""Tests de la fonction build_dpi_enriched (core/llm/t2a_decision).
Voir docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md pour le contexte.
Cas pilote : MOREL Catherine (IPP 25003284), passage urgences 01/01/2025
03:12 → 06:49 (3h37 réelles). Bug initial : LLM hallucine 23h (confusion
avec "depuis 23h" présent dans Observ. IDE Urg).
"""
from __future__ import annotations
from core.llm.t2a_decision import build_dpi_enriched
# ----------------------------------------------------------------------------
# Fixtures DPI — reproduisent le format texte attendu en sortie OCR scroll auto.
# Le bandeau Easily Assure est répété en tête de chaque onglet (effet de bord
# de extract_text(top_var) qui capture la zone visible incluant l'en-tête fixe).
# ----------------------------------------------------------------------------
_BANDEAU_MOREL = (
"IPP : 25003284 MOREL Catherine Né(e) le 14/03/1947 | 77 ans | Sexe : F | "
"Arrivée : 01/01/2025 03:12 | IAO : CARON Sandrine (03:25) | "
"Médecin : BONNET Antoine | Sortie : 01/01/2025 06:49"
)
_BANDEAU_MOREL_SANS_SORTIE = (
"IPP : 25003284 MOREL Catherine Né(e) le 14/03/1947 | 77 ans | Sexe : F | "
"Arrivée : 01/01/2025 03:12 | IAO : CARON Sandrine (03:25) | "
"Médecin : BONNET Antoine"
)
def _build_dpi_morel(
bandeau: str = _BANDEAU_MOREL,
inclure_date_decision: bool = True,
) -> str:
"""Construit un DPI MOREL plausible (bandeau répété 5x + sections d'onglets +
Synthèse Urgences). Permet de générer la variante "test négatif" en
retirant la Date de décision médicale.
"""
date_decision_ligne = (
"Date de décision médicale 01/01/2025 à 06:49\n"
if inclure_date_decision
else ""
)
return f"""{bandeau}
Motif d'admission :
Asthme — quintes de toux sèche depuis 23h, fièvre, tachycardie.
{bandeau}
Examens cliniques :
À l'admission : fièvre 39.2°C, tachycardie 91 bpm, peakflow 260.
Sibilants bilatéraux, conscience normale.
{bandeau}
Imagerie :
Condensation parenchymateuse basithoracique gauche avec émoussement
du cul-de-sac pleural.
{bandeau}
Notes médicales :
Bilan biologique : hyperleucocytose 11 000 GB, CRP 25.
PCR positive pour virus respiratoire syncytial.
Traitement : Augmentin + Ventoline 3/j.
{bandeau}
Synthèse Urgences
Détails de l'épisode
Episode - Date 01/01/2025 à 03:12
Mode de transport à l'arrivée Véhicule personnel
Médicalisation du transport Aucune médicalisation
Mode d'entrée Autres admissions urgentes
Origine du transfert
Détails de l'orientation aux Urgences
Date d'orientation 01/01/2025 à 03:25
IAO CARON Sandrine
Priorité Priorité 3
Episode - Sous-type Médecine
Circonstances
Motif de prise en charge Asthme
Observ. IDE Urg depuis 23h , quintes de toux sèche
a pris 2+2 B de VENTO = inefficace
ATCD = asthme , insuf coronarienne
Détails de la prise en charge
Médecin de la prise en charge médicale BONNET Antoine
Date de la prise en charge médicale 01/01/2025 à 03:12
CCMU 3. Etat lésionnel et/ou pronostic fonctionnel jugés susceptibles de s'aggraver aux urgences ou durant l'intervention du SMUR, sans mettre en jeu le pronostic vital
GEMSA 2. Patient non convoqué sortant après consultation ou soins.
Diagnostics J12.1 Pneumopathie due au virus respiratoire syncytial [VRS] [CMA2] - actif
Décision médicale
Médecin de la décision médicale BONNET Antoine
{date_decision_ligne}Décision médicale Consultation externe
Orientation du patient
US de destination UC CONSULT.URGENCES
"""
# ----------------------------------------------------------------------------
# Test golden — cas MOREL complet
# ----------------------------------------------------------------------------
def test_golden_morel_dpi_enriched_string():
"""Le bloc FAITS_CALCULÉS contient toutes les valeurs attendues."""
dpi_raw = _build_dpi_morel()
dpi_enriched, _metadata = build_dpi_enriched(dpi_raw)
# Durée — format humain + décimal
assert "Durée totale du passage : 3 heures et 37 minutes" in dpi_enriched
assert "(soit 3.62 heures décimales)" in dpi_enriched
# Âge
assert "Âge du patient : 77 ans" in dpi_enriched
# CCMU libellé COMPLET (début + portion fin = reconstitution multi-ligne)
assert "CCMU : 3. Etat lésionnel" in dpi_enriched
assert "sans mettre en jeu le pronostic vital" in dpi_enriched
# GEMSA libellé COMPLET
assert "GEMSA : 2. Patient non convoqué sortant après consultation ou soins" in dpi_enriched
# Priorité IAO
assert "Priorité IAO : Priorité 3" in dpi_enriched
# Diagnostic principal
assert "J12.1 Pneumopathie" in dpi_enriched
# Bloc FAITS_CALCULÉS positionné AVANT le bandeau brut
idx_faits = dpi_enriched.index("FAITS_CALCULÉS")
idx_bandeau = dpi_enriched.index("IPP : 25003284")
assert idx_faits < idx_bandeau, "FAITS_CALCULÉS doit être en tête du DPI"
# decision_terrain et orientation_terrain ABSENTS du bloc FAITS_CALCULÉS
# (ils sont dans metadata mais ne doivent pas biaiser le LLM)
bloc_faits = dpi_enriched.split("\n\n", 1)[0]
assert "Consultation externe" not in bloc_faits
assert "UC CONSULT.URGENCES" not in bloc_faits
assert "Décision médicale terrain" not in bloc_faits
def test_golden_morel_metadata_complet():
"""Le dict metadata contient toutes les valeurs Python extraites."""
dpi_raw = _build_dpi_morel()
_enriched, metadata = build_dpi_enriched(dpi_raw)
# Durée précise (tolérance arrondi)
assert metadata["duree_heures_decimales"] is not None
assert abs(metadata["duree_heures_decimales"] - 3.62) < 0.01
# Âge
assert metadata["age_ans"] == 77
# CCMU/GEMSA libellé complet (validation startswith + extrait fin)
assert metadata["ccmu"] is not None
assert metadata["ccmu"].startswith("3.")
assert "Etat lésionnel" in metadata["ccmu"]
assert "pronostic vital" in metadata["ccmu"]
assert metadata["gemsa"] is not None
assert metadata["gemsa"].startswith("2.")
assert "non convoqué" in metadata["gemsa"]
# Priorité IAO
assert metadata["priorite_iao"] == "Priorité 3"
# Mode de venue + médicalisation
assert metadata["mode_venue"] == "Véhicule personnel"
assert metadata["mode_medicalisation"] == "Aucune médicalisation"
assert metadata["mode_entree"] == "Autres admissions urgentes"
# Diagnostic principal
assert metadata["diagnostic_principal"] is not None
assert "J12.1" in metadata["diagnostic_principal"]
assert "Pneumopathie" in metadata["diagnostic_principal"]
# Décision terrain (metadata uniquement, pour garde-fou serveur commit 2)
assert metadata["decision_terrain"] == "Consultation externe"
assert metadata["orientation_terrain"] == "UC CONSULT.URGENCES"
# Dates parsées
assert metadata["date_admission"] is not None
assert metadata["date_admission"].strftime("%Y-%m-%d %H:%M") == "2025-01-01 03:12"
assert metadata["date_sortie"] is not None
assert metadata["date_sortie"].strftime("%Y-%m-%d %H:%M") == "2025-01-01 06:49"
# Pas de warning de parsing sur un cas complet
assert metadata["parsing_warnings"] == []
# ----------------------------------------------------------------------------
# Test négatif — horaires de sortie absents
# ----------------------------------------------------------------------------
def test_negatif_sortie_absente():
"""Si Date de décision médicale ET Sortie bandeau sont retirées,
la fonction ne crashe pas, signale NON CALCULABLE et accumule un warning.
"""
dpi_raw = _build_dpi_morel(
bandeau=_BANDEAU_MOREL_SANS_SORTIE,
inclure_date_decision=False,
)
dpi_enriched, metadata = build_dpi_enriched(dpi_raw)
# Pas de crash, retour valide
assert isinstance(dpi_enriched, str)
assert isinstance(metadata, dict)
# Ligne explicite de signalement dans FAITS_CALCULÉS
assert "Durée totale du passage : NON CALCULABLE" in dpi_enriched
# Metadata : duree None, date_sortie None
assert metadata["duree_heures_decimales"] is None
assert metadata["date_sortie"] is None
# Admission reste détectable (Episode-Date + bandeau Arrivée présents)
assert metadata["date_admission"] is not None
# Warnings explicites
assert metadata["parsing_warnings"]
assert any("date_sortie" in w for w in metadata["parsing_warnings"])
# ----------------------------------------------------------------------------
# Test robustesse — DPI vide / malformé ne crashe pas
# ----------------------------------------------------------------------------
def test_dpi_vide_ne_crashe_pas():
"""Un DPI vide retourne un tuple valide avec metadata tous None."""
dpi_enriched, metadata = build_dpi_enriched("")
assert isinstance(dpi_enriched, str)
assert dpi_enriched.startswith("FAITS_CALCULÉS")
assert metadata["duree_heures_decimales"] is None
assert metadata["age_ans"] is None
assert metadata["ccmu"] is None
assert metadata["decision_terrain"] is None
# Warnings accumulés sur tous les champs critiques
assert metadata["parsing_warnings"]