Pipeline streaming étendu pour supporter des actions exécutées entièrement
côté serveur (jamais transmises à l'Agent V1) qui produisent des variables
réutilisables dans les steps suivants via templating {{var}} ou {{var.field}}.
== Variables d'exécution ==
- replay_state["variables"] : Dict[str, Any] initialisé vide à la création
- _resolve_runtime_vars() : résout {{var}} et {{var.field}} récursivement
dans str/dict/list. Variables absentes laissées intactes.
- /replay/next applique la résolution sur l'action AVANT toute interception
ou envoi à l'Agent V1.
== Boucle d'exécution serveur ==
- _SERVER_SIDE_ACTION_TYPES = {"extract_text", "t2a_decision"}
- /replay/next pop+execute en boucle ces actions jusqu'à trouver une action
visuelle (à transmettre Agent V1) ou un pause_for_human (qui bloque).
- Latence acceptable : t2a_decision = 5-10s côté serveur, l'Agent V1 attend
la réponse HTTP.
== Action extract_text ==
- Handler côté serveur réutilisant le dernier heartbeat (max 5s d'âge)
- core/llm/ocr_extractor.py : EasyOCR fr+en singleton + extract_text_from_image
- Stockage dans replay_state["variables"][output_var]
- Robuste : pas de heartbeat → variable = "" + log warning, pipeline continue
== Action t2a_decision ==
- core/llm/t2a_decision.py : refactor de demo_app.py query_model en module
importable. Prompt expert DIM T2A/PMSI, qwen2.5:7b par défaut (100% bench).
- Handler côté serveur appelle analyze_dpi(input_template_resolved)
- Stockage du JSON décision dans replay_state["variables"][output_var]
- Erreurs (Ollama down, parse) → variable = INDETERMINE + _error, pipeline continue
== VWB UI ==
- types.ts : nouveau type 't2a_decision' (icône 🧠 catégorie logic)
- extract_text refondu : needsAnchor=false, paramètre output_var (au lieu de
variable_name legacy — bridge accepte les deux pour compat)
- Bridge VWB→core : passthrough des deux types + paramètres préservés
== Tests ==
- tests/integration/test_t2a_extract.py : 25 tests verts
- templating runtime (8 tests)
- types d'actions autorisés / exécutés côté serveur (3 tests)
- handler extract_text (3 tests, OCR mocké)
- handler t2a_decision (3 tests, analyze_dpi mocké)
- edge → action normalisée (2 tests)
- bridge VWB → core (5 tests)
- workflow chain extract→t2a→pause→clic (1 test)
Total branche : 82/82 verts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
283 lines
10 KiB
Python
283 lines
10 KiB
Python
"""Tests des actions extract_text et t2a_decision (C+.5/.6).
|
|
|
|

Covers:
- _resolve_runtime_vars: {{var}} / {{var.field}} templating
- _handle_extract_text_action: mocked OCR, variable storage
- _handle_t2a_decision_action: mocked analyze_dpi, JSON storage
- _edge_to_normalized_actions for both action types
- VWB -> core bridge (mapping + parameters)
|
|
"""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from agent_v0.server_v1.replay_engine import (
|
|
_ALLOWED_ACTION_TYPES,
|
|
_SERVER_SIDE_ACTION_TYPES,
|
|
_resolve_runtime_vars,
|
|
_handle_extract_text_action,
|
|
_handle_t2a_decision_action,
|
|
_edge_to_normalized_actions,
|
|
_create_replay_state,
|
|
)
|
|
from visual_workflow_builder.backend.services.learned_workflow_bridge import (
|
|
VWB_ACTION_TO_CORE,
|
|
convert_vwb_to_core_workflow,
|
|
_vwb_params_to_core,
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
# Runtime templating
# ----------------------------------------------------------------------
|
|
|
|
def test_resolve_simple_var():
    """Check that a single ``{{var}}`` placeholder is replaced by its value."""
    r = _resolve_runtime_vars("Patient {{ipp}}", {"ipp": "25003284"})
    assert r == "Patient 25003284"
|
|
|
|
|
|
def test_resolve_field_access():
    """Check that ``{{var.field}}`` placeholders read a key of a dict variable."""
    r = _resolve_runtime_vars(
        "{{result.decision}} car {{result.justification}}",
        {"result": {"decision": "UHCD", "justification": "asthme + insuf coro"}},
    )
    assert r == "UHCD car asthme + insuf coro"
|
|
|
|
|
|
def test_resolve_missing_var_kept_intact():
    """Check that a placeholder naming an unknown variable is left untouched."""
    r = _resolve_runtime_vars("Hello {{absent}} world", {"x": "y"})
    assert r == "Hello {{absent}} world"
|
|
|
|
|
|
def test_resolve_missing_field_kept_intact():
    """Check that ``{{var.field}}`` is left untouched when the field is absent."""
    r = _resolve_runtime_vars("{{var.absent}}", {"var": {"present": "x"}})
    assert r == "{{var.absent}}"
|
|
|
|
|
|
def test_resolve_in_dict_recursive():
    """Check that placeholders are resolved inside nested dicts and lists.

    The expected output also pins that a non-string value (``77``) is
    rendered as the string ``"77"`` when substituted into a template.
    """
    r = _resolve_runtime_vars(
        {"msg": "IPP {{ipp}}", "nested": {"k": "{{ipp}}"}, "list": ["{{age}}"]},
        {"ipp": "X", "age": 77},
    )
    assert r == {"msg": "IPP X", "nested": {"k": "X"}, "list": ["77"]}
|
|
|
|
|
|
def test_resolve_empty_vars_noop():
    """Check that an empty or ``None`` variable mapping leaves the value as is."""
    val = {"k": "{{var}}"}
    assert _resolve_runtime_vars(val, {}) == val
    assert _resolve_runtime_vars(val, None) == val
|
|
|
|
|
|
def test_resolve_non_string_passthrough():
    """Check that non-string scalars (an int, ``None``) are returned unchanged."""
    assert _resolve_runtime_vars(42, {"x": "y"}) == 42
    assert _resolve_runtime_vars(None, {"x": "y"}) is None
|
|
|
|
|
|
def test_resolve_handles_whitespace_in_braces():
    """Check that spaces around the name inside ``{{ }}`` are tolerated."""
    r = _resolve_runtime_vars("{{ ipp }}", {"ipp": "X"})
    assert r == "X"
|
|
|
|
|
|
# ----------------------------------------------------------------------
# Action types & server-side types
# ----------------------------------------------------------------------
|
|
|
|
def test_extract_text_in_allowed():
    """Check that ``extract_text`` is listed in ``_ALLOWED_ACTION_TYPES``."""
    assert "extract_text" in _ALLOWED_ACTION_TYPES
|
|
|
|
|
|
def test_t2a_decision_in_allowed():
    """Check that ``t2a_decision`` is listed in ``_ALLOWED_ACTION_TYPES``."""
    assert "t2a_decision" in _ALLOWED_ACTION_TYPES
|
|
|
|
|
|
def test_server_side_types():
    """Pin the exact content of ``_SERVER_SIDE_ACTION_TYPES``."""
    assert _SERVER_SIDE_ACTION_TYPES == {"extract_text", "t2a_decision"}
|
|
|
|
|
|
# ----------------------------------------------------------------------
# extract_text handler
# ----------------------------------------------------------------------
|
|
|
|
def test_handle_extract_text_stores_variable():
    """Check that the mocked OCR text is stored under the requested variable.

    The handler is expected to return ``True`` and to write the OCR output
    into ``state["variables"][output_var]``.
    """
    state = _create_replay_state("rep1", "wf", "sess", 3)
    last_hb = {"sess": {"path": "/fake/heartbeat.png", "timestamp": 0}}
    action = {
        "type": "extract_text",
        "parameters": {"output_var": "texte_motif", "paragraph": True},
    }
    # OCR is patched on the core.llm package namespace; presumably the handler
    # looks the function up there at call time -- verify if this stops matching.
    with patch(
        "core.llm.extract_text_from_image",
        return_value="Patient asthme peakflow 260",
    ):
        ok = _handle_extract_text_action(action, state, "sess", last_hb)
    assert ok is True
    assert state["variables"]["texte_motif"] == "Patient asthme peakflow 260"
|
|
|
|
|
|
def test_handle_extract_text_no_heartbeat_stores_empty():
    """Check the fallback when no heartbeat exists for the session.

    The handler is expected to return ``False`` and still define the output
    variable, with an empty string as its value.
    """
    state = _create_replay_state("rep1", "wf", "sess", 3)
    last_hb = {}  # no heartbeat recorded
    action = {"type": "extract_text", "parameters": {"output_var": "v"}}
    ok = _handle_extract_text_action(action, state, "sess", last_hb)
    assert ok is False
    assert state["variables"]["v"] == ""
|
|
|
|
|
|
def test_handle_extract_text_default_var_name():
    """Check that the variable defaults to ``extracted_text`` without ``output_var``."""
    state = _create_replay_state("rep1", "wf", "sess", 3)
    last_hb = {"sess": {"path": "/x.png", "timestamp": 0}}
    action = {"type": "extract_text", "parameters": {}}
    with patch("core.llm.extract_text_from_image", return_value="abc"):
        _handle_extract_text_action(action, state, "sess", last_hb)
    # Only the presence of the default key is pinned here, not its value.
    assert "extracted_text" in state["variables"]
|
|
|
|
|
|
# ----------------------------------------------------------------------
# t2a_decision handler
# ----------------------------------------------------------------------
|
|
|
|
def test_handle_t2a_decision_stores_json():
    """Check that the mocked ``analyze_dpi`` result is stored as the variable.

    The handler is expected to return ``True`` and to expose the decision
    dict under ``state["variables"][output_var]``.
    """
    state = _create_replay_state("rep1", "wf", "sess", 3)
    action = {
        "type": "t2a_decision",
        "parameters": {
            "input_template": "Patient 78 ans, asthme, peakflow 260",
            "output_var": "decision_t2a",
            "model": "qwen2.5:7b",
        },
    }
    fake_result = {
        "decision": "REQUALIFICATION_HOSPITALISATION",
        "justification": "Surveillance continue requise",
        "confiance": "elevee",
        "_elapsed_s": 4.2,
    }
    with patch("core.llm.analyze_dpi", return_value=fake_result):
        ok = _handle_t2a_decision_action(action, state)
    assert ok is True
    assert state["variables"]["decision_t2a"]["decision"] == "REQUALIFICATION_HOSPITALISATION"
|
|
|
|
|
|
def test_handle_t2a_decision_empty_input_returns_indetermine():
    """Check that an empty ``input_template`` yields an ``INDETERMINE`` decision.

    The handler is expected to return ``False`` while still populating the
    output variable.
    """
    state = _create_replay_state("rep1", "wf", "sess", 3)
    action = {"type": "t2a_decision", "parameters": {"input_template": "", "output_var": "r"}}
    ok = _handle_t2a_decision_action(action, state)
    assert ok is False
    assert state["variables"]["r"]["decision"] == "INDETERMINE"
|
|
|
|
|
|
def test_handle_t2a_decision_analyze_exception():
    """Check that an exception raised by ``analyze_dpi`` is absorbed.

    The handler is expected to return ``False``, store an ``INDETERMINE``
    decision and carry the exception message in the ``_error`` field.
    """
    state = _create_replay_state("rep1", "wf", "sess", 3)
    action = {"type": "t2a_decision", "parameters": {"input_template": "x", "output_var": "r"}}
    with patch("core.llm.analyze_dpi", side_effect=RuntimeError("ollama down")):
        ok = _handle_t2a_decision_action(action, state)
    assert ok is False
    assert state["variables"]["r"]["decision"] == "INDETERMINE"
    assert "ollama down" in state["variables"]["r"]["_error"]
|
|
|
|
|
|
# ----------------------------------------------------------------------
# Edge -> normalized action
# ----------------------------------------------------------------------
|
|
|
|
class _FakeAction:
    """Minimal stand-in for an action object, used by the edge tests below.

    Sets three attributes: ``type``, ``target`` (always ``None``) and
    ``parameters`` (an empty dict when the argument is ``None`` or falsy).
    """

    def __init__(self, type_, parameters=None):
        self.type = type_
        self.target = None
        # A fresh dict per instance avoids sharing a mutable default.
        self.parameters = parameters or {}
|
|
|
|
|
|
class _FakeEdge:
    """Minimal stand-in for a workflow edge, used by the edge tests below.

    Stores the given ``action`` together with ``edge_id``, ``from_node`` and
    ``to_node`` identifiers, which default to ``"e1"``, ``"n1"`` and ``"n2"``.
    """

    def __init__(self, action, edge_id="e1", from_node="n1", to_node="n2"):
        self.edge_id = edge_id
        self.from_node = from_node
        self.to_node = to_node
        self.action = action
|
|
|
|
|
|
def test_edge_to_action_extract_text():
    """Check that an ``extract_text`` edge normalizes to exactly one action.

    The normalized action must keep its type and both parameters.
    """
    edge = _FakeEdge(_FakeAction(
        "extract_text",
        parameters={"output_var": "texte_examens", "paragraph": True},
    ))
    actions = _edge_to_normalized_actions(edge, params={})
    assert len(actions) == 1
    a = actions[0]
    assert a["type"] == "extract_text"
    assert a["parameters"]["output_var"] == "texte_examens"
    assert a["parameters"]["paragraph"] is True
|
|
|
|
|
|
def test_edge_to_action_t2a_decision():
    """Check that a ``t2a_decision`` edge normalizes to exactly one action.

    The normalized action must keep its type and all three parameters; the
    ``{{texte_motif}}`` placeholder must come through unresolved.
    """
    edge = _FakeEdge(_FakeAction(
        "t2a_decision",
        parameters={
            "input_template": "{{texte_motif}}",
            "output_var": "result",
            "model": "qwen2.5:7b",
        },
    ))
    actions = _edge_to_normalized_actions(edge, params={})
    # Same cardinality check as the extract_text variant, so that extra
    # actions cannot slip through behind actions[0].
    assert len(actions) == 1
    a = actions[0]
    assert a["type"] == "t2a_decision"
    assert a["parameters"]["input_template"] == "{{texte_motif}}"
    assert a["parameters"]["output_var"] == "result"
    assert a["parameters"]["model"] == "qwen2.5:7b"
|
|
|
|
|
|
# ----------------------------------------------------------------------
# VWB -> core bridge
# ----------------------------------------------------------------------
|
|
|
|
def test_vwb_extract_text_passthrough():
    """Check that the VWB ``extract_text`` type maps to the same core type."""
    assert VWB_ACTION_TO_CORE["extract_text"] == "extract_text"
|
|
|
|
|
|
def test_vwb_t2a_decision_passthrough():
    """Check that the VWB ``t2a_decision`` type maps to the same core type."""
    assert VWB_ACTION_TO_CORE["t2a_decision"] == "t2a_decision"
|
|
|
|
|
|
def test_vwb_params_extract_text_preserves_output_var():
    """Check that ``extract_text`` parameters cross the bridge unchanged."""
    p = _vwb_params_to_core("extract_text", {"output_var": "v", "paragraph": False})
    assert p == {"output_var": "v", "paragraph": False}
|
|
|
|
|
|
def test_vwb_params_extract_text_legacy_variable_name():
    """Check compatibility with the legacy VWB ``variable_name`` parameter.

    The legacy value must be exposed as ``output_var`` on the core side.
    """
    p = _vwb_params_to_core("extract_text", {"variable_name": "v_legacy"})
    assert p["output_var"] == "v_legacy"
|
|
|
|
|
|
def test_vwb_params_t2a_decision_preserves_all():
    """Check that all ``t2a_decision`` parameters cross the bridge unchanged."""
    p = _vwb_params_to_core("t2a_decision", {
        "input_template": "DPI {{ipp}}",
        "output_var": "dec",
        "model": "qwen2.5:7b",
    })
    assert p == {"input_template": "DPI {{ipp}}", "output_var": "dec", "model": "qwen2.5:7b"}
|
|
|
|
|
|
def test_export_workflow_with_t2a_chain():
    """Check that an extract_text -> t2a_decision -> pause_for_human chain exports cleanly.

    A five-step VWB workflow is converted to a core workflow; the three
    non-click action types must appear among the edges, and the
    ``{{dpi}}`` template must be carried through unresolved.
    """
    workflow_data = {"id": "wf_t2a", "name": "Demo T2A"}
    steps_data = [
        {
            "id": "s1",
            "action_type": "click_anchor",
            "parameters": {"target_text": "25003284"},
            "label": "Clic IPP",
        },
        {
            "id": "s2",
            "action_type": "extract_text",
            "parameters": {"output_var": "dpi"},
            "label": "OCR",
        },
        {
            "id": "s3",
            "action_type": "t2a_decision",
            "parameters": {
                "input_template": "{{dpi}}", "output_var": "dec", "model": "qwen2.5:7b",
            },
            "label": "Analyse",
        },
        {
            "id": "s4",
            "action_type": "pause_for_human",
            "parameters": {
                "message": "Décision : {{dec.decision}} — {{dec.justification}}",
            },
            "label": "Validation",
        },
        {
            "id": "s5",
            "action_type": "click_anchor",
            "parameters": {"target_text": "Enregistrer"},
            "label": "Clic Enregistrer",
        },
    ]
    core = convert_vwb_to_core_workflow(workflow_data, steps_data)
    edge_types = [e["action"]["type"] for e in core["edges"]]
    assert "extract_text" in edge_types
    assert "t2a_decision" in edge_types
    assert "pause_for_human" in edge_types
    # Make sure the template placeholder is transported as is.
    t2a_edge = next(e for e in core["edges"] if e["action"]["type"] == "t2a_decision")
    assert t2a_edge["action"]["parameters"]["input_template"] == "{{dpi}}"
|