diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 017461fd1..bedc17528 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -2762,8 +2762,29 @@ async def get_next_action(session_id: str, machine_id: str = "default"): Si la session de l'agent n'a pas d'actions en attente, cherche dans les autres queues de la MÊME machine (pas cross-machine). + + Acquire timeout : si une action serveur lente (extract_text OCR, + t2a_decision LLM) tient le lock, on retourne immédiatement + {action: None, server_busy: True} avant que le client ne timeout à 5s. + Sans cela, des actions seraient popped serveur puis envoyées sur des + sockets clients déjà fermées par timeout — perdues silencieusement. + + L'acquire et les actions serveur lentes sont exécutés via + run_in_executor : sinon l'appel synchrone bloque l'event loop FastAPI + (single-threaded) et même les polls qui devraient recevoir server_busy + sont bloqués jusqu'à libération — ce qui annule l'effet du timeout. """ - with _replay_lock: + import asyncio + loop = asyncio.get_event_loop() + acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, 4.5) + if not acquired: + return { + "action": None, + "session_id": session_id, + "machine_id": machine_id, + "server_busy": True, + } + try: # Verifier si le replay est en pause supervisee (target_not_found). # Dans ce cas, NE PAS envoyer d'action — attendre l'intervention utilisateur. for state in _replay_states.values(): @@ -2828,6 +2849,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): break if target_state: queue = target_queue + owning_replay = target_state _replay_queues[session_id] = target_queue del _replay_queues[target_sid] target_state["session_id"] = session_id @@ -2844,6 +2866,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): other_queue = _replay_queues.get(other_sid, []) if other_queue: queue = other_queue + owning_replay = state _replay_queues[session_id] = other_queue del _replay_queues[other_sid] state["session_id"] = session_id @@ -2869,53 +2892,65 @@ async def get_next_action(session_id: str, machine_id: str = "default"): type_ = action.get("type") - # pause_for_human : bascule en paused_need_help, return action=None - if type_ == "pause_for_human" and owning_replay is not None: - params = action.get("parameters") or {} - message = params.get("message") or "Validation requise" + # pause_for_human : no-op en mode autonome — on saute et on continue + if type_ == "pause_for_human": + logger.info( + "pause_for_human ignorée (mode autonome) — replay %s continue", + owning_replay["replay_id"] if owning_replay else "?" + ) queue.pop(0) _replay_queues[session_id] = queue - owning_replay["status"] = "paused_need_help" - owning_replay["pause_message"] = message - owning_replay["failed_action"] = { - "action_id": action.get("action_id", ""), - "type": "pause_for_human", - "reason": "user_request", - } - logger.info( - f"Replay {owning_replay['replay_id']} pause supervisée demandée " - f"par le workflow : {message[:80]}" - ) - return { - "action": None, - "session_id": session_id, - "machine_id": machine_id, - "replay_paused": True, - "pause_message": message, - "replay_id": owning_replay["replay_id"], - } + continue - # Actions serveur : exécuter, pop, continuer + # Actions serveur : exécuter HORS event loop pour ne pas bloquer + # les autres polls (extract_text OCR ~5s, t2a_decision LLM ~8-13s). + # Le lock reste tenu (queue cohérente) mais l'event loop est libre, + # donc les polls concurrents peuvent recevoir {server_busy: True}. if type_ in _SERVER_SIDE_ACTION_TYPES and owning_replay is not None: try: if type_ == "extract_text": - _handle_extract_text_action( - action, owning_replay, session_id, _last_heartbeat + await loop.run_in_executor( + None, + _handle_extract_text_action, + action, owning_replay, session_id, _last_heartbeat, ) elif type_ == "t2a_decision": - _handle_t2a_decision_action(action, owning_replay) + await loop.run_in_executor( + None, + _handle_t2a_decision_action, + action, owning_replay, + ) except Exception as e: logger.warning(f"Action serveur {type_} a levé : {e}") queue.pop(0) _replay_queues[session_id] = queue continue # action suivante + # Clic conditionnel : si l'action a un paramètre "condition", évaluer la variable + # Format : "dec.critere1_valide" → runtime_vars["dec"]["critere1_valide"] + condition_key = (action.get("parameters") or {}).get("condition") + if condition_key and owning_replay is not None: + runtime_vars = owning_replay.get("variables") or {} + parts = condition_key.split(".", 1) + if len(parts) == 2: + val = (runtime_vars.get(parts[0]) or {}).get(parts[1]) + else: + val = runtime_vars.get(parts[0]) + if not val: + logger.info("Clic conditionnel ignoré (%s=%s) — action %s", + condition_key, val, action.get("action_id", "?")) + queue.pop(0) + _replay_queues[session_id] = queue + continue + # Action visuelle : sortir de la boucle pour la transmettre à l'Agent V1 break # Si la queue s'est vidée après les exécutions serveur, rien à transmettre if not queue or action is None: return {"action": None, "session_id": session_id, "machine_id": machine_id} + finally: + _replay_lock.release() # ---- Pre-check écran (optionnel, non bloquant) ---- # Ne s'applique qu'aux actions qui ont un from_node (actions de workflow, @@ -3943,7 +3978,9 @@ async def resume_replay(replay_id: str): state["pause_message"] = None # Reinjecter l'action echouee en tete de queue (sera re-tentee) - if failed_action and failed_action.get("action_id"): + # pause_for_human est une pause intentionnelle, pas une erreur — ne pas réinjecter + if (failed_action and failed_action.get("action_id") + and failed_action.get("reason") != "user_request"): # Reconstruire l'action a partir du retry_pending ou de l'original original_action_id = failed_action["action_id"] # Chercher l'action originale dans les retry_pending @@ -3984,6 +4021,26 @@ async def resume_replay(replay_id: str): } +@app.post("/api/v1/traces/stream/replay/{replay_id}/cancel") +async def cancel_replay(replay_id: str): + """Annuler un replay (quel que soit son statut) et vider sa queue.""" + with _replay_lock: + state = _replay_states.get(replay_id) + if not state: + raise HTTPException(status_code=404, detail=f"Replay '{replay_id}' non trouvé") + session_id = state["session_id"] + state["status"] = "cancelled" + state["failed_action"] = None + state["pause_message"] = None + _replay_queues[session_id] = [] + keys_to_del = [k for k, v in _retry_pending.items() if v.get("replay_id") == replay_id] + for k in keys_to_del: + _retry_pending.pop(k, None) + + logger.info("Replay %s annulé manuellement", replay_id) + return {"status": "cancelled", "replay_id": replay_id, "session_id": session_id} + + # ========================================================================= # Visual Replay — Résolution visuelle des cibles (module resolve_engine) # ========================================================================= diff --git a/agent_v0/server_v1/resolve_engine.py b/agent_v0/server_v1/resolve_engine.py index 1712e2b83..ee2491cf7 100644 --- a/agent_v0/server_v1/resolve_engine.py +++ b/agent_v0/server_v1/resolve_engine.py @@ -2193,6 +2193,22 @@ def _validate_resolution_quality( dx = abs(resolved_x - fallback_x_pct) dy = abs(resolved_y - fallback_y_pct) if dx > _RESOLUTION_MAX_DRIFT or dy > _RESOLUTION_MAX_DRIFT: + # Exception : si le template matching trouve l'image avec une + # similarité quasi parfaite, on fait confiance à la position + # visuelle peu importe le drift. Une image retrouvée à >= 0.95 + # de score est SUR l'écran à l'endroit indiqué — le drift par + # rapport à l'enregistrement ne reflète qu'un changement de + # layout (scroll, redimensionnement, F11, devtools), pas une + # erreur de résolution. + _HIGH_CONFIDENCE = 0.95 + if score >= _HIGH_CONFIDENCE and method.startswith("template_matching"): + logger.info( + "[REPLAY] Drift (%.3f, %.3f) > %.2f IGNORÉ : score=%.3f >= %.2f " + "sur %s — résultat visuel fiable, on l'utilise", + dx, dy, _RESOLUTION_MAX_DRIFT, score, _HIGH_CONFIDENCE, method, + ) + return result + logger.warning( "[REPLAY] Drift trop grand (%.3f, %.3f) > %.2f — fallback coords enregistrées (%.3f, %.3f)", dx, dy, _RESOLUTION_MAX_DRIFT, fallback_x_pct, fallback_y_pct, diff --git a/core/llm/t2a_decision.py b/core/llm/t2a_decision.py index e3cf14e40..74b41e223 100644 --- a/core/llm/t2a_decision.py +++ b/core/llm/t2a_decision.py @@ -32,13 +32,20 @@ PROMPT_TEMPLATE = """Tu es médecin DIM (Département d'Information Médicale), Analyse le dossier patient ci-dessous pour déterminer si le passage relève : - FORFAIT_URGENCE : passage simple, retour à domicile, sans surveillance prolongée ni soins continus -- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les critères PMSI/ATIH +- REQUALIFICATION_HOSPITALISATION : séjour MCO requis selon les 3 critères PMSI/ATIH + +LES 3 CRITÈRES UHCD (au moins 2 sur 3 validés ⇒ REQUALIFICATION) : +1. Pathologie potentiellement évolutive (instabilité hémodynamique, terrain à risque, traitement nécessitant adaptation) +2. Surveillance médicale et paramédicale prolongée (constantes itératives, observations IDE/médecin, durée > 6 h) +3. Examens complémentaires ou actes thérapeutiques (biologie, imagerie, sutures, gestes techniques) INSTRUCTIONS STRICTES : 1. N'utilise QUE des éléments littéralement présents dans le dossier patient. N'invente AUCUN critère. -2. Identifie d'abord les éléments en faveur d'une hospitalisation, puis ceux en faveur d'un forfait, puis tranche. -3. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier. -4. Module ta confiance honnêtement : +2. Pour CHAQUE critère (1, 2, 3), tu DOIS produire un texte de preuve qui contient AU MOINS UNE CITATION LITTÉRALE du dossier entre guillemets français « ... ». Exemple : « FC à 110 bpm, TA 92/60 ». +3. Si le critère est NON validé, ne renvoie JAMAIS un fallback creux : explique factuellement ce qui manque, en citant le dossier (ex: « Sortie à H+2 », « Aucun acte technique au compte-rendu »). +4. Le texte de chaque preuve fait 2-3 phrases : (i) la citation littérale, (ii) l'analyse PMSI, (iii) la conclusion validé/non validé. +5. Calcule la durée totale du passage en heures (admission → sortie/transfert) à partir des horaires du dossier. +6. Module ta confiance honnêtement : - "elevee" uniquement si tous les indices convergent - "moyenne" si éléments ambivalents - "faible" si information manquante ou très atypique @@ -46,17 +53,17 @@ INSTRUCTIONS STRICTES : Réponds STRICTEMENT en JSON valide, sans texte avant ni après : {{ "duree_passage_heures": , - "elements_pour_hospitalisation": [], - "elements_pour_forfait": [], + "elements_pour_hospitalisation": [], + "elements_pour_forfait": [], "decision": "FORFAIT_URGENCE" | "REQUALIFICATION_HOSPITALISATION", "decision_court": "UHCD" | "Forfait Urgences", - "preuve_critere1": "", + "preuve_critere1": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (motif, symptôme, terrain à risque, traitement). Si non validé : factualise ce qui manque en citant le dossier.>", "critere1_valide": true | false, - "preuve_critere2": "", + "preuve_critere2": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (constantes, observations IDE, durée surveillance). Si non validé : factualise.>", "critere2_valide": true | false, - "preuve_critere3": "", + "preuve_critere3": "<2-3 phrases incluant AU MOINS UNE citation littérale entre « » (actes/examens : biologie, imagerie, suture, etc.). Si non validé : factualise.>", "critere3_valide": true | false, - "justification": "<2-3 phrases s'appuyant explicitement sur les faits ci-dessus>", + "justification": "<2-3 phrases synthétiques s'appuyant explicitement sur les preuves ci-dessus, avec au moins une citation>", "confiance": "elevee" | "moyenne" | "faible" }} diff --git a/visual_workflow_builder/backend/api_v3/dag_execute.py b/visual_workflow_builder/backend/api_v3/dag_execute.py index c25940e69..3feb44d9e 100644 --- a/visual_workflow_builder/backend/api_v3/dag_execute.py +++ b/visual_workflow_builder/backend/api_v3/dag_execute.py @@ -868,6 +868,60 @@ def _load_anchor_metadata(anchor_id: str) -> Optional[Dict]: return None +def _inject_anchor_targeting(action: Dict, anchor_id: str) -> None: + """Enrichit une action avec la cible visuelle (x_pct/y_pct + visual_mode/target_spec). + + Mutation in-place de `action`. Utilisé pour click_anchor*, type_text et + type_secret — toute action qui doit cibler une zone visuelle précise avant + d'agir (clic ou frappe avec focus). + + Sans cette injection, l'agent côté Windows ne peut pas faire le pre-click + de focus avant `_type_text`, et le texte tape dans le vide. + """ + if not anchor_id: + return + + anchor_meta = _load_anchor_metadata(anchor_id) + + # Coordonnées du centre du bbox (fallback si template matching échoue) + if anchor_meta: + bbox = anchor_meta.get('bounding_box', {}) + orig = anchor_meta.get('original_size', {}) + orig_w = orig.get('width', 1920) + orig_h = orig.get('height', 1080) + if bbox.get('x') is not None and orig_w > 0 and orig_h > 0: + cx = (bbox['x'] + bbox.get('width', 0) / 2) / orig_w + cy = (bbox['y'] + bbox.get('height', 0) / 2) / orig_h + action['x_pct'] = round(cx, 4) + action['y_pct'] = round(cy, 4) + + # Image de l'ancre pour template matching côté agent + anchor_b64 = _load_anchor_image_b64(anchor_id) + if anchor_b64: + target_spec = { + 'anchor_image_base64': anchor_b64, + 'anchor_id': anchor_id, + } + if anchor_meta: + target_spec['anchor_bbox'] = anchor_meta.get('bounding_box', {}) + target_spec['original_size'] = anchor_meta.get('original_size', {}) + + action['visual_mode'] = True + action['target_spec'] = target_spec + logger.info( + "Action %s : ancre '%s' chargée (%d Ko), visual_mode activé", + action.get('action_id', '?'), + anchor_id, + len(anchor_b64) // 1024, + ) + else: + logger.warning( + "Action %s : ancre '%s' introuvable, fallback blind mode", + action.get('action_id', '?'), + anchor_id, + ) + + @api_v3_bp.route('/execute-windows', methods=['POST']) def execute_windows(): """Proxy les actions du workflow vers le streaming server pour exécution sur Windows. @@ -932,45 +986,7 @@ def execute_windows(): if vwb_type in _ANCHOR_CLICK_TYPES: anchor_id = action.get('anchor_id') if anchor_id: - anchor_meta = _load_anchor_metadata(anchor_id) - - # Calculer les coordonnées du centre du bbox (fallback si visual échoue) - if anchor_meta: - bbox = anchor_meta.get('bounding_box', {}) - orig = anchor_meta.get('original_size', {}) - orig_w = orig.get('width', 1920) - orig_h = orig.get('height', 1080) - if bbox.get('x') is not None and orig_w > 0 and orig_h > 0: - cx = (bbox['x'] + bbox.get('width', 0) / 2) / orig_w - cy = (bbox['y'] + bbox.get('height', 0) / 2) / orig_h - action['x_pct'] = round(cx, 4) - action['y_pct'] = round(cy, 4) - - # Tenter aussi le visual_mode (template matching) - anchor_b64 = _load_anchor_image_b64(anchor_id) - if anchor_b64: - target_spec = { - 'anchor_image_base64': anchor_b64, - 'anchor_id': anchor_id, - } - if anchor_meta: - target_spec['anchor_bbox'] = anchor_meta.get('bounding_box', {}) - target_spec['original_size'] = anchor_meta.get('original_size', {}) - - action['visual_mode'] = True - action['target_spec'] = target_spec - logger.info( - "Action %s : ancre '%s' chargée (%d Ko), visual_mode activé", - action.get('action_id', '?'), - anchor_id, - len(anchor_b64) // 1024, - ) - else: - logger.warning( - "Action %s : ancre '%s' introuvable, fallback blind mode", - action.get('action_id', '?'), - anchor_id, - ) + _inject_anchor_targeting(action, anchor_id) # Propagation du by_text (ciblage textuel prioritaire sur template) _by_text = params.get('by_text', '') @@ -986,13 +1002,18 @@ def execute_windows(): action['button'] = 'right' # --------------------------------------------------------------- - # type_text / type_secret → extraire le texte + # type_text / type_secret → extraire le texte + cibler la zone + # de saisie si une ancre visuelle est associée au step. + # Sans ancre, l'agent tape là où le focus se trouve déjà + # (compatibilité avec les workflows historiques sans anchor). # --------------------------------------------------------------- if vwb_type in ('type_text', 'type_secret') and 'text' in params: action['text'] = params['text'] - # Ne pas forcer un clic préalable à (0,0) si pas de coordonnées - # L'exécuteur ne cliquera que si x_pct > 0 et y_pct > 0 - # (le clic de positionnement est fait par l'action click_anchor précédente) + anchor_id = action.get('anchor_id') or ( + params.get('visual_anchor') or {} + ).get('anchor_id') + if anchor_id: + _inject_anchor_targeting(action, anchor_id) # --------------------------------------------------------------- # keyboard_shortcut / hotkey → extraire les touches diff --git a/visual_workflow_builder/backend/api_v3/learned_workflows.py b/visual_workflow_builder/backend/api_v3/learned_workflows.py index 2738fab28..e9a63c359 100644 --- a/visual_workflow_builder/backend/api_v3/learned_workflows.py +++ b/visual_workflow_builder/backend/api_v3/learned_workflows.py @@ -40,6 +40,17 @@ if _ROOT not in sys.path: STREAMING_SERVER_URL = "http://localhost:5005" +def _stream_headers() -> Dict[str, str]: + """Bearer token pour les appels proxy VWB → streaming server. + + Retourne un dict vide si RPA_API_TOKEN n'est pas défini ; dans ce cas + les appels échoueront en 401 (auth obligatoire côté streaming). + """ + import os as _os + token = _os.environ.get("RPA_API_TOKEN", "") + return {"Authorization": f"Bearer {token}"} if token else {} + + # --------------------------------------------------------------------------- # Helpers — nom par défaut à l'import # --------------------------------------------------------------------------- @@ -162,6 +173,7 @@ def list_learned_workflows(): resp = http_requests.get( f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflows", params=params, + headers=_stream_headers(), timeout=3, ) if resp.ok: @@ -526,6 +538,7 @@ def _load_core_workflow( resp = http_requests.get( f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflows", params=params, + headers=_stream_headers(), timeout=3, ) if resp.ok: @@ -538,6 +551,7 @@ def _load_core_workflow( try: detail_resp = http_requests.get( f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflow/{workflow_id}", + headers=_stream_headers(), timeout=5, ) if detail_resp.ok: @@ -573,6 +587,7 @@ def _notify_streaming_reload(): try: http_requests.post( f"{STREAMING_SERVER_URL}/api/v1/traces/stream/reload-workflows", + headers=_stream_headers(), timeout=2, ) logger.debug("Streaming server notifié pour rechargement des workflows") diff --git a/visual_workflow_builder/backend/app.py b/visual_workflow_builder/backend/app.py index 8284af059..5e9235af1 100644 --- a/visual_workflow_builder/backend/app.py +++ b/visual_workflow_builder/backend/app.py @@ -13,11 +13,17 @@ from flask_caching import Cache from flask_migrate import Migrate import os import logging +from pathlib import Path from logging.handlers import RotatingFileHandler from dotenv import load_dotenv -# Load environment variables -load_dotenv() +# Charger .env.local depuis la racine du projet AVANT tout : il contient +# RPA_API_TOKEN utilisé pour le proxy VWB → streaming server. Sans cela, +# le token est absent après chaque restart manuel du backend et tous les +# appels proxy renvoient 401 « Token API invalide ». +_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent +load_dotenv(_PROJECT_ROOT / '.env.local') +load_dotenv() # fallback .env dans cwd (n'écrase pas les vars déjà définies) # Initialize Flask app app = Flask(__name__)