feat(agent): add learn action flow and grounding guards
This commit is contained in:
86
tests/unit/test_agent_chat_cors_lan.py
Normal file
86
tests/unit/test_agent_chat_cors_lan.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Tests de non-régression pour le fix CORS engineio sur le service
|
||||
rpa-agent-chat (port 5004).
|
||||
|
||||
Avant fix : les origines `http://192.168.1.40:5004` (self loopback) et
|
||||
`http://192.168.1.11:5004` (Léa Windows) étaient rejetées par engineio,
|
||||
provoquant `is not an accepted origin` dans le journal (24 mai 2026).
|
||||
|
||||
Fix : élargissement de `_ALLOWED_ORIGINS` dans agent_chat/app.py l. 83-99,
|
||||
plus override possible via `LEA_CORS_ALLOWED_ORIGINS=comma,separated`.
|
||||
|
||||
Référence : inbox_codex/2026-05-25_1235_..._enquete-feedbackbus-5004.md
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_lan_self_loopback_origin_allowed():
|
||||
"""Le serveur doit accepter sa propre origine `http://192.168.1.40:5004`."""
|
||||
from agent_chat import app
|
||||
assert "http://192.168.1.40:5004" in app._ALLOWED_ORIGINS, (
|
||||
"Origine self loopback 5004 absente — engineio va rejeter les "
|
||||
"connexions SocketIO depuis le serveur lui-même (cf. journal "
|
||||
"2026-05-24 11:00:47)."
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_lan_lea_windows_origin_allowed():
|
||||
"""Le serveur doit accepter l'origine Léa Windows `http://192.168.1.11:5004`."""
|
||||
from agent_chat import app
|
||||
assert "http://192.168.1.11:5004" in app._ALLOWED_ORIGINS, (
|
||||
"Origine Léa Windows 5004 absente — la ChatWindow tkinter ne peut "
|
||||
"pas établir une session SocketIO."
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_legacy_origins_preserved():
|
||||
"""Les origines historiques doivent rester acceptées (pas de régression)."""
|
||||
from agent_chat import app
|
||||
for origin in [
|
||||
"http://localhost:3002",
|
||||
"http://localhost:5002",
|
||||
"https://vwb.labs.laurinebazin.design",
|
||||
"https://lea.labs.laurinebazin.design",
|
||||
]:
|
||||
assert origin in app._ALLOWED_ORIGINS, f"Origine historique perdue : {origin}"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_env_override_extends_allowed_origins(monkeypatch):
|
||||
"""`LEA_CORS_ALLOWED_ORIGINS=...` étend la liste par défaut."""
|
||||
monkeypatch.setenv(
|
||||
"LEA_CORS_ALLOWED_ORIGINS",
|
||||
"https://demo.client.example,http://10.0.0.5:5004",
|
||||
)
|
||||
# Re-import du module pour relire l'env
|
||||
import agent_chat.app as app_module
|
||||
importlib.reload(app_module)
|
||||
assert "https://demo.client.example" in app_module._ALLOWED_ORIGINS
|
||||
assert "http://10.0.0.5:5004" in app_module._ALLOWED_ORIGINS
|
||||
# Origines par défaut toujours présentes
|
||||
assert "http://192.168.1.40:5004" in app_module._ALLOWED_ORIGINS
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_env_override_empty_keeps_defaults(monkeypatch):
|
||||
"""`LEA_CORS_ALLOWED_ORIGINS=''` ne casse rien — défauts conservés."""
|
||||
monkeypatch.setenv("LEA_CORS_ALLOWED_ORIGINS", "")
|
||||
import agent_chat.app as app_module
|
||||
importlib.reload(app_module)
|
||||
assert "http://192.168.1.40:5004" in app_module._ALLOWED_ORIGINS
|
||||
assert len(app_module._ALLOWED_ORIGINS) >= 9, (
|
||||
"Liste tronquée : attendu au moins 9 origines par défaut"
|
||||
)
|
||||
526
tests/unit/test_agent_chat_learn_action.py
Normal file
526
tests/unit/test_agent_chat_learn_action.py
Normal file
@@ -0,0 +1,526 @@
|
||||
"""Tests unit pour agent_chat.handlers.learn_action.
|
||||
|
||||
Couvre :
|
||||
- LearnIntentParser (regex)
|
||||
- OptionCFormatter
|
||||
- StateStore (write atomique + reprise)
|
||||
- LearnActionOrchestrator (transitions, garde-fous, persistance)
|
||||
- PersistPayloadBuilder
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from agent_chat.handlers.learn_action import (
|
||||
LearnActionOrchestrator,
|
||||
LearnIntent,
|
||||
LearnIntentParser,
|
||||
LearnState,
|
||||
OptionCFormatter,
|
||||
PersistPayloadBuilder,
|
||||
SessionState,
|
||||
StateStore,
|
||||
)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# LearnIntentParser
|
||||
# ============================================================
|
||||
class TestLearnIntentParser:
|
||||
def setup_method(self):
|
||||
# Désactive le LLM fallback pour isoler les tests regex
|
||||
self.parser = LearnIntentParser(use_llm_fallback=False)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"msg",
|
||||
[
|
||||
"apprends-moi",
|
||||
"Apprends moi",
|
||||
"regarde-moi faire",
|
||||
"observe",
|
||||
"enregistre",
|
||||
"on apprend",
|
||||
"tu vas apprendre",
|
||||
"Léa apprends",
|
||||
],
|
||||
)
|
||||
def test_start_observe(self, msg):
|
||||
r = self.parser.parse(msg, current_state=LearnState.IDLE)
|
||||
assert r.intent == LearnIntent.START_OBSERVE
|
||||
assert r.confidence >= 0.9
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"msg",
|
||||
[
|
||||
"stop",
|
||||
"c'est bon",
|
||||
"j'ai fini",
|
||||
"voilà c'est tout",
|
||||
"fini",
|
||||
"arrête",
|
||||
"termine",
|
||||
],
|
||||
)
|
||||
def test_user_stop_observe(self, msg):
|
||||
r = self.parser.parse(msg, current_state=LearnState.WAITING_USER_STOP)
|
||||
assert r.intent == LearnIntent.USER_STOP_OBSERVE
|
||||
|
||||
def test_correct_step_with_index(self):
|
||||
r = self.parser.parse(
|
||||
"Corrige l'étape 3 : il faut cliquer sur Valider",
|
||||
current_state=LearnState.ITERATING_FEEDBACK,
|
||||
)
|
||||
assert r.intent == LearnIntent.CORRECT_STEP
|
||||
assert r.step_index == 3
|
||||
assert "valider" in (r.extra.get("new_intent") or "").lower()
|
||||
|
||||
def test_undo_step(self):
|
||||
r = self.parser.parse(
|
||||
"Retire l'étape 2", current_state=LearnState.ITERATING_FEEDBACK
|
||||
)
|
||||
assert r.intent == LearnIntent.UNDO_STEP
|
||||
assert r.step_index == 2
|
||||
|
||||
def test_merge_next(self):
|
||||
r = self.parser.parse(
|
||||
"Fusionne avec la suivante", current_state=LearnState.ITERATING_FEEDBACK
|
||||
)
|
||||
assert r.intent == LearnIntent.MERGE_NEXT
|
||||
|
||||
def test_split_step(self):
|
||||
r = self.parser.parse(
|
||||
"Coupe l'étape 4", current_state=LearnState.ITERATING_FEEDBACK
|
||||
)
|
||||
assert r.intent == LearnIntent.SPLIT_STEP
|
||||
assert r.step_index == 4
|
||||
|
||||
def test_cancel(self):
|
||||
r = self.parser.parse("annule tout", current_state=LearnState.LISTENING)
|
||||
assert r.intent == LearnIntent.CANCEL
|
||||
|
||||
def test_validate_in_iterating(self):
|
||||
r = self.parser.parse(
|
||||
"c'est parfait", current_state=LearnState.ITERATING_FEEDBACK
|
||||
)
|
||||
assert r.intent == LearnIntent.VALIDATE_STEP
|
||||
|
||||
def test_mark_parameter_variable(self):
|
||||
r = self.parser.parse(
|
||||
"ça change à chaque fois", current_state=LearnState.NAMING
|
||||
)
|
||||
assert r.intent == LearnIntent.MARK_PARAMETER
|
||||
assert r.extra.get("is_parameter") is True
|
||||
|
||||
def test_mark_parameter_constant(self):
|
||||
r = self.parser.parse(
|
||||
"toujours pareil", current_state=LearnState.NAMING
|
||||
)
|
||||
assert r.intent == LearnIntent.MARK_PARAMETER
|
||||
assert r.extra.get("is_parameter") is False
|
||||
|
||||
def test_name_competence_when_naming(self):
|
||||
r = self.parser.parse(
|
||||
"facturation urgences", current_state=LearnState.NAMING
|
||||
)
|
||||
assert r.intent == LearnIntent.NAME_COMPETENCE
|
||||
assert "facturation" in (r.extra.get("name") or "")
|
||||
|
||||
def test_unknown_in_idle(self):
|
||||
r = self.parser.parse(
|
||||
"blabla random", current_state=LearnState.IDLE
|
||||
)
|
||||
assert r.intent == LearnIntent.UNKNOWN
|
||||
|
||||
def test_llm_fallback_disabled_after_failure(self, monkeypatch):
|
||||
# Active le LLM mais simule une erreur réseau
|
||||
parser = LearnIntentParser(use_llm_fallback=True)
|
||||
# Force exception sur httpx
|
||||
parser._parse_llm = lambda *args, **kwargs: None # type: ignore[method-assign]
|
||||
r = parser.parse("zorglub blabla truc", current_state=LearnState.IDLE)
|
||||
# Doit retomber gracieusement sur UNKNOWN sans crasher
|
||||
assert r.intent == LearnIntent.UNKNOWN
|
||||
|
||||
|
||||
# ============================================================
|
||||
# OptionCFormatter
|
||||
# ============================================================
|
||||
class TestOptionCFormatter:
|
||||
def setup_method(self):
|
||||
self.fmt = OptionCFormatter()
|
||||
|
||||
def test_empty(self):
|
||||
assert "aucune étape" in self.fmt.format([])
|
||||
|
||||
def test_simple_click(self):
|
||||
understanding = [
|
||||
{"action_type": "click", "target_label": "Valider", "widget_type": "Bouton"}
|
||||
]
|
||||
out = self.fmt.format(understanding)
|
||||
assert "1." in out
|
||||
assert "« Valider »" in out
|
||||
assert "cliqué" in out
|
||||
|
||||
def test_type_with_value(self):
|
||||
understanding = [
|
||||
{
|
||||
"action_type": "type",
|
||||
"target_label": "IPP",
|
||||
"widget_type": "Champ",
|
||||
"value": "25003284",
|
||||
}
|
||||
]
|
||||
out = self.fmt.format(understanding)
|
||||
assert "« IPP »" in out
|
||||
assert "« 25003284 »" in out
|
||||
assert "saisi" in out
|
||||
|
||||
def test_low_confidence_suffix(self):
|
||||
understanding = [
|
||||
{
|
||||
"action_type": "click",
|
||||
"target_label": "Patient",
|
||||
"widget_type": "Fenêtre",
|
||||
"confidence_ocr": 0.4,
|
||||
}
|
||||
]
|
||||
out = self.fmt.format(understanding)
|
||||
assert "(à confirmer)" in out
|
||||
|
||||
def test_unknown_action_fallback(self):
|
||||
understanding = [{"action_type": "wibble", "target_label": "X"}]
|
||||
out = self.fmt.format(understanding)
|
||||
assert "effectuée" in out
|
||||
|
||||
def test_closing_question(self):
|
||||
q = self.fmt.closing_question()
|
||||
assert "trompée" in q or "trompee" in q.lower().replace("é", "e")
|
||||
|
||||
|
||||
# ============================================================
|
||||
# StateStore
|
||||
# ============================================================
|
||||
class TestStateStore:
|
||||
def test_save_and_load(self, tmp_path):
|
||||
store = StateStore(tmp_path)
|
||||
st = SessionState(
|
||||
session_id="abc123",
|
||||
user_id="dom",
|
||||
state=LearnState.ITERATING_FEEDBACK,
|
||||
)
|
||||
store.save(st)
|
||||
loaded = store.load("abc123")
|
||||
assert loaded is not None
|
||||
assert loaded.session_id == "abc123"
|
||||
assert loaded.user_id == "dom"
|
||||
assert loaded.state == LearnState.ITERATING_FEEDBACK
|
||||
|
||||
def test_atomic_write_no_partial(self, tmp_path):
|
||||
store = StateStore(tmp_path)
|
||||
st = SessionState(session_id="atomic1")
|
||||
store.save(st)
|
||||
# Pas de fichier .tmp restant
|
||||
tmp_files = list(tmp_path.glob("*.tmp"))
|
||||
assert tmp_files == []
|
||||
|
||||
def test_list_active_filters_done(self, tmp_path):
|
||||
store = StateStore(tmp_path)
|
||||
store.save(SessionState(session_id="s1", state=LearnState.ITERATING_FEEDBACK))
|
||||
store.save(SessionState(session_id="s2", state=LearnState.DONE))
|
||||
store.save(SessionState(session_id="s3", state=LearnState.ABORTED))
|
||||
active = store.list_active()
|
||||
ids = {s.session_id for s in active}
|
||||
assert ids == {"s1"}
|
||||
|
||||
def test_session_id_sanitized(self, tmp_path):
|
||||
store = StateStore(tmp_path)
|
||||
st = SessionState(session_id="../../etc/passwd")
|
||||
store.save(st)
|
||||
# Aucun fichier hors tmp_path
|
||||
files = list(tmp_path.glob("*.json"))
|
||||
assert len(files) == 1
|
||||
assert files[0].parent == tmp_path
|
||||
|
||||
def test_delete(self, tmp_path):
|
||||
store = StateStore(tmp_path)
|
||||
store.save(SessionState(session_id="del_me"))
|
||||
store.delete("del_me")
|
||||
assert store.load("del_me") is None
|
||||
|
||||
|
||||
# ============================================================
|
||||
# PersistPayloadBuilder
|
||||
# ============================================================
|
||||
class TestPersistPayloadBuilder:
|
||||
def test_build_with_parameters(self):
|
||||
st = SessionState(
|
||||
session_id="sX",
|
||||
competence_name="Test compétence",
|
||||
user_id="dom",
|
||||
parameters_marked=[
|
||||
{
|
||||
"step_index": 3,
|
||||
"is_parameter": True,
|
||||
"name": "ipp",
|
||||
"example_value": "25003284",
|
||||
"field_label": "IPP",
|
||||
},
|
||||
{
|
||||
"step_index": 4,
|
||||
"is_parameter": False,
|
||||
"name": "type",
|
||||
"example_value": "C2",
|
||||
"field_label": "Type",
|
||||
},
|
||||
],
|
||||
)
|
||||
payload = PersistPayloadBuilder().build(st)
|
||||
assert payload["name"] == "Test compétence"
|
||||
assert payload["session_id"] == "sX"
|
||||
assert payload["user_id"] == "dom"
|
||||
# Seul le param flagué is_parameter=True doit apparaître
|
||||
assert len(payload["parameters"]) == 1
|
||||
assert payload["parameters"][0]["name"] == "ipp"
|
||||
|
||||
def test_persist_payload_includes_machine_id(self):
|
||||
"""Correction #1 — payload doit inclure machine_id."""
|
||||
st = SessionState(
|
||||
session_id="sM",
|
||||
competence_name="X",
|
||||
machine_id="DESKTOP-58D5CAC_windows",
|
||||
)
|
||||
payload = PersistPayloadBuilder().build(st)
|
||||
assert "machine_id" in payload
|
||||
assert payload["machine_id"] == "DESKTOP-58D5CAC_windows"
|
||||
|
||||
def test_persist_payload_machine_id_none_when_absent(self):
|
||||
"""Quand non fourni, machine_id reste présent à None dans le payload."""
|
||||
st = SessionState(session_id="sM2", competence_name="X")
|
||||
payload = PersistPayloadBuilder().build(st)
|
||||
assert "machine_id" in payload
|
||||
assert payload["machine_id"] is None
|
||||
|
||||
|
||||
# ============================================================
|
||||
# LearnActionOrchestrator (avec StreamingClient mocké)
|
||||
# ============================================================
|
||||
@pytest.fixture
|
||||
def mock_streaming():
|
||||
"""StreamingClient simulé."""
|
||||
m = MagicMock()
|
||||
m.shadow_start.return_value = {"ok": True}
|
||||
m.shadow_stop.return_value = {"ok": True}
|
||||
m.shadow_understanding.return_value = {
|
||||
"understanding": [
|
||||
{"action_type": "click", "target_label": "Patient", "widget_type": "Fenêtre"},
|
||||
{
|
||||
"action_type": "type",
|
||||
"target_label": "IPP",
|
||||
"widget_type": "Champ",
|
||||
"value": "25003284",
|
||||
},
|
||||
]
|
||||
}
|
||||
m.shadow_feedback.return_value = {"ok": True}
|
||||
m.shadow_build.return_value = {"ok": True}
|
||||
m.competence_persist.return_value = {"slug": "facturation_urgences"}
|
||||
return m
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def orchestrator(tmp_path, mock_streaming):
|
||||
parser = LearnIntentParser(use_llm_fallback=False)
|
||||
store = StateStore(tmp_path)
|
||||
return LearnActionOrchestrator(
|
||||
streaming_client=mock_streaming,
|
||||
intent_parser=parser,
|
||||
state_store=store,
|
||||
emit=MagicMock(),
|
||||
)
|
||||
|
||||
|
||||
class TestLearnActionOrchestrator:
|
||||
def test_start_session_transitions(self, orchestrator, mock_streaming):
|
||||
st, reply = orchestrator.start_session(user_id="dom", trigger_source="button")
|
||||
assert st.state == LearnState.WAITING_USER_STOP
|
||||
mock_streaming.shadow_start.assert_called_once()
|
||||
assert "je te regarde" in reply.lower() or "regarde" in reply.lower()
|
||||
|
||||
def test_full_happy_path(self, orchestrator, mock_streaming):
|
||||
st, _ = orchestrator.start_session(user_id="dom", machine_id="m1")
|
||||
sid = st.session_id
|
||||
|
||||
# Utilisateur dit stop
|
||||
reply = orchestrator.handle_chat_message(sid, "c'est bon")
|
||||
assert reply is not None
|
||||
assert "j'ai compris" in reply.lower()
|
||||
assert orchestrator._sessions[sid].state == LearnState.ITERATING_FEEDBACK
|
||||
|
||||
# Utilisateur valide globalement → NAMING
|
||||
reply = orchestrator.handle_chat_message(sid, "c'est parfait")
|
||||
assert orchestrator._sessions[sid].state == LearnState.NAMING
|
||||
|
||||
# Nomination
|
||||
reply = orchestrator.handle_chat_message(sid, "facturation urgences")
|
||||
# Maintenant Léa doit poser une question sur le paramètre IPP
|
||||
assert "25003284" in (reply or "")
|
||||
assert orchestrator._sessions[sid].competence_name == "facturation urgences"
|
||||
|
||||
# Marquer le paramètre comme variable
|
||||
reply = orchestrator.handle_chat_message(sid, "ça change à chaque fois")
|
||||
# Plus de pending → persist
|
||||
mock_streaming.shadow_build.assert_called_once()
|
||||
mock_streaming.competence_persist.assert_called_once()
|
||||
assert orchestrator._sessions[sid].state == LearnState.DONE
|
||||
|
||||
def test_emergency_exit_after_3_corrections(self, orchestrator, mock_streaming):
|
||||
st, _ = orchestrator.start_session(user_id="dom")
|
||||
sid = st.session_id
|
||||
orchestrator.handle_chat_message(sid, "c'est bon") # stop
|
||||
|
||||
for i in range(3):
|
||||
r = orchestrator.handle_chat_message(
|
||||
sid, "corrige l'étape 3 : clique sur Valider"
|
||||
)
|
||||
assert orchestrator._sessions[sid].state == LearnState.ITERATING_FEEDBACK
|
||||
|
||||
# 4e correction → ABORTED
|
||||
r = orchestrator.handle_chat_message(
|
||||
sid, "corrige l'étape 3 : clique sur Valider"
|
||||
)
|
||||
assert orchestrator._sessions[sid].state == LearnState.ABORTED
|
||||
assert "n°3" in (r or "")
|
||||
|
||||
def test_cancel_anywhere(self, orchestrator, mock_streaming):
|
||||
st, _ = orchestrator.start_session(user_id="dom")
|
||||
sid = st.session_id
|
||||
reply = orchestrator.handle_chat_message(sid, "annule tout")
|
||||
assert orchestrator._sessions[sid].state == LearnState.ABORTED
|
||||
assert "annule" in (reply or "").lower()
|
||||
|
||||
def test_idle_message_returns_none(self, orchestrator):
|
||||
# Aucune session ouverte → None (laisser le flux normal gérer)
|
||||
r = orchestrator.handle_chat_message("nonexistent", "Bonjour")
|
||||
assert r is None
|
||||
|
||||
def test_state_persistence_across_reload(self, tmp_path, mock_streaming):
|
||||
store = StateStore(tmp_path)
|
||||
parser = LearnIntentParser(use_llm_fallback=False)
|
||||
orch1 = LearnActionOrchestrator(
|
||||
streaming_client=mock_streaming,
|
||||
intent_parser=parser,
|
||||
state_store=store,
|
||||
emit=MagicMock(),
|
||||
)
|
||||
st, _ = orch1.start_session(user_id="dom")
|
||||
sid = st.session_id
|
||||
orch1.handle_chat_message(sid, "c'est bon") # passe en ITERATING_FEEDBACK
|
||||
|
||||
# Simule un crash + redémarrage
|
||||
orch2 = LearnActionOrchestrator(
|
||||
streaming_client=mock_streaming,
|
||||
intent_parser=parser,
|
||||
state_store=store,
|
||||
emit=MagicMock(),
|
||||
)
|
||||
resumed = orch2.resume_sessions()
|
||||
assert sid in resumed
|
||||
assert orch2._sessions[sid].state == LearnState.ITERATING_FEEDBACK
|
||||
|
||||
def test_proactive_signal_cooldown(self, orchestrator):
|
||||
r1 = orchestrator.handle_proactive_signal("action_repeat", {})
|
||||
assert r1 is not None
|
||||
# Deuxième signal immédiat → ignoré
|
||||
r2 = orchestrator.handle_proactive_signal("action_repeat", {})
|
||||
assert r2 is None
|
||||
|
||||
def test_illegal_transition_ignored(self, orchestrator, mock_streaming):
|
||||
st, _ = orchestrator.start_session(user_id="dom")
|
||||
# Tentative de passer directement de WAITING_USER_STOP à DONE
|
||||
prev = orchestrator._sessions[st.session_id].state
|
||||
orchestrator._transition(
|
||||
orchestrator._sessions[st.session_id], LearnState.DONE
|
||||
)
|
||||
assert orchestrator._sessions[st.session_id].state == prev
|
||||
|
||||
# ============================================================
|
||||
# Corrections P1-LEA-SHADOW 2026-06-01 (NO-GO Qwen)
|
||||
# ============================================================
|
||||
def test_start_session_stores_machine_id(self, orchestrator):
|
||||
"""Correction #1 — machine_id transmis à start_session est stocké."""
|
||||
st, _ = orchestrator.start_session(
|
||||
user_id="dom",
|
||||
trigger_source="windows_button",
|
||||
machine_id="DESKTOP-58D5CAC_windows",
|
||||
)
|
||||
assert st.machine_id == "DESKTOP-58D5CAC_windows"
|
||||
# Et la session en mémoire aussi
|
||||
assert (
|
||||
orchestrator._sessions[st.session_id].machine_id
|
||||
== "DESKTOP-58D5CAC_windows"
|
||||
)
|
||||
|
||||
def test_persist_blocked_without_machine_id(self, orchestrator, mock_streaming):
|
||||
"""Correction #1 — persist refusé conversationnellement sans machine_id."""
|
||||
st, _ = orchestrator.start_session(user_id="dom") # pas de machine_id
|
||||
sid = st.session_id
|
||||
orchestrator.handle_chat_message(sid, "c'est bon") # → ITERATING
|
||||
orchestrator.handle_chat_message(sid, "c'est parfait") # → NAMING
|
||||
orchestrator.handle_chat_message(sid, "ma competence") # nom
|
||||
# Marquer paramètre → tentative persist
|
||||
reply = orchestrator.handle_chat_message(sid, "ça change à chaque fois")
|
||||
# competence_persist NE doit PAS avoir été appelée
|
||||
mock_streaming.competence_persist.assert_not_called()
|
||||
# Message métier explicite côté Léa
|
||||
assert reply is not None
|
||||
assert "machine" in reply.lower()
|
||||
|
||||
def test_datetime_uses_timezone_aware(self):
|
||||
"""Correction #2 — created_at / last_transition_at sont timezone-aware."""
|
||||
st = SessionState(session_id="tz1")
|
||||
# Le format ISO doit contenir un offset (+00:00 ou Z) — tzinfo présent
|
||||
# après reparse via fromisoformat (Python 3.11+).
|
||||
from datetime import datetime as _dt
|
||||
parsed_created = _dt.fromisoformat(st.created_at)
|
||||
parsed_transition = _dt.fromisoformat(st.last_transition_at)
|
||||
assert parsed_created.tzinfo is not None
|
||||
assert parsed_transition.tzinfo is not None
|
||||
# Sanity check : c'est bien UTC.
|
||||
assert "+00:00" in st.created_at or st.created_at.endswith("Z")
|
||||
|
||||
def test_confirm_blocked_when_name_missing(self, orchestrator, mock_streaming):
|
||||
"""Correction #3 — CONFIRM en NAMING avec competence_name=None reste NAMING."""
|
||||
st, _ = orchestrator.start_session(
|
||||
user_id="dom", machine_id="machine_x"
|
||||
)
|
||||
sid = st.session_id
|
||||
orchestrator.handle_chat_message(sid, "c'est bon")
|
||||
orchestrator.handle_chat_message(sid, "c'est parfait") # → NAMING
|
||||
# Forcer competence_name à None et envoyer un CONFIRM
|
||||
orchestrator._sessions[sid].competence_name = None
|
||||
reply = orchestrator.handle_chat_message(sid, "ok") # CONFIRM
|
||||
assert orchestrator._sessions[sid].state == LearnState.NAMING
|
||||
assert reply is not None
|
||||
assert "nom" in reply.lower() or "appeler" in reply.lower()
|
||||
mock_streaming.competence_persist.assert_not_called()
|
||||
|
||||
def test_confirm_blocked_when_name_empty(self, orchestrator, mock_streaming):
|
||||
"""Correction #3 — CONFIRM en NAMING avec competence_name='' reste NAMING."""
|
||||
st, _ = orchestrator.start_session(
|
||||
user_id="dom", machine_id="machine_x"
|
||||
)
|
||||
sid = st.session_id
|
||||
orchestrator.handle_chat_message(sid, "c'est bon")
|
||||
orchestrator.handle_chat_message(sid, "c'est parfait") # → NAMING
|
||||
orchestrator._sessions[sid].competence_name = " " # vide après strip
|
||||
reply = orchestrator.handle_chat_message(sid, "ok")
|
||||
assert orchestrator._sessions[sid].state == LearnState.NAMING
|
||||
assert reply is not None
|
||||
assert "nom" in reply.lower() or "appeler" in reply.lower()
|
||||
mock_streaming.competence_persist.assert_not_called()
|
||||
121
tests/unit/test_autonomous_planner_owl_flag.py
Normal file
121
tests/unit/test_autonomous_planner_owl_flag.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Tests pour le feature flag AGENT_CHAT_ENABLE_OWL (C1b).
|
||||
|
||||
Contexte : depuis 2026-05-25, OWL-v2 ne se charge plus au boot du service
|
||||
rpa-agent-chat par défaut (économie ~600 MiB VRAM constatée par Codex après
|
||||
restart C1). Activation via AGENT_CHAT_ENABLE_OWL=1.
|
||||
|
||||
Référence : inbox_claude/2026-05-25_1327_codex-to-claude_C1-post-restart-ok-c1b-vram.md
|
||||
Fix : agent_chat/autonomous_planner.py _init_visual_detection() l. 139-...
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_owl_skipped_by_default(monkeypatch):
|
||||
"""Sans AGENT_CHAT_ENABLE_OWL, OWL ne doit PAS se charger au boot."""
|
||||
monkeypatch.delenv("AGENT_CHAT_ENABLE_OWL", raising=False)
|
||||
from agent_chat.autonomous_planner import AutonomousPlanner
|
||||
|
||||
planner = AutonomousPlanner(llm_model="qwen2.5:7b")
|
||||
assert planner._owl_detector is None, (
|
||||
f"OWL chargé alors que flag OFF (économie VRAM perdue) : "
|
||||
f"{planner._owl_detector}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_owl_skipped_when_flag_zero(monkeypatch):
|
||||
"""AGENT_CHAT_ENABLE_OWL=0 → OWL skip."""
|
||||
monkeypatch.setenv("AGENT_CHAT_ENABLE_OWL", "0")
|
||||
from agent_chat.autonomous_planner import AutonomousPlanner
|
||||
|
||||
planner = AutonomousPlanner(llm_model="qwen2.5:7b")
|
||||
assert planner._owl_detector is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_owl_skipped_when_flag_false(monkeypatch):
|
||||
"""AGENT_CHAT_ENABLE_OWL=false → OWL skip (alias accepté)."""
|
||||
monkeypatch.setenv("AGENT_CHAT_ENABLE_OWL", "false")
|
||||
from agent_chat.autonomous_planner import AutonomousPlanner
|
||||
|
||||
planner = AutonomousPlanner(llm_model="qwen2.5:7b")
|
||||
assert planner._owl_detector is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_owl_init_attempted_when_flag_one(monkeypatch):
|
||||
"""AGENT_CHAT_ENABLE_OWL=1 → tentative d'init (succès ou échec rattrapé).
|
||||
|
||||
Le test ne valide PAS que OWL charge effectivement (dépend GPU + modèle
|
||||
HF disponible), juste que le code passe la garde du flag et tente l'init.
|
||||
On mocke OwlDetector pour vérifier qu'il est instancié.
|
||||
"""
|
||||
monkeypatch.setenv("AGENT_CHAT_ENABLE_OWL", "1")
|
||||
from agent_chat import autonomous_planner as ap_module
|
||||
|
||||
calls = []
|
||||
|
||||
class FakeOwl:
|
||||
def __init__(self, **kwargs):
|
||||
calls.append(kwargs)
|
||||
|
||||
monkeypatch.setattr(ap_module, "OwlDetector", FakeOwl)
|
||||
monkeypatch.setattr(ap_module, "VISUAL_DETECTION_AVAILABLE", True)
|
||||
|
||||
planner = ap_module.AutonomousPlanner(llm_model="qwen2.5:7b")
|
||||
assert planner._owl_detector is not None, (
|
||||
"OWL doit être instancié quand AGENT_CHAT_ENABLE_OWL=1"
|
||||
)
|
||||
assert len(calls) == 1
|
||||
assert calls[0].get("confidence_threshold") == 0.1
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_owl_device_override(monkeypatch):
|
||||
"""AGENT_CHAT_OWL_DEVICE=cpu force le device CPU même si CUDA dispo."""
|
||||
monkeypatch.setenv("AGENT_CHAT_ENABLE_OWL", "1")
|
||||
monkeypatch.setenv("AGENT_CHAT_OWL_DEVICE", "cpu")
|
||||
from agent_chat import autonomous_planner as ap_module
|
||||
|
||||
calls = []
|
||||
|
||||
class FakeOwl:
|
||||
def __init__(self, **kwargs):
|
||||
calls.append(kwargs)
|
||||
|
||||
monkeypatch.setattr(ap_module, "OwlDetector", FakeOwl)
|
||||
monkeypatch.setattr(ap_module, "VISUAL_DETECTION_AVAILABLE", True)
|
||||
|
||||
ap_module.AutonomousPlanner(llm_model="qwen2.5:7b")
|
||||
assert calls[0].get("device") == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_owl_init_exception_caught(monkeypatch):
|
||||
"""Si OWL crash à l'init (OOM CUDA, modèle absent, etc.), AutonomousPlanner
|
||||
doit continuer à booter avec _owl_detector=None."""
|
||||
monkeypatch.setenv("AGENT_CHAT_ENABLE_OWL", "1")
|
||||
from agent_chat import autonomous_planner as ap_module
|
||||
|
||||
class CrashOwl:
|
||||
def __init__(self, **kwargs):
|
||||
raise RuntimeError("CUDA out of memory (simulation)")
|
||||
|
||||
monkeypatch.setattr(ap_module, "OwlDetector", CrashOwl)
|
||||
monkeypatch.setattr(ap_module, "VISUAL_DETECTION_AVAILABLE", True)
|
||||
|
||||
planner = ap_module.AutonomousPlanner(llm_model="qwen2.5:7b")
|
||||
assert planner._owl_detector is None, (
|
||||
"L'exception doit être catchée — AutonomousPlanner ne doit pas crash"
|
||||
)
|
||||
@@ -120,7 +120,7 @@ class TestDispatchPausedAction:
|
||||
|
||||
|
||||
class TestPausedBubbleHeight:
|
||||
"""Couvre _compute_paused_bubble_height — patch troncature 22 mai 2026."""
|
||||
"""Couvre _compute_paused_bubble_height — anti-troncature pause UI."""
|
||||
|
||||
def test_empty_message_uses_minimum_height(self):
|
||||
h, scroll = ChatWindow._compute_paused_bubble_height("")
|
||||
@@ -133,10 +133,27 @@ class TestPausedBubbleHeight:
|
||||
assert scroll is False
|
||||
|
||||
def test_long_single_line_triggers_scrollbar(self):
|
||||
# ~600 chars sans \n → wrapped_lines = 600 // 60 + 1 = 11
|
||||
msg = "x" * 600
|
||||
h, scroll = ChatWindow._compute_paused_bubble_height(msg)
|
||||
assert h == 11
|
||||
assert h == 12
|
||||
assert scroll is True
|
||||
|
||||
def test_narrow_window_estimate_keeps_wrong_window_message_visible(self):
|
||||
"""Cas observé sur Windows : fenêtre Léa ~380px, message wrong_window
|
||||
coupé après "attendu". Avec ~34 caractères par ligne, il faut
|
||||
prévoir assez de lignes pour afficher le détail."""
|
||||
msg = (
|
||||
"Je m'attendais à voir la bonne fenêtre mais je vois autre chose. "
|
||||
"Peux-tu vérifier que l'application est au premier plan ? "
|
||||
"(Fenêtre incorrecte : attendu "
|
||||
"'http192.168.1.408765dossier.htmlid=.txt - Bloc-notes', "
|
||||
"actuel 'Program Manager')"
|
||||
)
|
||||
h, scroll = ChatWindow._compute_paused_bubble_height(
|
||||
msg,
|
||||
chars_per_line=34,
|
||||
)
|
||||
assert h >= 7
|
||||
assert scroll is True
|
||||
|
||||
def test_message_with_many_newlines_uses_explicit_count(self):
|
||||
@@ -150,11 +167,11 @@ class TestPausedBubbleHeight:
|
||||
assert scroll is False
|
||||
|
||||
def test_cap_reached_triggers_scrollbar_even_if_short(self):
|
||||
"""Quand on dépasse le cap (12 lignes), la scrollbar DOIT
|
||||
"""Quand on dépasse le cap, la scrollbar DOIT
|
||||
s'afficher quel que soit la longueur en caractères."""
|
||||
msg = "\n".join([f"l{i}" for i in range(20)])
|
||||
h, scroll = ChatWindow._compute_paused_bubble_height(msg)
|
||||
assert h == 12 # plafond
|
||||
assert h == 14 # plafond
|
||||
assert scroll is True
|
||||
|
||||
def test_long_content_triggers_scrollbar_at_200_chars(self):
|
||||
@@ -163,3 +180,18 @@ class TestPausedBubbleHeight:
|
||||
msg = "x" * 220
|
||||
h, scroll = ChatWindow._compute_paused_bubble_height(msg)
|
||||
assert scroll is True
|
||||
|
||||
def test_dynamic_small_viewport_caps_rows_and_scrolls(self):
|
||||
msg = (
|
||||
"Je m'attendais à voir la bonne fenêtre mais je vois autre chose. "
|
||||
"Peux-tu vérifier que l'application est au premier plan ? "
|
||||
"(Post-vérif échouée : fenêtre '*test – Bloc-notes' au lieu de "
|
||||
"'Enregistrer sous')"
|
||||
)
|
||||
h, scroll = ChatWindow._compute_paused_bubble_height(
|
||||
msg,
|
||||
chars_per_line=32,
|
||||
max_rows=5,
|
||||
)
|
||||
assert h == 5
|
||||
assert scroll is True
|
||||
|
||||
269
tests/unit/test_enrich_click_skip_build_vision.py
Normal file
269
tests/unit/test_enrich_click_skip_build_vision.py
Normal file
@@ -0,0 +1,269 @@
|
||||
"""Tests C2d-bis : short-circuit SomEngine + _gemma4_read_element au build.
|
||||
|
||||
Niveau A : si vision_info.text non vide → SomEngine pas appelé (faible risque,
|
||||
comportement par défaut depuis 2026-05-25).
|
||||
Niveau B : flag RPA_SKIP_BUILD_VISION (ou alias RPA_SKIP_BUILD_VLM) actif →
|
||||
SomEngine + _gemma4_read_element jamais appelés, même si
|
||||
vision_info.text vide.
|
||||
|
||||
Référence : inbox_claude/2026-05-25_1700_codex-to-claude_AMEND-C2d-bis-gemini-short-circuit.md
|
||||
Découverte C2c : inbox_codex/2026-05-25_1500_claude-to-codex_C2c-analyse-step4-crops.md
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_screenshot(tmp_path):
|
||||
"""Crée un screenshot PNG bidon (1920x1080 gris) pour les tests."""
|
||||
from PIL import Image
|
||||
img = Image.new("RGB", (1920, 1080), color=(128, 128, 128))
|
||||
path = tmp_path / "shots" / "shot_0001_full.png"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
img.save(path, "PNG")
|
||||
return path
|
||||
|
||||
|
||||
def _make_session_dir(tmp_path):
|
||||
"""Session dir contenant shots/ vide (pour passer le check Path.is_dir)."""
|
||||
session = tmp_path / "session"
|
||||
(session / "shots").mkdir(parents=True, exist_ok=True)
|
||||
return session
|
||||
|
||||
|
||||
# ────────────────────────────────────────────────────────────────────────────
|
||||
# Niveau A — short-circuit vision_info.text
|
||||
# ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_niveau_a_vision_info_text_skips_som_and_gemma4(
|
||||
monkeypatch, fake_screenshot, tmp_path
|
||||
):
|
||||
"""vision_info.text non vide → SomEngine et _gemma4_read_element JAMAIS appelés."""
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VISION", raising=False)
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VLM", raising=False)
|
||||
from agent_v0.server_v1 import stream_processor as sp
|
||||
|
||||
# Mocks : SomEngine et gemma4 ne doivent PAS être appelés
|
||||
som_calls = []
|
||||
gemma_calls = []
|
||||
|
||||
def fake_som(*args, **kwargs):
|
||||
som_calls.append(args)
|
||||
return {"label": "should_not_be_used", "source": "som"}
|
||||
|
||||
def fake_gemma(*args, **kwargs):
|
||||
gemma_calls.append(args)
|
||||
return "should_not_be_used"
|
||||
|
||||
monkeypatch.setattr(sp, "_som_identify_clicked_element", fake_som)
|
||||
monkeypatch.setattr(sp, "_gemma4_read_element", fake_gemma)
|
||||
|
||||
result = sp.enrich_click_from_screenshot(
|
||||
screenshot_path=fake_screenshot,
|
||||
click_x=500,
|
||||
click_y=300,
|
||||
screen_w=1920,
|
||||
screen_h=1080,
|
||||
window_title="Bloc-notes",
|
||||
vision_info={"text": "Enregistrer", "type": "button"},
|
||||
session_dir=_make_session_dir(tmp_path),
|
||||
screenshot_id="shot_0001",
|
||||
)
|
||||
|
||||
# Assertions
|
||||
assert len(som_calls) == 0, f"SomEngine appelé alors que vision_info.text présent : {som_calls}"
|
||||
assert len(gemma_calls) == 0, f"_gemma4_read_element appelé : {gemma_calls}"
|
||||
# L'action garde tous les champs critiques
|
||||
assert result["by_text"] == "Enregistrer"
|
||||
assert result["by_text_source"] == "ocr"
|
||||
assert result["by_role"] == "button"
|
||||
assert result["window_title"] == "Bloc-notes"
|
||||
assert result["anchor_image_base64"] # crop calculé
|
||||
assert result["by_position"] == [round(500 / 1920, 6), round(300 / 1080, 6)]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_niveau_a_vision_info_text_empty_calls_som(
|
||||
monkeypatch, fake_screenshot, tmp_path
|
||||
):
|
||||
"""vision_info.text vide ET flag absent → SomEngine appelé (comportement legacy)."""
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VISION", raising=False)
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VLM", raising=False)
|
||||
from agent_v0.server_v1 import stream_processor as sp
|
||||
|
||||
som_calls = []
|
||||
gemma_calls = []
|
||||
|
||||
def fake_som(*args, **kwargs):
|
||||
som_calls.append(args)
|
||||
return {"label": "label_from_som", "source": "som"}
|
||||
|
||||
def fake_gemma(*args, **kwargs):
|
||||
gemma_calls.append(args)
|
||||
return "" # gemma trouve rien
|
||||
|
||||
monkeypatch.setattr(sp, "_som_identify_clicked_element", fake_som)
|
||||
monkeypatch.setattr(sp, "_gemma4_read_element", fake_gemma)
|
||||
|
||||
result = sp.enrich_click_from_screenshot(
|
||||
screenshot_path=fake_screenshot,
|
||||
click_x=500,
|
||||
click_y=300,
|
||||
screen_w=1920,
|
||||
screen_h=1080,
|
||||
window_title="App",
|
||||
vision_info={"text": "", "type": ""}, # vide
|
||||
session_dir=_make_session_dir(tmp_path),
|
||||
screenshot_id="shot_0001",
|
||||
)
|
||||
|
||||
# SomEngine doit être appelé (comportement legacy préservé)
|
||||
assert len(som_calls) == 1
|
||||
# Gemma appelé aussi car SomEngine label utilisé comme element_text → on
|
||||
# n'entre PAS dans le bloc gemma4
|
||||
# (cf. ligne 974-981 : si som_elem.label → element_text = som_elem.label)
|
||||
assert len(gemma_calls) == 0
|
||||
# by_text vient de SomEngine
|
||||
assert result["by_text"] == "label_from_som"
|
||||
assert result["by_text_source"] == "ocr"
|
||||
|
||||
|
||||
# ────────────────────────────────────────────────────────────────────────────
|
||||
# Niveau B — flag RPA_SKIP_BUILD_VISION
|
||||
# ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_niveau_b_flag_skip_build_vision_blocks_all(
|
||||
monkeypatch, fake_screenshot, tmp_path
|
||||
):
|
||||
"""RPA_SKIP_BUILD_VISION=true → SomEngine et gemma4 jamais appelés, même
|
||||
si vision_info.text est vide."""
|
||||
monkeypatch.setenv("RPA_SKIP_BUILD_VISION", "true")
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VLM", raising=False)
|
||||
from agent_v0.server_v1 import stream_processor as sp
|
||||
|
||||
som_calls = []
|
||||
gemma_calls = []
|
||||
monkeypatch.setattr(sp, "_som_identify_clicked_element",
|
||||
lambda *a, **kw: som_calls.append(a) or {"label": "X"})
|
||||
monkeypatch.setattr(sp, "_gemma4_read_element",
|
||||
lambda *a, **kw: gemma_calls.append(a) or "X")
|
||||
|
||||
result = sp.enrich_click_from_screenshot(
|
||||
screenshot_path=fake_screenshot,
|
||||
click_x=100, click_y=100,
|
||||
screen_w=1920, screen_h=1080,
|
||||
window_title="App",
|
||||
vision_info={"text": "", "type": ""},
|
||||
session_dir=_make_session_dir(tmp_path),
|
||||
screenshot_id="shot_0001",
|
||||
)
|
||||
|
||||
assert len(som_calls) == 0, f"SomEngine appelé malgré flag actif : {som_calls}"
|
||||
assert len(gemma_calls) == 0, f"gemma4 appelé malgré flag actif : {gemma_calls}"
|
||||
# Action conservée avec canaux fallback
|
||||
assert result["anchor_image_base64"] # crop préservé
|
||||
assert result["window_title"] == "App"
|
||||
assert result["by_position"] # position préservée
|
||||
# by_text vide acceptable (le replay tombera sur anchor/position)
|
||||
assert result["by_text"] == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_niveau_b_alias_skip_build_vlm_works(
|
||||
monkeypatch, fake_screenshot, tmp_path
|
||||
):
|
||||
"""Alias RPA_SKIP_BUILD_VLM=true accepté (compat message Codex 1650)."""
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VISION", raising=False)
|
||||
monkeypatch.setenv("RPA_SKIP_BUILD_VLM", "true")
|
||||
from agent_v0.server_v1 import stream_processor as sp
|
||||
|
||||
som_calls = []
|
||||
monkeypatch.setattr(sp, "_som_identify_clicked_element",
|
||||
lambda *a, **kw: som_calls.append(a))
|
||||
monkeypatch.setattr(sp, "_gemma4_read_element",
|
||||
lambda *a, **kw: "should_not_be_called")
|
||||
|
||||
sp.enrich_click_from_screenshot(
|
||||
screenshot_path=fake_screenshot,
|
||||
click_x=100, click_y=100,
|
||||
screen_w=1920, screen_h=1080,
|
||||
window_title="App",
|
||||
vision_info={"text": ""},
|
||||
session_dir=_make_session_dir(tmp_path),
|
||||
screenshot_id="shot_0001",
|
||||
)
|
||||
assert len(som_calls) == 0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_flag_off_calls_som_when_no_vision_text(
|
||||
monkeypatch, fake_screenshot, tmp_path
|
||||
):
|
||||
"""Flag explicitement OFF + vision_info.text vide → comportement legacy."""
|
||||
monkeypatch.setenv("RPA_SKIP_BUILD_VISION", "0")
|
||||
monkeypatch.delenv("RPA_SKIP_BUILD_VLM", raising=False)
|
||||
from agent_v0.server_v1 import stream_processor as sp
|
||||
|
||||
som_calls = []
|
||||
monkeypatch.setattr(sp, "_som_identify_clicked_element",
|
||||
lambda *a, **kw: som_calls.append(a) or None)
|
||||
monkeypatch.setattr(sp, "_gemma4_read_element",
|
||||
lambda *a, **kw: "from_gemma")
|
||||
|
||||
result = sp.enrich_click_from_screenshot(
|
||||
screenshot_path=fake_screenshot,
|
||||
click_x=100, click_y=100,
|
||||
screen_w=1920, screen_h=1080,
|
||||
window_title="App",
|
||||
vision_info={"text": ""},
|
||||
session_dir=_make_session_dir(tmp_path),
|
||||
screenshot_id="shot_0001",
|
||||
)
|
||||
assert len(som_calls) == 1, "Flag OFF doit conserver SomEngine"
|
||||
# gemma4 appelé car SomEngine retourne None
|
||||
assert result["by_text"] == "from_gemma"
|
||||
assert result["by_text_source"] == "vlm"
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_niveau_b_priority_over_niveau_a(
|
||||
monkeypatch, fake_screenshot, tmp_path
|
||||
):
|
||||
"""Flag actif + vision_info.text non vide → log skip_som flag, pas vision_info."""
|
||||
monkeypatch.setenv("RPA_SKIP_BUILD_VISION", "true")
|
||||
from agent_v0.server_v1 import stream_processor as sp
|
||||
|
||||
som_calls = []
|
||||
monkeypatch.setattr(sp, "_som_identify_clicked_element",
|
||||
lambda *a, **kw: som_calls.append(a))
|
||||
monkeypatch.setattr(sp, "_gemma4_read_element",
|
||||
lambda *a, **kw: "should_not")
|
||||
|
||||
result = sp.enrich_click_from_screenshot(
|
||||
screenshot_path=fake_screenshot,
|
||||
click_x=100, click_y=100,
|
||||
screen_w=1920, screen_h=1080,
|
||||
window_title="App",
|
||||
vision_info={"text": "Save", "type": "button"},
|
||||
session_dir=_make_session_dir(tmp_path),
|
||||
screenshot_id="shot_0001",
|
||||
)
|
||||
assert len(som_calls) == 0
|
||||
# vision_info.text reste utilisé (priorité ligne 974-981 préservée)
|
||||
assert result["by_text"] == "Save"
|
||||
assert result["by_text_source"] == "ocr"
|
||||
@@ -15,6 +15,7 @@ On teste deux choses :
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch, MagicMock
|
||||
@@ -22,6 +23,95 @@ from unittest.mock import patch, MagicMock
|
||||
ROOT = Path(__file__).parent.parent.parent
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
def _install_fake_pynput_if_missing():
|
||||
try:
|
||||
import pynput # noqa: F401
|
||||
return
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
class FakeKeyValue:
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def __repr__(self):
|
||||
return f"Key.{self.name}"
|
||||
|
||||
def __hash__(self):
|
||||
return hash(("key", self.name))
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, FakeKeyValue) and self.name == other.name
|
||||
|
||||
class FakeKey:
|
||||
pass
|
||||
|
||||
for name in (
|
||||
"enter", "tab", "esc", "backspace", "delete", "space",
|
||||
"up", "down", "left", "right", "home", "end",
|
||||
"page_up", "page_down", "f1", "f2", "f3", "f4", "f5", "f6",
|
||||
"f7", "f8", "f9", "f10", "f11", "f12", "ctrl", "ctrl_l",
|
||||
"ctrl_r", "alt", "alt_l", "alt_r", "shift", "shift_l",
|
||||
"shift_r", "cmd", "insert", "print_screen", "caps_lock",
|
||||
"num_lock",
|
||||
):
|
||||
setattr(FakeKey, name, FakeKeyValue(name))
|
||||
|
||||
class FakeKeyCode:
|
||||
def __init__(self, char=None, vk=None):
|
||||
self.char = char
|
||||
self.vk = vk
|
||||
|
||||
@classmethod
|
||||
def from_char(cls, char):
|
||||
return cls(char=char)
|
||||
|
||||
@classmethod
|
||||
def from_vk(cls, vk):
|
||||
return cls(vk=vk)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(("keycode", self.char, self.vk))
|
||||
|
||||
def __eq__(self, other):
|
||||
return (
|
||||
isinstance(other, FakeKeyCode)
|
||||
and self.char == other.char
|
||||
and self.vk == other.vk
|
||||
)
|
||||
|
||||
class FakeController:
|
||||
def press(self, *_args, **_kwargs):
|
||||
return None
|
||||
|
||||
def release(self, *_args, **_kwargs):
|
||||
return None
|
||||
|
||||
def click(self, *_args, **_kwargs):
|
||||
return None
|
||||
|
||||
class FakeButton:
|
||||
left = "left"
|
||||
right = "right"
|
||||
|
||||
pynput = types.ModuleType("pynput")
|
||||
mouse = types.ModuleType("pynput.mouse")
|
||||
keyboard = types.ModuleType("pynput.keyboard")
|
||||
mouse.Button = FakeButton
|
||||
mouse.Controller = FakeController
|
||||
keyboard.Controller = FakeController
|
||||
keyboard.Key = FakeKey
|
||||
keyboard.KeyCode = FakeKeyCode
|
||||
pynput.mouse = mouse
|
||||
pynput.keyboard = keyboard
|
||||
sys.modules["pynput"] = pynput
|
||||
sys.modules["pynput.mouse"] = mouse
|
||||
sys.modules["pynput.keyboard"] = keyboard
|
||||
|
||||
|
||||
_install_fake_pynput_if_missing()
|
||||
|
||||
from agent_v0.agent_v1.core.executor import ActionExecutorV1 # noqa: E402
|
||||
|
||||
|
||||
@@ -184,6 +274,44 @@ class TestPostVerifyWindowTransition:
|
||||
expected_after="test – Bloc-notes",
|
||||
)
|
||||
|
||||
def test_enrich_target_context_marks_transition_and_generic_button(self):
|
||||
spec = ActionExecutorV1._enrich_target_context_from_action(
|
||||
{
|
||||
"expected_window_before": "*test – Bloc-notes",
|
||||
"expected_window_title": "Enregistrer sous",
|
||||
},
|
||||
{
|
||||
"by_text": "Enregistrer",
|
||||
"by_role": "button",
|
||||
"window_title": "*test – Bloc-notes",
|
||||
},
|
||||
)
|
||||
|
||||
hints = spec["context_hints"]
|
||||
assert hints["requires_window_transition"] is True
|
||||
assert hints["expected_window_before"] == "*test – Bloc-notes"
|
||||
assert hints["expected_window_after"] == "Enregistrer sous"
|
||||
assert hints["generic_button_text"] == "Enregistrer"
|
||||
assert hints["button_expected_after_window"] == "Enregistrer sous"
|
||||
|
||||
def test_enrich_target_context_keeps_same_window_non_transition(self):
|
||||
spec = ActionExecutorV1._enrich_target_context_from_action(
|
||||
{
|
||||
"expected_window_before": "*test – Bloc-notes",
|
||||
"expected_window_title": "test – Bloc-notes",
|
||||
},
|
||||
{
|
||||
"by_text": "test",
|
||||
"by_role": "tab",
|
||||
"window_title": "*test – Bloc-notes",
|
||||
},
|
||||
)
|
||||
|
||||
hints = spec["context_hints"]
|
||||
assert hints["expected_window_before"] == "*test – Bloc-notes"
|
||||
assert hints["expected_window_after"] == "test – Bloc-notes"
|
||||
assert "requires_window_transition" not in hints
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Routage de la garde dans verify_screen
|
||||
|
||||
@@ -44,3 +44,80 @@ def test_template_strategy_passes_fallback_coords_to_anchor_drift_guard():
|
||||
fallback_x_pct=0.708594,
|
||||
fallback_y_pct=0.35,
|
||||
)
|
||||
|
||||
|
||||
def test_server_explicit_reject_skips_local_text_fallback():
|
||||
executor = MagicMock()
|
||||
executor._server_resolve_target = MagicMock(
|
||||
return_value={
|
||||
"resolved": False,
|
||||
"method": "rejected_close_tab_zone_hybrid_text_direct",
|
||||
"reason": "close_tab_out_of_recorded_zone",
|
||||
"score": 0.8,
|
||||
}
|
||||
)
|
||||
executor._hybrid_vlm_resolve = MagicMock(
|
||||
return_value={
|
||||
"resolved": True,
|
||||
"x_pct": 0.1,
|
||||
"y_pct": 0.13,
|
||||
"method": "hybrid_text_direct",
|
||||
"score": 0.9,
|
||||
}
|
||||
)
|
||||
|
||||
engine = GroundingEngine(executor)
|
||||
engine._capture_window_or_screen = MagicMock(return_value="shot")
|
||||
|
||||
result = engine.locate(
|
||||
"http://server",
|
||||
{
|
||||
"by_text": "test",
|
||||
"context_hints": {"interaction": "close_tab"},
|
||||
"screen_scope": "full_screen",
|
||||
},
|
||||
fallback_x=0.7,
|
||||
fallback_y=0.04,
|
||||
screen_width=2560,
|
||||
screen_height=1600,
|
||||
)
|
||||
|
||||
assert result.found is False
|
||||
executor._hybrid_vlm_resolve.assert_not_called()
|
||||
|
||||
|
||||
def test_server_plain_not_found_allows_local_text_fallback():
|
||||
executor = MagicMock()
|
||||
executor._server_resolve_target = MagicMock(
|
||||
return_value={
|
||||
"resolved": False,
|
||||
"method": "server_no_match",
|
||||
"reason": "not_found",
|
||||
"score": 0.0,
|
||||
}
|
||||
)
|
||||
executor._hybrid_vlm_resolve = MagicMock(
|
||||
return_value={
|
||||
"resolved": True,
|
||||
"x_pct": 0.45,
|
||||
"y_pct": 0.5,
|
||||
"method": "hybrid_text_direct",
|
||||
"score": 0.9,
|
||||
}
|
||||
)
|
||||
|
||||
engine = GroundingEngine(executor)
|
||||
engine._capture_window_or_screen = MagicMock(return_value="shot")
|
||||
|
||||
result = engine.locate(
|
||||
"http://server",
|
||||
{"by_text": "Enregistrer", "screen_scope": "full_screen"},
|
||||
fallback_x=0.5,
|
||||
fallback_y=0.5,
|
||||
screen_width=1920,
|
||||
screen_height=1080,
|
||||
)
|
||||
|
||||
assert result.found is True
|
||||
assert result.method == "hybrid_text_direct"
|
||||
executor._hybrid_vlm_resolve.assert_called_once()
|
||||
|
||||
162
tests/unit/test_keyboard_system_keys.py
Normal file
162
tests/unit/test_keyboard_system_keys.py
Normal file
@@ -0,0 +1,162 @@
|
||||
import importlib
|
||||
import sys
|
||||
import types
|
||||
|
||||
|
||||
def _install_fake_pynput(monkeypatch):
|
||||
class FakeKey:
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def __repr__(self):
|
||||
return f"Key.{self.name}"
|
||||
|
||||
for name in (
|
||||
"ctrl",
|
||||
"ctrl_l",
|
||||
"ctrl_r",
|
||||
"alt",
|
||||
"alt_l",
|
||||
"alt_r",
|
||||
"shift",
|
||||
"shift_l",
|
||||
"shift_r",
|
||||
"cmd",
|
||||
"cmd_l",
|
||||
"cmd_r",
|
||||
"esc",
|
||||
"enter",
|
||||
"tab",
|
||||
"space",
|
||||
"backspace",
|
||||
):
|
||||
setattr(FakeKey, name, FakeKey(name))
|
||||
|
||||
class FakeKeyCode:
|
||||
def __init__(self, char=None, vk=None):
|
||||
self.char = char
|
||||
self.vk = vk
|
||||
|
||||
pynput = types.ModuleType("pynput")
|
||||
mouse = types.ModuleType("pynput.mouse")
|
||||
keyboard = types.ModuleType("pynput.keyboard")
|
||||
|
||||
class FakeButton:
|
||||
pass
|
||||
|
||||
mouse.Button = FakeButton
|
||||
mouse.Listener = object
|
||||
keyboard.Key = FakeKey
|
||||
keyboard.KeyCode = FakeKeyCode
|
||||
keyboard.Listener = object
|
||||
pynput.mouse = mouse
|
||||
pynput.keyboard = keyboard
|
||||
|
||||
monkeypatch.setitem(sys.modules, "pynput", pynput)
|
||||
monkeypatch.setitem(sys.modules, "pynput.mouse", mouse)
|
||||
monkeypatch.setitem(sys.modules, "pynput.keyboard", keyboard)
|
||||
sys.modules.pop("agent_v0.agent_v1.core.captor", None)
|
||||
return FakeKey, FakeKeyCode
|
||||
|
||||
|
||||
def _load_captor(monkeypatch):
|
||||
fake_key, fake_key_code = _install_fake_pynput(monkeypatch)
|
||||
module = importlib.import_module("agent_v0.agent_v1.core.captor")
|
||||
return module, fake_key, fake_key_code
|
||||
|
||||
|
||||
def test_standalone_windows_key_is_emitted_on_release(monkeypatch):
|
||||
captor_module, key, _key_code = _load_captor(monkeypatch)
|
||||
events = []
|
||||
captor = captor_module.EventCaptorV1(events.append)
|
||||
captor._inject_screen_metadata = lambda _event: None
|
||||
|
||||
captor._on_press(key.cmd)
|
||||
assert events == []
|
||||
|
||||
captor._on_release(key.cmd)
|
||||
|
||||
assert [event["keys"] for event in events] == [["win"]]
|
||||
assert [raw["action"] for raw in events[0]["raw_keys"]] == ["press", "release"]
|
||||
assert "win" not in captor.modifiers
|
||||
|
||||
|
||||
def test_windows_shortcut_cancels_standalone_windows_key(monkeypatch):
|
||||
captor_module, key, key_code = _load_captor(monkeypatch)
|
||||
events = []
|
||||
captor = captor_module.EventCaptorV1(events.append)
|
||||
captor._inject_screen_metadata = lambda _event: None
|
||||
|
||||
captor._on_press(key.cmd)
|
||||
captor._on_press(key_code(char="s", vk=83))
|
||||
captor._on_release(key_code(char="s", vk=83))
|
||||
captor._on_release(key.cmd)
|
||||
|
||||
assert [event["keys"] for event in events] == [["win", "s"]]
|
||||
|
||||
|
||||
def test_release_only_windows_shortcut_is_inferred(monkeypatch):
|
||||
captor_module, key, key_code = _load_captor(monkeypatch)
|
||||
events = []
|
||||
captor = captor_module.EventCaptorV1(events.append)
|
||||
captor._inject_screen_metadata = lambda _event: None
|
||||
|
||||
# Windows/NoMachine can swallow press events for Win+S and only deliver
|
||||
# release('s') then release('cmd').
|
||||
captor._on_release(key_code(char="s", vk=83))
|
||||
captor._on_release(key.cmd)
|
||||
|
||||
assert [event["keys"] for event in events] == [["win", "s"]]
|
||||
assert [raw["action"] for raw in events[0]["raw_keys"]] == ["release", "release"]
|
||||
|
||||
|
||||
def test_escape_key_is_emitted_as_key_combo(monkeypatch):
|
||||
captor_module, key, _key_code = _load_captor(monkeypatch)
|
||||
events = []
|
||||
captor = captor_module.EventCaptorV1(events.append)
|
||||
captor._inject_screen_metadata = lambda _event: None
|
||||
|
||||
captor._on_press(key.esc)
|
||||
|
||||
assert [event["keys"] for event in events] == [["escape"]]
|
||||
|
||||
|
||||
def test_stream_processor_keeps_win_but_filters_other_modifiers():
|
||||
from agent_v0.server_v1.stream_processor import (
|
||||
_is_parasitic_event,
|
||||
_needs_post_wait,
|
||||
clean_compound_steps,
|
||||
clean_enriched_actions,
|
||||
)
|
||||
|
||||
assert _is_parasitic_event({"type": "key_combo", "keys": ["ctrl"]}) is True
|
||||
assert _is_parasitic_event({"type": "key_combo", "keys": ["win"]}) is False
|
||||
|
||||
assert clean_enriched_actions(
|
||||
[
|
||||
{"type": "key_combo", "keys": ["ctrl"]},
|
||||
{"type": "key_combo", "keys": ["win"]},
|
||||
]
|
||||
) == [{"type": "key_combo", "keys": ["win"]}]
|
||||
|
||||
assert clean_compound_steps(
|
||||
[
|
||||
{"type": "key_combo", "keys": ["shift"]},
|
||||
{"type": "key_combo", "keys": ["win"]},
|
||||
]
|
||||
) == [{"type": "key_combo", "keys": ["win"]}]
|
||||
|
||||
assert _needs_post_wait({"type": "key_combo", "keys": ["win"]}) >= 1500
|
||||
assert _needs_post_wait({"type": "key_combo", "keys": ["win", "s"]}) >= 1500
|
||||
assert _needs_post_wait({"type": "key_combo", "keys": ["escape"]}) >= 500
|
||||
|
||||
|
||||
def test_streamer_prioritizes_real_captor_event_types():
|
||||
from agent_v0.agent_v1.network.streamer import TraceStreamer
|
||||
|
||||
streamer = TraceStreamer("sess_keyboard_priority")
|
||||
|
||||
assert streamer._is_priority_item("event", {"type": "key_combo"}) is True
|
||||
assert streamer._is_priority_item("event", {"type": "text_input"}) is True
|
||||
assert streamer._is_priority_item("event", {"type": "mouse_click"}) is True
|
||||
assert streamer._is_priority_item("event", {"type": "heartbeat"}) is False
|
||||
280
tests/unit/test_lea_message_contract.py
Normal file
280
tests/unit/test_lea_message_contract.py
Normal file
@@ -0,0 +1,280 @@
|
||||
"""Tests du contrat de messages humains pour Lea."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from agent_v0.agent_v1.ui.message_contract import (
|
||||
MAX_FIELD_CHARS,
|
||||
MessageContractError,
|
||||
coerce_supervised_pause_message,
|
||||
format_supervised_pause_from_mapping,
|
||||
format_supervised_pause_message,
|
||||
validate_supervised_pause_message,
|
||||
validate_visible_message,
|
||||
warn_visible_message,
|
||||
)
|
||||
|
||||
|
||||
def _valid_pause(**overrides: str) -> str:
|
||||
fields = {
|
||||
"intention": "ouvrir le dossier patient dans Aiva Urgence",
|
||||
"attendu": "voir la fiche du patient ouverte avec la liste des passages",
|
||||
"vu": "la page d'accueil Aiva Urgence sans le dossier patient",
|
||||
"demande": "ouvrir le dossier patient puis me rendre la main",
|
||||
}
|
||||
fields.update(overrides)
|
||||
return format_supervised_pause_message(**fields)
|
||||
|
||||
|
||||
def _raw_pause(**overrides: str) -> str:
|
||||
fields = {
|
||||
"intention": "ouvrir le dossier patient dans Aiva Urgence",
|
||||
"attendu": "voir la fiche du patient ouverte avec la liste des passages",
|
||||
"vu": "la page d'accueil Aiva Urgence sans le dossier patient",
|
||||
"demande": "ouvrir le dossier patient puis me rendre la main",
|
||||
}
|
||||
fields.update(overrides)
|
||||
return "\n".join(
|
||||
[
|
||||
f"J'essaie de : {fields['intention']}",
|
||||
f"J'attendais : {fields['attendu']}",
|
||||
f"Je vois : {fields['vu']}",
|
||||
f"Peux-tu : {fields['demande']}",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _issue_codes(message: str) -> set[str]:
|
||||
return {issue.code for issue in validate_supervised_pause_message(message).issues}
|
||||
|
||||
|
||||
def test_format_supervised_pause_has_exact_four_field_structure():
|
||||
message = _valid_pause()
|
||||
|
||||
assert message.splitlines() == [
|
||||
"J'essaie de : ouvrir le dossier patient dans Aiva Urgence",
|
||||
"J'attendais : voir la fiche du patient ouverte avec la liste des passages",
|
||||
"Je vois : la page d'accueil Aiva Urgence sans le dossier patient",
|
||||
"Peux-tu : ouvrir le dossier patient puis me rendre la main",
|
||||
]
|
||||
assert validate_supervised_pause_message(message).valid
|
||||
|
||||
|
||||
def test_format_from_mapping_accepts_runtime_aliases():
|
||||
message = format_supervised_pause_from_mapping(
|
||||
{
|
||||
"trying_to": "selectionner le passage aux urgences",
|
||||
"expected": "voir le formulaire de codage du passage",
|
||||
"observed": "la liste des passages reste affichee",
|
||||
"request": "selectionner le bon passage puis me rendre la main",
|
||||
}
|
||||
)
|
||||
|
||||
assert "J'essaie de : selectionner le passage aux urgences" in message
|
||||
assert validate_supervised_pause_message(message).valid
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"bad_phrase",
|
||||
[
|
||||
"un element",
|
||||
"un élément",
|
||||
"cette action",
|
||||
"Validation requise",
|
||||
"cible inconnue",
|
||||
],
|
||||
)
|
||||
def test_blacklist_refuses_generic_formulations(bad_phrase):
|
||||
message = _raw_pause(vu=f"je vois {bad_phrase}")
|
||||
|
||||
result = validate_supervised_pause_message(message)
|
||||
|
||||
assert not result.valid
|
||||
assert "generic_phrase" in {issue.code for issue in result.issues}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"technical_text",
|
||||
[
|
||||
"action_click_12ab34",
|
||||
"replay_9f8e7d6c",
|
||||
"session_id",
|
||||
"target_spec.by_text",
|
||||
"550e8400-e29b-41d4-a716-446655440000",
|
||||
"a3f6c9d8e1b24567",
|
||||
],
|
||||
)
|
||||
def test_refuses_raw_technical_identifiers(technical_text):
|
||||
message = _raw_pause(attendu=f"voir le dossier patient apres {technical_text}")
|
||||
|
||||
assert "technical_identifier" in _issue_codes(message) or "technical_field" in _issue_codes(message)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"technical_text",
|
||||
[
|
||||
"(123, 456)",
|
||||
"x=120 y=340",
|
||||
"340px",
|
||||
"score=0.87",
|
||||
"confidence=0.91",
|
||||
"similarité=0.42",
|
||||
],
|
||||
)
|
||||
def test_refuses_pixels_and_raw_scores(technical_text):
|
||||
message = _raw_pause(vu=f"la page Aiva avec {technical_text}")
|
||||
|
||||
codes = _issue_codes(message)
|
||||
|
||||
assert "raw_coordinates" in codes or "raw_score" in codes
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"technical_english",
|
||||
[
|
||||
"target_not_found",
|
||||
"no_screen_change",
|
||||
"wrong_window",
|
||||
"validation required",
|
||||
"retry",
|
||||
"screenshot",
|
||||
],
|
||||
)
|
||||
def test_refuses_technical_english(technical_english):
|
||||
message = _raw_pause(vu=f"le message {technical_english} est affiche")
|
||||
|
||||
assert "technical_english" in _issue_codes(message)
|
||||
|
||||
|
||||
def test_refuses_raw_english_instruction():
|
||||
message = _raw_pause(demande="please click the target button")
|
||||
|
||||
codes = _issue_codes(message)
|
||||
|
||||
assert "technical_english" in codes
|
||||
assert "not_actionable" in codes
|
||||
|
||||
|
||||
def test_refuses_messages_without_four_required_lines():
|
||||
result = validate_supervised_pause_message("Je ne trouve pas le dossier patient.")
|
||||
|
||||
assert not result.valid
|
||||
assert "invalid_structure" in {issue.code for issue in result.issues}
|
||||
|
||||
|
||||
def test_refuses_wrong_label_order():
|
||||
message = "\n".join(
|
||||
[
|
||||
"J'attendais : voir la fiche patient",
|
||||
"J'essaie de : ouvrir le dossier patient",
|
||||
"Je vois : la page d'accueil",
|
||||
"Peux-tu : ouvrir le dossier puis me rendre la main",
|
||||
]
|
||||
)
|
||||
|
||||
assert "invalid_structure" in _issue_codes(message)
|
||||
|
||||
|
||||
def test_demande_must_be_actionable_in_french():
|
||||
message = "\n".join(
|
||||
[
|
||||
"J'essaie de : ouvrir le dossier patient",
|
||||
"J'attendais : voir la fiche patient ouverte",
|
||||
"Je vois : la page d'accueil Aiva Urgence",
|
||||
"Peux-tu : merci beaucoup",
|
||||
]
|
||||
)
|
||||
|
||||
assert "not_actionable" in _issue_codes(message)
|
||||
|
||||
|
||||
def test_visible_message_validator_accepts_clear_french_actionable_text():
|
||||
message = (
|
||||
"Je ne trouve pas le dossier patient dans Aiva Urgence. "
|
||||
"Peux-tu ouvrir le dossier puis me rendre la main ?"
|
||||
)
|
||||
|
||||
assert validate_visible_message(message).valid
|
||||
|
||||
|
||||
def test_formatter_raises_instead_of_emitting_generic_message():
|
||||
with pytest.raises(MessageContractError):
|
||||
format_supervised_pause_message(
|
||||
intention="faire cette action",
|
||||
attendu="validation requise",
|
||||
vu="un element",
|
||||
demande="corriger",
|
||||
)
|
||||
|
||||
|
||||
def test_formatter_raises_on_too_short_request():
|
||||
with pytest.raises(MessageContractError):
|
||||
format_supervised_pause_message(
|
||||
intention="ouvrir le dossier patient dans Aiva Urgence",
|
||||
attendu="voir la fiche du patient ouverte",
|
||||
vu="la page d'accueil Aiva Urgence",
|
||||
demande="corriger",
|
||||
)
|
||||
|
||||
|
||||
def test_coerce_turns_legacy_validation_required_into_structured_pause():
|
||||
message = coerce_supervised_pause_message("Validation requise")
|
||||
|
||||
assert validate_supervised_pause_message(message).valid
|
||||
assert "Validation requise" not in message
|
||||
assert message.splitlines()[0].startswith("J'essaie de :")
|
||||
|
||||
|
||||
def test_coerce_keeps_clear_legacy_request_as_demande():
|
||||
message = coerce_supervised_pause_message(
|
||||
"Valider le dossier patient avant enregistrement",
|
||||
intention="enregistrer le dossier patient",
|
||||
attendu="avoir ton accord avant l'enregistrement",
|
||||
vu="le formulaire patient est pret a etre enregistre",
|
||||
)
|
||||
|
||||
assert validate_supervised_pause_message(message).valid
|
||||
assert "Valider le dossier patient avant enregistrement" in message
|
||||
|
||||
|
||||
def test_warn_visible_message_logs_without_modifying_message(caplog):
|
||||
raw = "Validation requise"
|
||||
|
||||
returned = warn_visible_message(raw, source="unit.raw")
|
||||
|
||||
assert returned == raw
|
||||
assert "invalid_message source=unit.raw" in caplog.text
|
||||
assert "generic_phrase" in caplog.text
|
||||
|
||||
|
||||
def test_warn_visible_message_accepts_supervised_pause_without_log(caplog):
|
||||
message = _valid_pause()
|
||||
|
||||
returned = warn_visible_message(
|
||||
message,
|
||||
source="unit.final",
|
||||
supervised_pause=True,
|
||||
)
|
||||
|
||||
assert returned == message
|
||||
assert "invalid_message" not in caplog.text
|
||||
|
||||
|
||||
def test_refuses_overlong_fields_and_messages():
|
||||
long_field = "ouvrir " + ("le dossier patient " * 45)
|
||||
assert len(long_field) > MAX_FIELD_CHARS
|
||||
|
||||
message = "\n".join(
|
||||
[
|
||||
f"J'essaie de : {long_field}",
|
||||
"J'attendais : voir la fiche patient ouverte",
|
||||
"Je vois : la page d'accueil Aiva Urgence",
|
||||
"Peux-tu : ouvrir le dossier patient puis me rendre la main",
|
||||
]
|
||||
)
|
||||
|
||||
codes = _issue_codes(message)
|
||||
|
||||
assert "field_too_long" in codes
|
||||
assert "message_too_long" in codes
|
||||
109
tests/unit/test_lea_micro_preflight.py
Normal file
109
tests/unit/test_lea_micro_preflight.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).parent.parent.parent
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from tools import lea_micro_preflight as preflight
|
||||
|
||||
|
||||
FREE_OUTPUT = """\
|
||||
total used free shared buff/cache available
|
||||
Mem: 64202 15500 32000 123 16702 47000
|
||||
Swap: 8192 1024 7168
|
||||
"""
|
||||
|
||||
|
||||
def test_parse_free_m_extracts_ram_and_swap():
|
||||
parsed = preflight.parse_free_m(FREE_OUTPUT)
|
||||
|
||||
assert parsed["mem"]["total"] == 64202
|
||||
assert parsed["mem"]["available"] == 47000
|
||||
assert parsed["swap"] == {"total": 8192, "used": 1024, "free": 7168}
|
||||
|
||||
|
||||
def test_parse_free_m_accepts_french_locale_labels():
|
||||
output = """\
|
||||
total utilisé libre partagé tamp/cache disponible
|
||||
Mem: 126365 60425 2919 12847 77071 65939
|
||||
Échange: 8191 3397 4794
|
||||
"""
|
||||
|
||||
parsed = preflight.parse_free_m(output)
|
||||
|
||||
assert parsed["mem"]["used"] == 60425
|
||||
assert parsed["mem"]["available"] == 65939
|
||||
assert parsed["swap"] == {"total": 8191, "used": 3397, "free": 4794}
|
||||
|
||||
|
||||
def test_parse_nvidia_smi_memory_multiple_gpus():
|
||||
parsed = preflight.parse_nvidia_smi_memory("8123, 24576\n3999 MiB, 12288 MiB\n")
|
||||
|
||||
assert parsed == [
|
||||
{"free_mib": 8123, "total_mib": 24576},
|
||||
{"free_mib": 3999, "total_mib": 12288},
|
||||
]
|
||||
|
||||
|
||||
def test_extract_ollama_tags_accepts_name_and_model_keys():
|
||||
tags = preflight.extract_ollama_tags(
|
||||
{
|
||||
"models": [
|
||||
{"name": "qwen2.5vl:7b-rpa"},
|
||||
{"model": "qwen2.5:7b"},
|
||||
{"name": ""},
|
||||
"ignored",
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
assert tags == {"qwen2.5vl:7b-rpa", "qwen2.5:7b"}
|
||||
|
||||
|
||||
def _install_fakes(monkeypatch, *, resident=True, tags_ok=True, swap_used=1024):
|
||||
free_output = FREE_OUTPUT.replace("1024", str(swap_used), 1)
|
||||
|
||||
def fake_run_command(args, timeout=5.0):
|
||||
if args[0] == "nvidia-smi":
|
||||
return 0, "8123, 24576", ""
|
||||
if args[0] == "free":
|
||||
return 0, free_output, ""
|
||||
raise AssertionError(f"unexpected command: {args!r}")
|
||||
|
||||
def fake_http_json(url, timeout=2.0):
|
||||
if url.endswith("/api/tags"):
|
||||
models = [{"name": "qwen2.5vl:7b-rpa"}]
|
||||
if tags_ok:
|
||||
models.append({"name": "qwen2.5:7b"})
|
||||
return True, {"models": models}, ""
|
||||
if url.endswith("/api/ps"):
|
||||
models = [{"name": "qwen2.5vl:7b-rpa"}] if resident else []
|
||||
return True, {"models": models}, ""
|
||||
raise AssertionError(f"unexpected url: {url!r}")
|
||||
|
||||
monkeypatch.setattr(preflight, "run_command", fake_run_command)
|
||||
monkeypatch.setattr(preflight, "http_json", fake_http_json)
|
||||
|
||||
|
||||
def test_main_returns_zero_when_all_checks_ok(monkeypatch, capsys):
|
||||
_install_fakes(monkeypatch)
|
||||
|
||||
assert preflight.main(["--json"]) == 0
|
||||
report = json.loads(capsys.readouterr().out)
|
||||
assert report["overall"] == "ok"
|
||||
assert report["warmup"] == "disabled"
|
||||
|
||||
|
||||
def test_main_warns_when_vlm_not_resident_and_strict_exits_one(monkeypatch):
|
||||
_install_fakes(monkeypatch, resident=False)
|
||||
|
||||
assert preflight.main([]) == 0
|
||||
assert preflight.main(["--strict"]) == 1
|
||||
|
||||
|
||||
def test_main_fails_when_required_model_missing(monkeypatch):
|
||||
_install_fakes(monkeypatch, tags_ok=False)
|
||||
|
||||
assert preflight.main([]) == 2
|
||||
@@ -88,9 +88,9 @@ class TestExtraction:
|
||||
assert _nettoyer_description_cible(None) == ""
|
||||
|
||||
def test_nettoyer_description_tronque(self):
|
||||
longue = "x" * 200
|
||||
longue = "x" * 1100
|
||||
resultat = _nettoyer_description_cible(longue)
|
||||
assert len(resultat) <= 80
|
||||
assert len(resultat) <= 1024
|
||||
assert resultat.endswith("...")
|
||||
|
||||
|
||||
@@ -345,9 +345,10 @@ class TestFormatterErreurGenerique:
|
||||
assert msg.niveau == NiveauMessage.ATTENTION
|
||||
|
||||
def test_message_inconnu_tronque(self):
|
||||
long_msg = "erreur très longue " * 20
|
||||
long_msg = "erreur très longue " * 80
|
||||
msg = formatter_erreur_generique(long_msg)
|
||||
assert len(msg.corps) <= 200 # tronqué avec "..."
|
||||
assert len(msg.corps) <= len("J'ai rencontré un souci : ") + 1024
|
||||
assert msg.corps.endswith("...")
|
||||
|
||||
def test_pas_de_code_technique_dans_message_utilisateur(self):
|
||||
"""Les messages présentés à l'utilisateur ne doivent pas contenir de
|
||||
|
||||
Reference in New Issue
Block a user