fix(stream+vwb): chaîne replay robuste — auth, anchor type_text, lock async, drift, prompt LLM

Six modifications structurelles côté serveur, non destructives, aboutissant à un
pipeline replay bien plus stable pour la démo GHT Sud 95 (Urgences UHCD).

1. visual_workflow_builder/backend/app.py
   load_dotenv() chargeait .env (cwd) au lieu de .env.local racine projet.
   Conséquence : RPA_API_TOKEN absent après chaque restart manuel du backend
   et tous les proxies VWB→streaming échouaient en 401 « Token API invalide ».
   Charge maintenant explicitement .env.local du project root.

2. visual_workflow_builder/backend/api_v3/learned_workflows.py
   Quatre appels proxy /api/v1/traces/stream/* ne portaient pas le Bearer.
   Helper _stream_headers() factorisé et appliqué (workflows list/detail,
   workflow detail, reload-workflows).

3. visual_workflow_builder/backend/api_v3/dag_execute.py
   _ANCHOR_CLICK_TYPES excluait type_text/type_secret : pas de pre-click de
   focus avant la frappe → texte tapé sans focus → textareas vides au replay.
   Helper _inject_anchor_targeting() factorisé (centre bbox + visual_mode +
   target_spec) appliqué aux click_anchor* ET aux type_text/type_secret dès
   qu'un anchor_id est présent. Workflows historiques sans anchor sur
   type_text → comportement inchangé.

4. agent_v0/server_v1/api_stream.py — endpoint /replay/next
   _replay_lock (threading.Lock global) tenu pendant les actions serveur
   lentes (extract_text OCR ~5s, t2a_decision LLM ~8-13s). Comme le handler
   est async def, l'event loop FastAPI était bloqué : les polls clients
   timeout à 5s, leurs actions étaient popped serveur sans destinataire,
   perdues silencieusement. Mesure : 8 actions/25 perdues sur replay Urgence.

   acquire(timeout=4.5) puis run_in_executor pour libérer l'event loop
   pendant l'attente du lock ET pendant les handlers serveur synchrones.
   Pendant un t2a_decision en cours, les polls concurrents reçoivent
   immédiatement {action: null, server_busy: true} → l'agent ne timeout
   plus, aucune action n'est popped sans destinataire.

5. agent_v0/server_v1/resolve_engine.py — _validate_resolution_quality
   Drift > 0.20 par rapport aux coords enregistrées → fallback aux coords
   enregistrées même quand le template matching trouve l'image avec un
   score quasi parfait. Or un score >= 0.95 signifie que l'image EST
   visuellement à l'écran à l'endroit indiqué, le drift reflète juste
   un changement de layout (scroll, F11, redimensionnement), pas une
   erreur. Exception ajoutée : score >= 0.95 sur template_matching →
   ignore drift check, utilise position visuelle.

6. core/llm/t2a_decision.py — prompt T2A/PMSI
   Ancien prompt autorisait « Critère non validé » en fallback creux.
   Nouveau prompt impose au moins une CITATION LITTÉRALE entre « ... »
   du DPI dans chaque preuve_critereN, qu'elle soutienne ou infirme le
   critère. Si non validé : factualisation explicite (« Aucune ... »,
   « Sortie à H+2 ») citée du dossier. Sortie = preuves cliniques
   traçables et professionnelles, pas du remplissage.

État DB : aucun changement net (bbox patchés puis revertés depuis backup
visual_anchors_backup_20260501 ; by_text re-aligné sur 25003284). Le
re-enregistrement du workflow Urgence en conditions bureau standard
(Chrome normal, taille fenêtre standard) est l'étape suivante côté Dom.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-02 00:32:57 +02:00
parent b584bbabc3
commit 35b27ae492
6 changed files with 206 additions and 84 deletions

View File

@@ -2762,8 +2762,29 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
Si la session de l'agent n'a pas d'actions en attente, cherche dans les
autres queues de la MÊME machine (pas cross-machine).
Acquire timeout : si une action serveur lente (extract_text OCR,
t2a_decision LLM) tient le lock, on retourne immédiatement
{action: None, server_busy: True} avant que le client ne timeout à 5s.
Sans cela, des actions seraient popped serveur puis envoyées sur des
sockets clients déjà fermées par timeout — perdues silencieusement.
L'acquire et les actions serveur lentes sont exécutés via
run_in_executor : sinon l'appel synchrone bloque l'event loop FastAPI
(single-threaded) et même les polls qui devraient recevoir server_busy
sont bloqués jusqu'à libération — ce qui annule l'effet du timeout.
"""
with _replay_lock:
import asyncio
loop = asyncio.get_event_loop()
acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, 4.5)
if not acquired:
return {
"action": None,
"session_id": session_id,
"machine_id": machine_id,
"server_busy": True,
}
try:
# Verifier si le replay est en pause supervisee (target_not_found).
# Dans ce cas, NE PAS envoyer d'action — attendre l'intervention utilisateur.
for state in _replay_states.values():
@@ -2828,6 +2849,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
break
if target_state:
queue = target_queue
owning_replay = target_state
_replay_queues[session_id] = target_queue
del _replay_queues[target_sid]
target_state["session_id"] = session_id
@@ -2844,6 +2866,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
other_queue = _replay_queues.get(other_sid, [])
if other_queue:
queue = other_queue
owning_replay = state
_replay_queues[session_id] = other_queue
del _replay_queues[other_sid]
state["session_id"] = session_id
@@ -2869,53 +2892,65 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
type_ = action.get("type")
# pause_for_human : bascule en paused_need_help, return action=None
if type_ == "pause_for_human" and owning_replay is not None:
params = action.get("parameters") or {}
message = params.get("message") or "Validation requise"
# pause_for_human : no-op en mode autonome — on saute et on continue
if type_ == "pause_for_human":
logger.info(
"pause_for_human ignorée (mode autonome) — replay %s continue",
owning_replay["replay_id"] if owning_replay else "?"
)
queue.pop(0)
_replay_queues[session_id] = queue
owning_replay["status"] = "paused_need_help"
owning_replay["pause_message"] = message
owning_replay["failed_action"] = {
"action_id": action.get("action_id", ""),
"type": "pause_for_human",
"reason": "user_request",
}
logger.info(
f"Replay {owning_replay['replay_id']} pause supervisée demandée "
f"par le workflow : {message[:80]}"
)
return {
"action": None,
"session_id": session_id,
"machine_id": machine_id,
"replay_paused": True,
"pause_message": message,
"replay_id": owning_replay["replay_id"],
}
continue
# Actions serveur : exécuter, pop, continuer
# Actions serveur : exécuter HORS event loop pour ne pas bloquer
# les autres polls (extract_text OCR ~5s, t2a_decision LLM ~8-13s).
# Le lock reste tenu (queue cohérente) mais l'event loop est libre,
# donc les polls concurrents peuvent recevoir {server_busy: True}.
if type_ in _SERVER_SIDE_ACTION_TYPES and owning_replay is not None:
try:
if type_ == "extract_text":
_handle_extract_text_action(
action, owning_replay, session_id, _last_heartbeat
await loop.run_in_executor(
None,
_handle_extract_text_action,
action, owning_replay, session_id, _last_heartbeat,
)
elif type_ == "t2a_decision":
_handle_t2a_decision_action(action, owning_replay)
await loop.run_in_executor(
None,
_handle_t2a_decision_action,
action, owning_replay,
)
except Exception as e:
logger.warning(f"Action serveur {type_} a levé : {e}")
queue.pop(0)
_replay_queues[session_id] = queue
continue # action suivante
# Clic conditionnel : si l'action a un paramètre "condition", évaluer la variable
# Format : "dec.critere1_valide" → runtime_vars["dec"]["critere1_valide"]
condition_key = (action.get("parameters") or {}).get("condition")
if condition_key and owning_replay is not None:
runtime_vars = owning_replay.get("variables") or {}
parts = condition_key.split(".", 1)
if len(parts) == 2:
val = (runtime_vars.get(parts[0]) or {}).get(parts[1])
else:
val = runtime_vars.get(parts[0])
if not val:
logger.info("Clic conditionnel ignoré (%s=%s) — action %s",
condition_key, val, action.get("action_id", "?"))
queue.pop(0)
_replay_queues[session_id] = queue
continue
# Action visuelle : sortir de la boucle pour la transmettre à l'Agent V1
break
# Si la queue s'est vidée après les exécutions serveur, rien à transmettre
if not queue or action is None:
return {"action": None, "session_id": session_id, "machine_id": machine_id}
finally:
_replay_lock.release()
# ---- Pre-check écran (optionnel, non bloquant) ----
# Ne s'applique qu'aux actions qui ont un from_node (actions de workflow,
@@ -3943,7 +3978,9 @@ async def resume_replay(replay_id: str):
state["pause_message"] = None
# Reinjecter l'action echouee en tete de queue (sera re-tentee)
if failed_action and failed_action.get("action_id"):
# pause_for_human est une pause intentionnelle, pas une erreur — ne pas réinjecter
if (failed_action and failed_action.get("action_id")
and failed_action.get("reason") != "user_request"):
# Reconstruire l'action a partir du retry_pending ou de l'original
original_action_id = failed_action["action_id"]
# Chercher l'action originale dans les retry_pending
@@ -3984,6 +4021,26 @@ async def resume_replay(replay_id: str):
}
@app.post("/api/v1/traces/stream/replay/{replay_id}/cancel")
async def cancel_replay(replay_id: str):
"""Annuler un replay (quel que soit son statut) et vider sa queue."""
with _replay_lock:
state = _replay_states.get(replay_id)
if not state:
raise HTTPException(status_code=404, detail=f"Replay '{replay_id}' non trouvé")
session_id = state["session_id"]
state["status"] = "cancelled"
state["failed_action"] = None
state["pause_message"] = None
_replay_queues[session_id] = []
keys_to_del = [k for k, v in _retry_pending.items() if v.get("replay_id") == replay_id]
for k in keys_to_del:
_retry_pending.pop(k, None)
logger.info("Replay %s annulé manuellement", replay_id)
return {"status": "cancelled", "replay_id": replay_id, "session_id": session_id}
# =========================================================================
# Visual Replay — Résolution visuelle des cibles (module resolve_engine)
# =========================================================================

View File

@@ -2193,6 +2193,22 @@ def _validate_resolution_quality(
dx = abs(resolved_x - fallback_x_pct)
dy = abs(resolved_y - fallback_y_pct)
if dx > _RESOLUTION_MAX_DRIFT or dy > _RESOLUTION_MAX_DRIFT:
# Exception : si le template matching trouve l'image avec une
# similarité quasi parfaite, on fait confiance à la position
# visuelle peu importe le drift. Une image retrouvée à >= 0.95
# de score est SUR l'écran à l'endroit indiqué — le drift par
# rapport à l'enregistrement ne reflète qu'un changement de
# layout (scroll, redimensionnement, F11, devtools), pas une
# erreur de résolution.
_HIGH_CONFIDENCE = 0.95
if score >= _HIGH_CONFIDENCE and method.startswith("template_matching"):
logger.info(
"[REPLAY] Drift (%.3f, %.3f) > %.2f IGNORÉ : score=%.3f >= %.2f "
"sur %s — résultat visuel fiable, on l'utilise",
dx, dy, _RESOLUTION_MAX_DRIFT, score, _HIGH_CONFIDENCE, method,
)
return result
logger.warning(
"[REPLAY] Drift trop grand (%.3f, %.3f) > %.2f — fallback coords enregistrées (%.3f, %.3f)",
dx, dy, _RESOLUTION_MAX_DRIFT, fallback_x_pct, fallback_y_pct,

View File

@@ -32,13 +32,20 @@ PROMPT_TEMPLATE = """Tu es médecin DIM (Département d'Information Médicale),
Analyse le dossier patient ci-dessous pour déterminer si le passage relève :
- FORFAIT_URGENCE : passage simple, retour à domicile, sans surveillance prolongée ni soins continus
- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les critères PMSI/ATIH
- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les 3 critères PMSI/ATIH
LES 3 CRITÈRES UHCD (au moins 2 sur 3 validés ⇒ REQUALIFICATION) :
1. Pathologie potentiellement évolutive (instabilité hémodynamique, terrain à risque, traitement nécessitant adaptation)
2. Surveillance médicale et paramédicale prolongée (constantes itératives, observations IDE/médecin, durée > 6 h)
3. Examens complémentaires ou actes thérapeutiques (biologie, imagerie, sutures, gestes techniques)
INSTRUCTIONS STRICTES :
1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère.
2. Identifie d'abord les éléments en faveur d'une hospitalisation, puis ceux en faveur d'un forfait, puis tranche.
3. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier.
4. Module ta confiance honnêtement :
2. Pour CHAQUE critère (1, 2, 3), tu DOIS produire un texte de preuve qui contient AU MOINS UNE CITATION LITTÉRALE du dossier entre guillemets français « ... ». Exemple : « FC à 110 bpm, TA 92/60 ».
3. Si le critère est NON validé, ne renvoie JAMAIS un fallback creux : explique factuellement ce qui manque, en citant le dossier (ex: « Sortie à H+2 », « Aucun acte technique au compte-rendu »).
4. Le texte de chaque preuve fait 2-3 phrases : (i) la citation littérale, (ii) l'analyse PMSI, (iii) la conclusion validé/non validé.
5. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier.
6. Module ta confiance honnêtement :
- "elevee" uniquement si tous les indices convergent
- "moyenne" si éléments ambivalents
- "faible" si information manquante ou très atypique
@@ -46,17 +53,17 @@ INSTRUCTIONS STRICTES :
Réponds STRICTEMENT en JSON valide, sans texte avant ni après :
{{
"duree_passage_heures": <nombre>,
"elements_pour_hospitalisation": [<faits littéralement extraits du dossier>],
"elements_pour_forfait": [<faits littéralement extraits du dossier>],
"elements_pour_hospitalisation": [<phrases littéralement extraites du dossier>],
"elements_pour_forfait": [<phrases littéralement extraites du dossier>],
"decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION",
"decision_court": "UHCD" | "Forfait Urgences",
"preuve_critere1": "<texte factuel pour le critère 'Pathologie potentiellement évolutive' : motif, symptômes, terrain à risque, traitement initial — 2-3 phrases max. Écrire 'Critère non validé' si absent du dossier>",
"preuve_critere1": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (motif, symptôme, terrain à risque, traitement). Si non validé : factualise ce qui manque en citant le dossier.>",
"critere1_valide": true | false,
"preuve_critere2": "<texte factuel pour le critère 'Nécessité de surveillance médicale et paramédicale' : constantes relevées, durée de surveillance, observations IDE/médecin — 2-3 phrases max. Écrire 'Critère non validé' si absent>",
"preuve_critere2": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (constantes, observations IDE, durée surveillance). Si non validé : factualise.>",
"critere2_valide": true | false,
"preuve_critere3": "<texte factuel pour le critère 'Réalisation d'examens ou d'actes' : liste des actes diagnostiques et thérapeutiques réalisés — 2-3 phrases max. Écrire 'Critère non validé' si absent>",
"preuve_critere3": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (actes/examens : biologie, imagerie, suture, etc.). Si non validé : factualise.>",
"critere3_valide": true | false,
"justification": "<2-3 phrases s'appuyant explicitement sur les faits ci-dessus>",
"justification": "<2-3 phrases synthétiques s'appuyant explicitement sur les preuves ci-dessus, avec au moins une citation>",
"confiance": "elevee" | "moyenne" | "faible"
}}

View File

@@ -868,6 +868,60 @@ def _load_anchor_metadata(anchor_id: str) -> Optional[Dict]:
return None
def _inject_anchor_targeting(action: Dict, anchor_id: str) -> None:
"""Enrichit une action avec la cible visuelle (x_pct/y_pct + visual_mode/target_spec).
Mutation in-place de `action`. Utilisé pour click_anchor*, type_text et
type_secret — toute action qui doit cibler une zone visuelle précise avant
d'agir (clic ou frappe avec focus).
Sans cette injection, l'agent côté Windows ne peut pas faire le pre-click
de focus avant `_type_text`, et le texte tape dans le vide.
"""
if not anchor_id:
return
anchor_meta = _load_anchor_metadata(anchor_id)
# Coordonnées du centre du bbox (fallback si template matching échoue)
if anchor_meta:
bbox = anchor_meta.get('bounding_box', {})
orig = anchor_meta.get('original_size', {})
orig_w = orig.get('width', 1920)
orig_h = orig.get('height', 1080)
if bbox.get('x') is not None and orig_w > 0 and orig_h > 0:
cx = (bbox['x'] + bbox.get('width', 0) / 2) / orig_w
cy = (bbox['y'] + bbox.get('height', 0) / 2) / orig_h
action['x_pct'] = round(cx, 4)
action['y_pct'] = round(cy, 4)
# Image de l'ancre pour template matching côté agent
anchor_b64 = _load_anchor_image_b64(anchor_id)
if anchor_b64:
target_spec = {
'anchor_image_base64': anchor_b64,
'anchor_id': anchor_id,
}
if anchor_meta:
target_spec['anchor_bbox'] = anchor_meta.get('bounding_box', {})
target_spec['original_size'] = anchor_meta.get('original_size', {})
action['visual_mode'] = True
action['target_spec'] = target_spec
logger.info(
"Action %s : ancre '%s' chargée (%d Ko), visual_mode activé",
action.get('action_id', '?'),
anchor_id,
len(anchor_b64) // 1024,
)
else:
logger.warning(
"Action %s : ancre '%s' introuvable, fallback blind mode",
action.get('action_id', '?'),
anchor_id,
)
@api_v3_bp.route('/execute-windows', methods=['POST'])
def execute_windows():
"""Proxy les actions du workflow vers le streaming server pour exécution sur Windows.
@@ -932,45 +986,7 @@ def execute_windows():
if vwb_type in _ANCHOR_CLICK_TYPES:
anchor_id = action.get('anchor_id')
if anchor_id:
anchor_meta = _load_anchor_metadata(anchor_id)
# Calculer les coordonnées du centre du bbox (fallback si visual échoue)
if anchor_meta:
bbox = anchor_meta.get('bounding_box', {})
orig = anchor_meta.get('original_size', {})
orig_w = orig.get('width', 1920)
orig_h = orig.get('height', 1080)
if bbox.get('x') is not None and orig_w > 0 and orig_h > 0:
cx = (bbox['x'] + bbox.get('width', 0) / 2) / orig_w
cy = (bbox['y'] + bbox.get('height', 0) / 2) / orig_h
action['x_pct'] = round(cx, 4)
action['y_pct'] = round(cy, 4)
# Tenter aussi le visual_mode (template matching)
anchor_b64 = _load_anchor_image_b64(anchor_id)
if anchor_b64:
target_spec = {
'anchor_image_base64': anchor_b64,
'anchor_id': anchor_id,
}
if anchor_meta:
target_spec['anchor_bbox'] = anchor_meta.get('bounding_box', {})
target_spec['original_size'] = anchor_meta.get('original_size', {})
action['visual_mode'] = True
action['target_spec'] = target_spec
logger.info(
"Action %s : ancre '%s' chargée (%d Ko), visual_mode activé",
action.get('action_id', '?'),
anchor_id,
len(anchor_b64) // 1024,
)
else:
logger.warning(
"Action %s : ancre '%s' introuvable, fallback blind mode",
action.get('action_id', '?'),
anchor_id,
)
_inject_anchor_targeting(action, anchor_id)
# Propagation du by_text (ciblage textuel prioritaire sur template)
_by_text = params.get('by_text', '')
@@ -986,13 +1002,18 @@ def execute_windows():
action['button'] = 'right'
# ---------------------------------------------------------------
# type_text / type_secret → extraire le texte
# type_text / type_secret → extraire le texte + cibler la zone
# de saisie si une ancre visuelle est associée au step.
# Sans ancre, l'agent tape là où le focus se trouve déjà
# (compatibilité avec les workflows historiques sans anchor).
# ---------------------------------------------------------------
if vwb_type in ('type_text', 'type_secret') and 'text' in params:
action['text'] = params['text']
# Ne pas forcer un clic préalable à (0,0) si pas de coordonnées
# L'exécuteur ne cliquera que si x_pct > 0 et y_pct > 0
# (le clic de positionnement est fait par l'action click_anchor précédente)
anchor_id = action.get('anchor_id') or (
params.get('visual_anchor') or {}
).get('anchor_id')
if anchor_id:
_inject_anchor_targeting(action, anchor_id)
# ---------------------------------------------------------------
# keyboard_shortcut / hotkey → extraire les touches

View File

@@ -40,6 +40,17 @@ if _ROOT not in sys.path:
STREAMING_SERVER_URL = "http://localhost:5005"
def _stream_headers() -> Dict[str, str]:
"""Bearer token pour les appels proxy VWB → streaming server.
Retourne un dict vide si RPA_API_TOKEN n'est pas défini ; dans ce cas
les appels échoueront en 401 (auth obligatoire côté streaming).
"""
import os as _os
token = _os.environ.get("RPA_API_TOKEN", "")
return {"Authorization": f"Bearer {token}"} if token else {}
# ---------------------------------------------------------------------------
# Helpers — nom par défaut à l'import
# ---------------------------------------------------------------------------
@@ -162,6 +173,7 @@ def list_learned_workflows():
resp = http_requests.get(
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflows",
params=params,
headers=_stream_headers(),
timeout=3,
)
if resp.ok:
@@ -526,6 +538,7 @@ def _load_core_workflow(
resp = http_requests.get(
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflows",
params=params,
headers=_stream_headers(),
timeout=3,
)
if resp.ok:
@@ -538,6 +551,7 @@ def _load_core_workflow(
try:
detail_resp = http_requests.get(
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflow/{workflow_id}",
headers=_stream_headers(),
timeout=5,
)
if detail_resp.ok:
@@ -573,6 +587,7 @@ def _notify_streaming_reload():
try:
http_requests.post(
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/reload-workflows",
headers=_stream_headers(),
timeout=2,
)
logger.debug("Streaming server notifié pour rechargement des workflows")

View File

@@ -13,11 +13,17 @@ from flask_caching import Cache
from flask_migrate import Migrate
import os
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Charger .env.local depuis la racine du projet AVANT tout : il contient
# RPA_API_TOKEN utilisé pour le proxy VWB → streaming server. Sans cela,
# le token est absent après chaque restart manuel du backend et tous les
# appels proxy renvoient 401 « Token API invalide ».
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
load_dotenv(_PROJECT_ROOT / '.env.local')
load_dotenv() # fallback .env dans cwd (n'écrase pas les vars déjà définies)
# Initialize Flask app
app = Flask(__name__)