"""Tests des actions extract_text et t2a_decision (C+.5/.6). Couvre : - _resolve_runtime_vars : templating {{var}} / {{var.field}} - _handle_extract_text_action : OCR mocké, stockage variable - _handle_t2a_decision_action : analyze_dpi mocké, stockage JSON - _edge_to_normalized_actions pour les 2 types - Bridge VWB → core (mapping + paramètres) """ from unittest.mock import patch import pytest from agent_v0.server_v1.replay_engine import ( _ALLOWED_ACTION_TYPES, _SERVER_SIDE_ACTION_TYPES, _resolve_runtime_vars, _handle_extract_text_action, _handle_t2a_decision_action, _handle_concat_text_vars_action, _edge_to_normalized_actions, _create_replay_state, SCROLL_PAUSE_MS, ) from visual_workflow_builder.backend.services.learned_workflow_bridge import ( VWB_ACTION_TO_CORE, convert_vwb_to_core_workflow, _vwb_params_to_core, ) # ---------------------------------------------------------------------- # Templating runtime # ---------------------------------------------------------------------- def test_resolve_simple_var(): r = _resolve_runtime_vars("Patient {{ipp}}", {"ipp": "25003284"}) assert r == "Patient 25003284" def test_resolve_field_access(): r = _resolve_runtime_vars( "{{result.decision}} car {{result.justification}}", {"result": {"decision": "UHCD", "justification": "asthme + insuf coro"}}, ) assert "UHCD car asthme + insuf coro" == r def test_resolve_missing_var_kept_intact(): r = _resolve_runtime_vars("Hello {{absent}} world", {"x": "y"}) assert r == "Hello {{absent}} world" def test_resolve_missing_field_kept_intact(): r = _resolve_runtime_vars("{{var.absent}}", {"var": {"present": "x"}}) assert r == "{{var.absent}}" def test_resolve_in_dict_recursive(): r = _resolve_runtime_vars( {"msg": "IPP {{ipp}}", "nested": {"k": "{{ipp}}"}, "list": ["{{age}}"]}, {"ipp": "X", "age": 77}, ) assert r == {"msg": "IPP X", "nested": {"k": "X"}, "list": ["77"]} def test_resolve_empty_vars_noop(): val = {"k": "{{var}}"} assert _resolve_runtime_vars(val, {}) == val assert 
_resolve_runtime_vars(val, None) == val def test_resolve_non_string_passthrough(): assert _resolve_runtime_vars(42, {"x": "y"}) == 42 assert _resolve_runtime_vars(None, {"x": "y"}) is None def test_resolve_handles_whitespace_in_braces(): r = _resolve_runtime_vars("{{ ipp }}", {"ipp": "X"}) assert r == "X" # ---------------------------------------------------------------------- # Action types & types serveur # ---------------------------------------------------------------------- def test_extract_text_in_allowed(): assert "extract_text" in _ALLOWED_ACTION_TYPES def test_t2a_decision_in_allowed(): assert "t2a_decision" in _ALLOWED_ACTION_TYPES def test_server_side_types(): # Set extensible : on vérifie au minimum les 2 types historiques. # extract_text_scroll ajoute `_concat_text_vars` (action serveur interne). assert {"extract_text", "t2a_decision"}.issubset(_SERVER_SIDE_ACTION_TYPES) # ---------------------------------------------------------------------- # Handler extract_text # ---------------------------------------------------------------------- def test_handle_extract_text_stores_variable(): state = _create_replay_state("rep1", "wf", "sess", 3) last_hb = {"sess": {"path": "/fake/heartbeat.png", "timestamp": 0}} action = { "type": "extract_text", "parameters": {"output_var": "texte_motif", "paragraph": True}, } with patch( "core.llm.extract_text_from_image", return_value="Patient asthme peakflow 260", ): ok = _handle_extract_text_action(action, state, "sess", last_hb) assert ok is True assert state["variables"]["texte_motif"] == "Patient asthme peakflow 260" def test_handle_extract_text_no_heartbeat_stores_empty(): state = _create_replay_state("rep1", "wf", "sess", 3) last_hb = {} # pas de heartbeat action = {"type": "extract_text", "parameters": {"output_var": "v"}} ok = _handle_extract_text_action(action, state, "sess", last_hb) assert ok is False assert state["variables"]["v"] == "" def test_handle_extract_text_default_var_name(): state = 
_create_replay_state("rep1", "wf", "sess", 3) last_hb = {"sess": {"path": "/x.png", "timestamp": 0}} action = {"type": "extract_text", "parameters": {}} with patch("core.llm.extract_text_from_image", return_value="abc"): _handle_extract_text_action(action, state, "sess", last_hb) assert "extracted_text" in state["variables"] # ---------------------------------------------------------------------- # Handler t2a_decision # ---------------------------------------------------------------------- def test_handle_t2a_decision_stores_json(): state = _create_replay_state("rep1", "wf", "sess", 3) action = { "type": "t2a_decision", "parameters": { "input_template": "Patient 78 ans, asthme, peakflow 260", "output_var": "decision_t2a", "model": "qwen2.5:7b", }, } fake_result = { "decision": "REQUALIFICATION_HOSPITALISATION", "justification": "Surveillance continue requise", "confiance": "elevee", "_elapsed_s": 4.2, } with patch("core.llm.analyze_dpi", return_value=fake_result): ok = _handle_t2a_decision_action(action, state) assert ok is True assert state["variables"]["decision_t2a"]["decision"] == "REQUALIFICATION_HOSPITALISATION" def test_handle_t2a_decision_empty_input_returns_indetermine(): state = _create_replay_state("rep1", "wf", "sess", 3) action = {"type": "t2a_decision", "parameters": {"input_template": "", "output_var": "r"}} ok = _handle_t2a_decision_action(action, state) assert ok is False assert state["variables"]["r"]["decision"] == "INDETERMINE" def test_handle_t2a_decision_analyze_exception(): state = _create_replay_state("rep1", "wf", "sess", 3) action = {"type": "t2a_decision", "parameters": {"input_template": "x", "output_var": "r"}} with patch("core.llm.analyze_dpi", side_effect=RuntimeError("ollama down")): ok = _handle_t2a_decision_action(action, state) assert ok is False assert state["variables"]["r"]["decision"] == "INDETERMINE" assert "ollama down" in state["variables"]["r"]["_error"] # 
# ----------------------------------------------------------------------
# Edge → normalized action
# ----------------------------------------------------------------------

class _FakeAction:
    """Bare action stub carrying ``type``, ``target`` and ``parameters``."""

    def __init__(self, type_, parameters=None):
        self.type = type_
        # These tests never set a target.
        self.target = None
        self.parameters = parameters if parameters else {}


class _FakeEdge:
    """Bare edge stub wrapping an action between two node identifiers."""

    def __init__(self, action, edge_id="e1", from_node="n1", to_node="n2"):
        self.action = action
        self.edge_id = edge_id
        self.from_node = from_node
        self.to_node = to_node


def test_edge_to_action_extract_text():
    """An ``extract_text`` edge yields one action with its parameters."""
    fake_action = _FakeAction(
        "extract_text",
        parameters={"output_var": "texte_examens", "paragraph": True},
    )
    normalized = _edge_to_normalized_actions(_FakeEdge(fake_action), params={})

    assert len(normalized) == 1
    only_action = normalized[0]
    assert only_action["type"] == "extract_text"
    assert only_action["parameters"]["output_var"] == "texte_examens"
    assert only_action["parameters"]["paragraph"] is True


def test_edge_to_action_t2a_decision():
    """A ``t2a_decision`` edge keeps template, output var and model."""
    fake_action = _FakeAction(
        "t2a_decision",
        parameters={
            "input_template": "{{texte_motif}}",
            "output_var": "result",
            "model": "qwen2.5:7b",
        },
    )
    normalized = _edge_to_normalized_actions(_FakeEdge(fake_action), params={})

    first_action = normalized[0]
    assert first_action["type"] == "t2a_decision"
    carried = first_action["parameters"]
    assert carried["input_template"] == "{{texte_motif}}"
    assert carried["output_var"] == "result"
    assert carried["model"] == "qwen2.5:7b"


# ----------------------------------------------------------------------
# VWB → core bridge
# ----------------------------------------------------------------------

def test_vwb_extract_text_passthrough():
    """``extract_text`` maps to itself in the VWB → core table."""
    assert VWB_ACTION_TO_CORE["extract_text"] == "extract_text"


def test_vwb_t2a_decision_passthrough():
    """``t2a_decision`` maps to itself in the VWB → core table."""
    assert VWB_ACTION_TO_CORE["t2a_decision"] == "t2a_decision"


def test_vwb_params_extract_text_preserves_output_var():
    """``output_var`` and ``paragraph`` are passed through unchanged."""
    vwb_params = {"output_var": "v", "paragraph": False}
    core_params = _vwb_params_to_core("extract_text", vwb_params)
    assert core_params == {"output_var": "v", "paragraph": False}


def test_vwb_params_extract_text_legacy_variable_name():
    """Compatibility with the former VWB-side ``variable_name`` parameter."""
    core_params = _vwb_params_to_core(
        "extract_text", {"variable_name": "v_legacy"}
    )
    assert core_params["output_var"] == "v_legacy"


def test_vwb_params_t2a_decision_preserves_all():
    """All three ``t2a_decision`` parameters are passed through unchanged."""
    vwb_params = {
        "input_template": "DPI {{ipp}}",
        "output_var": "dec",
        "model": "qwen2.5:7b",
    }
    expected = {
        "input_template": "DPI {{ipp}}",
        "output_var": "dec",
        "model": "qwen2.5:7b",
    }
    assert _vwb_params_to_core("t2a_decision", vwb_params) == expected


def test_export_workflow_with_t2a_chain():
    """VWB workflow extract_text → t2a_decision → pause_for_human exports cleanly."""
    workflow_data = {"id": "wf_t2a", "name": "Demo T2A"}
    steps_data = [
        {
            "id": "s1",
            "action_type": "click_anchor",
            "parameters": {"target_text": "25003284"},
            "label": "Clic IPP",
        },
        {
            "id": "s2",
            "action_type": "extract_text",
            "parameters": {"output_var": "dpi"},
            "label": "OCR",
        },
        {
            "id": "s3",
            "action_type": "t2a_decision",
            "parameters": {
                "input_template": "{{dpi}}",
                "output_var": "dec",
                "model": "qwen2.5:7b",
            },
            "label": "Analyse",
        },
        {
            "id": "s4",
            "action_type": "pause_for_human",
            "parameters": {
                "message": "Décision : {{dec.decision}} — {{dec.justification}}",
            },
            "label": "Validation",
        },
        {
            "id": "s5",
            "action_type": "click_anchor",
            "parameters": {"target_text": "Enregistrer"},
            "label": "Clic Enregistrer",
        },
    ]

    core = convert_vwb_to_core_workflow(workflow_data, steps_data)

    edge_types = [edge["action"]["type"] for edge in core["edges"]]
    for expected_type in ("extract_text", "t2a_decision", "pause_for_human"):
        assert expected_type in edge_types

    # Check that the template is carried through unchanged.
    t2a_edge = next(
        edge for edge in core["edges"]
        if edge["action"]["type"] == "t2a_decision"
    )
    assert t2a_edge["action"]["parameters"]["input_template"] == "{{dpi}}"


# ----------------------------------------------------------------------
# extract_text_scroll — expansion + concat handler
# ----------------------------------------------------------------------

def test_extract_text_scroll_in_allowed():
    """Both the scroll action and its internal concat step are allowed."""
    assert "extract_text_scroll" in _ALLOWED_ACTION_TYPES
    assert "_concat_text_vars" in _ALLOWED_ACTION_TYPES


def test_extract_text_scroll_concat_is_server_side():
    """The internal concat step is a server-side action type."""
    assert "_concat_text_vars" in _SERVER_SIDE_ACTION_TYPES


def test_edge_to_action_extract_text_scroll_expands_to_six_steps():
    """An ``extract_text_scroll`` edge expands into six ordered actions."""
    fake_action = _FakeAction(
        "extract_text_scroll",
        parameters={"variable_name": "t_full", "paragraph": True},
    )
    expanded = _edge_to_normalized_actions(_FakeEdge(fake_action), params={})

    # 6 actions: OCR(top), Ctrl+End, wait, OCR(bottom), concat, Ctrl+Home
    assert len(expanded) == 6
    expected_types = [
        "extract_text",
        "key_combo",
        "wait",
        "extract_text",
        "_concat_text_vars",
        "key_combo",
    ]
    assert [step["type"] for step in expanded] == expected_types

    ocr_top, go_end, pause, ocr_bottom, concat, go_home = expanded

    # The OCR sub-actions use distinct internal variables.
    assert ocr_top["parameters"]["output_var"] == "__t_full_top"
    assert ocr_bottom["parameters"]["output_var"] == "__t_full_bottom"

    # Ctrl+End / Ctrl+Home are the expected key combos.
    assert go_end["keys"] == ["ctrl", "end"]
    assert go_home["keys"] == ["ctrl", "home"]

    # Wait = SCROLL_PAUSE_MS
    assert pause["duration_ms"] == SCROLL_PAUSE_MS

    # Concat reads the right names and writes into the final variable.
    concat_params = concat["parameters"]
    assert concat_params["top_var"] == "__t_full_top"
    assert concat_params["bottom_var"] == "__t_full_bottom"
    assert concat_params["output_var"] == "t_full"
    assert concat_params["separator"] == "\n\n"

    # Every action_id is unique and every action inherits from the edge.
    unique_ids = {step["action_id"] for step in expanded}
    assert len(unique_ids) == 6
    for step in expanded:
        assert step["edge_id"] == "e1"
        assert step["from_node"] == "n1"


def test_edge_to_action_extract_text_scroll_default_var_name():
    """Without a variable name the expansion uses ``extracted_text``."""
    fake_action = _FakeAction("extract_text_scroll", parameters={})
    expanded = _edge_to_normalized_actions(_FakeEdge(fake_action), params={})

    # Default = extracted_text, so internal vars = __extracted_text_top/bottom
    assert expanded[0]["parameters"]["output_var"] == "__extracted_text_top"
    assert expanded[4]["parameters"]["output_var"] == "extracted_text"


def test_edge_to_action_extract_text_scroll_accepts_output_var_legacy():
    """Compatibility: ``output_var`` is accepted as well as ``variable_name``."""
    fake_action = _FakeAction(
        "extract_text_scroll",
        parameters={"output_var": "legacy_var"},
    )
    expanded = _edge_to_normalized_actions(_FakeEdge(fake_action), params={})
    assert expanded[4]["parameters"]["output_var"] == "legacy_var"


def test_handle_concat_text_vars_merges_top_and_bottom():
    """Top and bottom texts are joined and the internal vars removed."""
    state = _create_replay_state("rep1", "wf", "sess", 3)
    state["variables"] = {
        "__t_full_top": "Lignes du haut",
        "__t_full_bottom": "Lignes du bas",
        "other": "intact",
    }
    action = {
        "type": "_concat_text_vars",
        "parameters": {
            "top_var": "__t_full_top",
            "bottom_var": "__t_full_bottom",
            "output_var": "t_full",
            "separator": "\n\n",
        },
    }

    succeeded = _handle_concat_text_vars_action(action, state)

    assert succeeded is True
    assert state["variables"]["t_full"] == "Lignes du haut\n\nLignes du bas"
    # Internal variables are cleaned up.
    for internal_name in ("__t_full_top", "__t_full_bottom"):
        assert internal_name not in state["variables"]
    # Other variables are preserved.
    assert state["variables"]["other"] == "intact"


def test_handle_concat_text_vars_handles_empty_top():
    """An empty top text yields the bottom text alone."""
    state = _create_replay_state("rep1", "wf", "sess", 3)
    state["variables"] = {"__a_top": "", "__a_bottom": "Bas seul"}
    action = {
        "type": "_concat_text_vars",
        "parameters": {
            "top_var": "__a_top",
            "bottom_var": "__a_bottom",
            "output_var": "a",
            "separator": "\n\n",
        },
    }

    succeeded = _handle_concat_text_vars_action(action, state)

    assert succeeded is True
    # No leading/trailing separator when one of the variables is empty.
    assert state["variables"]["a"] == "Bas seul"


def test_handle_concat_text_vars_handles_both_empty():
    """Two empty inputs yield False and an empty output variable."""
    state = _create_replay_state("rep1", "wf", "sess", 3)
    state["variables"] = {"__a_top": "", "__a_bottom": ""}
    action = {
        "type": "_concat_text_vars",
        "parameters": {
            "top_var": "__a_top",
            "bottom_var": "__a_bottom",
            "output_var": "a",
        },
    }

    succeeded = _handle_concat_text_vars_action(action, state)

    assert succeeded is False  # nothing useful produced
    assert state["variables"]["a"] == ""


def test_handle_concat_text_vars_preserves_user_named_vars():
    """If top/bottom do not start with ``__`` they are not deleted."""
    state = _create_replay_state("rep1", "wf", "sess", 3)
    state["variables"] = {"user_top": "haut", "user_bottom": "bas"}
    action = {
        "type": "_concat_text_vars",
        "parameters": {
            "top_var": "user_top",
            "bottom_var": "user_bottom",
            "output_var": "merged",
        },
    }

    _handle_concat_text_vars_action(action, state)

    variables = state["variables"]
    assert variables["user_top"] == "haut"
    assert variables["user_bottom"] == "bas"
    assert variables["merged"] == "haut\n\nbas"


def test_vwb_extract_text_scroll_passthrough():
    """``extract_text_scroll`` maps to itself in the VWB → core table."""
    assert VWB_ACTION_TO_CORE["extract_text_scroll"] == "extract_text_scroll"


def test_vwb_params_extract_text_scroll_preserves_variable_name():
    """``variable_name`` is passed through unchanged."""
    core_params = _vwb_params_to_core(
        "extract_text_scroll", {"variable_name": "t_full"}
    )
    assert core_params == {"variable_name": "t_full"}


def test_vwb_params_extract_text_scroll_legacy_output_var():
    """A legacy ``output_var`` is exposed as ``variable_name``."""
    core_params = _vwb_params_to_core(
        "extract_text_scroll", {"output_var": "legacy"}
    )
    assert core_params["variable_name"] == "legacy"