Files
rpa_vision_v3/docs/superpowers/plans/2026-05-05-qw-suite-mai.md
Dom 5543e25f9d
Some checks failed
tests / Lint (ruff + black) (push) Successful in 18s
tests / Tests unitaires (sans GPU) (push) Failing after 17s
tests / Tests sécurité (critique) (push) Has been skipped
docs(qw): plan d'implémentation QW suite mai 2026 (~30 tasks bite-sized TDD)
Plan d'exécution détaillé pour le sprint QW1+QW2+QW4 :
- Section 0 (preflight) : backup branche+tag Gitea, baseline E2E, smoke démo
- Section 1 (QW1 multi-écrans) : tests + monitor_router + input_handler + Agent V1
- Section 2 (QW2 LoopDetector) : tests + module + hooks api_stream/replay_engine
- Section 3 (QW4 safety_checks) : tests + provider + endpoint + frontend VWB
- Section 4 (docs) : QW_SUITE_MAI.md + maj MEMORY

Chaque task = 4-7 steps de 2-5 min, code complet par step (modules nouveaux),
diffs ciblés (modifs ciblées), commands exactes avec output attendu.

Discipline TDD légère : test rouge → implem → test vert → re-run baseline → commit.

Référence spec : docs/superpowers/specs/2026-05-05-qw-suite-mai-design.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 22:34:13 +02:00

88 KiB
Raw Permalink Blame History

QW Suite Mai — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Livrer 3 quick wins (QW1 multi-écrans, QW2 LoopDetector, QW4 safety_checks hybrides) sur la branche feature/qw-suite-mai avant et autour de la démo GHT, en TDD léger, sans régression sur l'existant.

Architecture: Trois modules serveur isolés (monitor_router.py, loop_detector.py, safety_checks_provider.py) plus extension du DSL pause_for_human, plus enrichissement client Agent V1 (capture multi-écrans) et frontend VWB (PauseDialog + extension PropertiesPanel). Tout backward compatible, kill-switches env vars sur QW2 et QW4.

Tech Stack: Python 3.12 (FastAPI/Uvicorn serveur), Ollama local (medgemma:4b pour safety_checks), CLIP embedder déjà chargé (réutilisé pour LoopDetector signal A), mss + screeninfo pour capture multi-écrans, React + Vite + TypeScript pour VWB frontend, pytest pour tests serveur, test manuel cadré pour frontend.

Spec source: docs/superpowers/specs/2026-05-05-qw-suite-mai-design.md


File Structure

Fichier Action Responsabilité
agent_v0/server_v1/monitor_router.py Créer Résolution écran cible (QW1)
tests/unit/test_monitor_router.py Créer Tests unitaires QW1 routeur
tests/integration/test_grounding_offset.py Créer Tests offsets QW1
core/execution/input_handler.py Modifier Capture par monitor + offsets propagés (QW1)
agent_v0/agent_v1/vision/capturer.py Modifier Enrichissement events monitor_index + monitors_geometry (QW1 client)
agent_v0/deploy/windows_client/agent_v1/vision/capturer.py Modifier Idem (copie déployée Windows)
agent_v0/server_v1/loop_detector.py Créer Détecteur de boucles composite (QW2)
tests/unit/test_loop_detector.py Créer Tests unitaires QW2
tests/integration/test_loop_detector_replay.py Créer Tests intégration QW2
agent_v0/server_v1/replay_engine.py Modifier Extension _create_replay_state (QW2) + hook pause_for_human (QW4)
agent_v0/server_v1/api_stream.py Modifier Hook routeur (QW1) + hook loop_detector (QW2) + extension /replay/resume (QW4)
agent_v0/server_v1/safety_checks_provider.py Créer Provider hybride déclaratif + LLM contextuel (QW4)
tests/unit/test_safety_checks_provider.py Créer Tests unitaires QW4
tests/integration/test_replay_resume_acknowledgments.py Créer Tests intégration QW4
visual_workflow_builder/frontend_v4/src/types.ts Modifier Extension types PauseAction.parameters + Execution (QW4)
visual_workflow_builder/frontend_v4/src/components/PauseDialog.tsx Créer Composant pause + ChecklistPanel (QW4 UX)
visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx Modifier Éditeur safety_level + safety_checks (QW4)
docs/QW_SUITE_MAI.md Créer Doc de livraison synthétique
MEMORY.md (~/.claude/projects/.../) Modifier Lien vers docs QW
.qw-baseline.log Créer Log baseline E2E (gitignored)

Section 0 — Preflight & Baseline

Task 1: Créer la branche backup et la pousser sur Gitea

Files:

  • Pas de modif fichier, opération git pure

  • Step 1: Créer le tag de backup en local

git tag -a backup-pre-qw-suite-mai-2026-05-05 -m "Backup avant sprint QW suite mai 2026 (multi-écrans + LoopDetector + safety_checks)"
  • Step 2: Créer la branche backup depuis HEAD (état actuel feature/qw-suite-mai juste après le commit du spec)
git branch backup/pre-qw-suite-mai-2026-05-05
  • Step 3: Pousser la branche et le tag sur Gitea
git push gitea backup/pre-qw-suite-mai-2026-05-05
git push gitea backup-pre-qw-suite-mai-2026-05-05

Expected output : * [new branch] backup/pre-qw-suite-mai-2026-05-05 -> backup/pre-qw-suite-mai-2026-05-05 et idem pour le tag.

  • Step 4: Vérifier la présence sur Gitea
git ls-remote gitea | grep -E "(backup/pre-qw|backup-pre-qw)"

Expected : 2 lignes (la branche et le tag).

Task 2: Capturer la baseline E2E avant toute modification

Files:

  • Create: .qw-baseline.log (gitignored, à ajouter au .gitignore si absent)

  • Step 1: Vérifier que .qw-baseline.log est gitignored

grep -E "^\.qw-baseline\.log" .gitignore || echo ".qw-baseline.log" >> .gitignore
  • Step 2: Activer le venv et lancer la suite référence
source venv_v3/bin/activate
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       -q 2>&1 | tee .qw-baseline.log

Expected : log avec ligne finale du type XXX passed in YY.YYs ou un mix passed/failed/skipped. Ce log devient la référence absolue pour la non-régression.

  • Step 3: Extraire le compteur final pour comparaison rapide future
tail -3 .qw-baseline.log

Noter mentalement (ou copier dans un commentaire de PR) : nombre de passed / failed / skipped.

Task 3: Smoke démo workflow Easily Assure existant

Files: aucun (test manuel observable)

  • Step 1: Démarrer la stack complète si pas déjà active
./svc.sh status
# Si streaming/vwb-backend/vwb-frontend KO :
./svc.sh start
  • Step 2: Ouvrir VWB dans le navigateur

URL : http://localhost:3002 ou via reverse proxy https://vwb.labs.laurinebazin.design

  • Step 3: Sélectionner un workflow existant validé sur Easily Assure

Choisir un workflow déjà démontré le 30/04 (cf. mémoire reference_demo_ght_mockup.md). Idéalement un dossier UHCD complet.

  • Step 4: Lancer le replay et observer

Cliquer "→ Windows" pour exécuter sur Agent V1. Vérifier que le replay déroule jusqu'au bout sans erreur visible (clics au bon endroit, formulaires remplis).

  • Step 5: Archiver une capture de l'état final dans /tmp
# Capture de l'écran final si possible (sur la machine cible)
# Sinon : noter dans .qw-baseline.log la date du smoke et l'observation
echo "smoke_easily_assure: OK ($(date -Iseconds))" >> .qw-baseline.log

Task 4: Vérifier l'état du frontend VWB (état "tout va bien")

Files: aucun

  • Step 1: Charger un workflow dans VWB

Dans le navigateur : ouvrir un workflow existant qui contient au moins une action pause_for_human (cf. types.ts:46 et PropertiesPanel.tsx:1356).

  • Step 2: Cliquer sur l'action pause dans le canvas → vérifier l'éditeur de propriétés

Le PropertiesPanel doit montrer le champ message éditable. Si l'éditeur s'ouvre et qu'on peut taper, c'est OK.

  • Step 3: Capture d'écran "VWB OK"

Garder cette capture comme référence visuelle. Sera comparée après commit QW4 pour vérifier zéro régression UI.


Section 1 — QW1 Multi-écrans

Task 5: Tests unitaires test_monitor_router.py (rouges)

Files:

  • Create: tests/unit/test_monitor_router.py

  • Step 1: Créer le fichier de tests avec les 4 cas

# tests/unit/test_monitor_router.py
"""Tests unitaires pour MonitorRouter (QW1)."""
import pytest

from agent_v0.server_v1.monitor_router import resolve_target_monitor, MonitorTarget


# Geometry de référence pour les 3 tests : 2 écrans côte à côte
TWO_MONITORS = [
    {"idx": 0, "x": 0, "y": 0, "w": 1920, "h": 1080, "primary": True},
    {"idx": 1, "x": 1920, "y": 0, "w": 1920, "h": 1080, "primary": False},
]


def test_resolve_uses_action_monitor_index_when_present():
    """Si action.monitor_index présent et valide → cible cet écran."""
    action = {"monitor_index": 1}
    session_state = {"monitors_geometry": TWO_MONITORS, "last_focused_monitor": 0}
    result = resolve_target_monitor(action, session_state)
    assert result.idx == 1
    assert result.offset_x == 1920
    assert result.offset_y == 0
    assert result.source == "action"


def test_resolve_falls_back_to_focused_monitor_when_action_missing():
    """Si action.monitor_index absent → fallback focus actif."""
    action = {}  # pas de monitor_index
    session_state = {"monitors_geometry": TWO_MONITORS, "last_focused_monitor": 1}
    result = resolve_target_monitor(action, session_state)
    assert result.idx == 1
    assert result.source == "focus"


def test_resolve_falls_back_to_composite_when_geometry_empty():
    """Si geometry vide (vieux Agent V1) → fallback composite (idx=-1, offset=0)."""
    action = {}
    session_state = {"monitors_geometry": [], "last_focused_monitor": None}
    result = resolve_target_monitor(action, session_state)
    assert result.source == "composite_fallback"
    assert result.offset_x == 0
    assert result.offset_y == 0


def test_resolve_falls_back_when_action_index_out_of_range():
    """Si action.monitor_index hors limites (écran débranché) → fallback focus."""
    action = {"monitor_index": 5}  # n'existe pas
    session_state = {"monitors_geometry": TWO_MONITORS, "last_focused_monitor": 0}
    result = resolve_target_monitor(action, session_state)
    assert result.idx == 0
    assert result.source == "focus"
  • Step 2: Run pour vérifier qu'ils échouent
pytest tests/unit/test_monitor_router.py -v

Expected : ImportError: cannot import name 'resolve_target_monitor' from 'agent_v0.server_v1.monitor_router' ou ModuleNotFoundError.

Task 6: Implémenter monitor_router.py

Files:

  • Create: agent_v0/server_v1/monitor_router.py

  • Step 1: Écrire le module complet

# agent_v0/server_v1/monitor_router.py
"""MonitorRouter — résolution de l'écran cible pour le replay (QW1).

Stratégie en cascade :
1. action.monitor_index (hérité de la session source) → cible cet écran
2. session.last_focused_monitor (focus actif vu en dernier heartbeat) → fallback
3. composite (offset 0, 0) → backward compat

Émet sur le bus lea:* l'event monitor_routed avec la source de la décision.
"""

from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class MonitorTarget:
    """Représente l'écran cible résolu pour une action de replay."""
    idx: int
    offset_x: int
    offset_y: int
    w: int
    h: int
    source: str  # "action" | "focus" | "composite_fallback"


_COMPOSITE_FALLBACK = MonitorTarget(
    idx=-1,
    offset_x=0,
    offset_y=0,
    w=0,
    h=0,
    source="composite_fallback",
)


def _find_monitor(geometry: List[Dict[str, Any]], idx: int) -> Optional[Dict[str, Any]]:
    """Retourne le monitor d'index donné, ou None si absent."""
    for m in geometry:
        if m.get("idx") == idx:
            return m
    return None


def _to_target(monitor: Dict[str, Any], source: str) -> MonitorTarget:
    return MonitorTarget(
        idx=int(monitor["idx"]),
        offset_x=int(monitor.get("x", 0)),
        offset_y=int(monitor.get("y", 0)),
        w=int(monitor.get("w", 0)),
        h=int(monitor.get("h", 0)),
        source=source,
    )


def resolve_target_monitor(
    action: Dict[str, Any],
    session_state: Dict[str, Any],
) -> MonitorTarget:
    """Résout l'écran cible d'une action de replay.

    Args:
        action: Dict de l'action (peut contenir `monitor_index`).
        session_state: État de la session (doit contenir `monitors_geometry`
            et `last_focused_monitor`).

    Returns:
        MonitorTarget avec l'offset à appliquer aux coordonnées de grounding.
    """
    geometry: List[Dict[str, Any]] = session_state.get("monitors_geometry") or []

    # 1. Cible explicite via action
    explicit_idx = action.get("monitor_index")
    if explicit_idx is not None and geometry:
        m = _find_monitor(geometry, int(explicit_idx))
        if m is not None:
            return _to_target(m, source="action")
        # Index invalide → on tombe sur le fallback focus

    # 2. Fallback focus actif
    focused_idx = session_state.get("last_focused_monitor")
    if focused_idx is not None and geometry:
        m = _find_monitor(geometry, int(focused_idx))
        if m is not None:
            return _to_target(m, source="focus")

    # 3. Fallback composite (backward compat — comportement actuel mss.monitors[0])
    return _COMPOSITE_FALLBACK
  • Step 2: Re-run les tests, ils doivent tous passer
pytest tests/unit/test_monitor_router.py -v

Expected : 4 passed.

  • Step 3: Commit
git add agent_v0/server_v1/monitor_router.py tests/unit/test_monitor_router.py
git commit -m "$(cat <<'EOF'
feat(qw1): MonitorRouter — résolution de l'écran cible pour le replay

Module isolé qui choisit l'écran cible avec stratégie en cascade :
1. action.monitor_index (session source) → cible explicite
2. session.last_focused_monitor → fallback focus actif
3. composite (offset 0,0) → backward compat (comportement actuel)

Backward 100% : actions sans monitor_index → fallback composite identique
au comportement mss.monitors[0] actuel.

Tests : 4 cas (cible OK, fallback focus, fallback composite, index invalide).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 7: Tests intégration test_grounding_offset.py (rouges)

Files:

  • Create: tests/integration/test_grounding_offset.py

  • Step 1: Créer le fichier de tests

# tests/integration/test_grounding_offset.py
"""Tests intégration pour la propagation d'offset multi-écrans (QW1)."""
import pytest
from unittest.mock import patch, MagicMock

from core.execution import input_handler


@pytest.fixture
def mock_screen():
    """Mock une capture mss : retourne un PIL Image factice + offsets."""
    from PIL import Image
    img = Image.new("RGB", (1920, 1080), color="white")
    return img


def test_capture_screen_default_returns_composite_when_no_idx(mock_screen):
    """_capture_screen() sans monitor_idx → composite, offset (0, 0)."""
    with patch("core.execution.input_handler.mss") as mock_mss:
        ctx = mock_mss.mss.return_value.__enter__.return_value
        ctx.monitors = [{"left": 0, "top": 0, "width": 3840, "height": 1080}]
        ctx.grab.return_value = MagicMock(size=(3840, 1080), bgra=b"\x00" * (3840 * 1080 * 4))
        with patch("core.execution.input_handler.PILImage.frombytes", return_value=mock_screen):
            screen, w, h, ox, oy = input_handler._capture_screen()
    assert (w, h, ox, oy) == (3840, 1080, 0, 0)


def test_capture_screen_targets_specific_monitor_with_offset(mock_screen):
    """_capture_screen(monitor_idx=1) → cible monitors[2] (mss skip [0]), offset = monitor.left."""
    with patch("core.execution.input_handler.mss") as mock_mss:
        ctx = mock_mss.mss.return_value.__enter__.return_value
        # mss layout : [0]=composite, [1]=primary, [2]=secondary
        ctx.monitors = [
            {"left": 0, "top": 0, "width": 3840, "height": 1080},
            {"left": 0, "top": 0, "width": 1920, "height": 1080},
            {"left": 1920, "top": 0, "width": 1920, "height": 1080},
        ]
        ctx.grab.return_value = MagicMock(size=(1920, 1080), bgra=b"\x00" * (1920 * 1080 * 4))
        with patch("core.execution.input_handler.PILImage.frombytes", return_value=mock_screen):
            screen, w, h, ox, oy = input_handler._capture_screen(monitor_idx=1)
    assert (w, h, ox, oy) == (1920, 1080, 1920, 0)
  • Step 2: Run pour vérifier qu'ils échouent
pytest tests/integration/test_grounding_offset.py -v

Expected : TypeError: _capture_screen() got an unexpected keyword argument 'monitor_idx' ou similaire (la signature actuelle est sans paramètre).

Task 8: Modifier input_handler.py — capture par monitor + propagation offsets

Files:

  • Modify: core/execution/input_handler.py:416-429 (_capture_screen)

  • Modify: core/execution/input_handler.py:432-512 (_grounding_ocr)

  • Modify: core/execution/input_handler.py:515-579 (_grounding_ui_tars)

  • Modify: core/execution/input_handler.py:629-684 (_grounding_vlm)

  • Step 1: Importer PIL.Image avec alias en haut du fichier (si pas déjà)

Vérifier que from PIL import Image as PILImage est importé au top-level (sinon l'ajouter en remplacement de l'import lazy actuel dans _capture_screen).

  • Step 2: Réécrire _capture_screen pour accepter monitor_idx

Remplacer la fonction _capture_screen (lignes 416-429) par :

def _capture_screen(monitor_idx=None):
    """Capture l'écran et retourne (PIL.Image, width, height, offset_x, offset_y).

    Args:
        monitor_idx: Index logique 0..N-1 du monitor à capturer (cf. screeninfo).
            Si None : capture composite (mss.monitors[0]) — comportement legacy.

    Returns:
        (image, w, h, offset_x, offset_y). offset = (0,0) en mode composite.
    """
    try:
        import mss
        from PIL import Image as PILImage

        with mss.mss() as sct:
            if monitor_idx is None:
                # Comportement actuel : composite tous écrans
                monitor = sct.monitors[0]
                offset_x, offset_y = 0, 0
            else:
                # mss skip monitors[0] (composite). Index logique 0 → mss.monitors[1].
                mss_idx = int(monitor_idx) + 1
                if mss_idx >= len(sct.monitors):
                    logger.warning(
                        "mss.monitors[%d] hors limites (n=%d) — fallback composite",
                        mss_idx, len(sct.monitors),
                    )
                    monitor = sct.monitors[0]
                    offset_x, offset_y = 0, 0
                else:
                    monitor = sct.monitors[mss_idx]
                    offset_x = int(monitor.get("left", 0))
                    offset_y = int(monitor.get("top", 0))

            screenshot = sct.grab(monitor)
            screen = PILImage.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
            return screen, monitor['width'], monitor['height'], offset_x, offset_y
    except Exception as e:
        logger.debug(f"Capture écran échouée: {e}")
        return None, 0, 0, 0, 0
  • Step 3: Adapter _grounding_ocr pour propager l'offset

Dans _grounding_ocr (ligne 432-512) :

  • Remplacer screen, screen_w, screen_h = _capture_screen() par screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=anchor_bbox.get("monitor_idx") if anchor_bbox else None)

  • Ajouter ox, oy aux coords avant return :

    • Avant : return {'x': best['x'], 'y': best['y'], ...}
    • Après : return {'x': best['x'] + ox, 'y': best['y'] + oy, 'method': 'ocr', 'confidence': best['conf']}
  • Step 4: Adapter _grounding_ui_tars idem

Dans _grounding_ui_tars (ligne 515-579) :

  • Modifier la signature : def _grounding_ui_tars(target_text, target_description="", monitor_idx=None):

  • Remplacer screen, screen_w, screen_h = _capture_screen() par screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx)

  • Ajouter offset dans le return : return {'x': x + ox, 'y': y + oy, 'method': 'ui_tars', 'confidence': 0.85}

  • Step 5: Adapter _grounding_vlm idem

Dans _grounding_vlm (ligne 629-684) :

  • Modifier la signature : def _grounding_vlm(target_text, target_description="", monitor_idx=None):

  • Remplacer le _capture_screen() interne par _capture_screen(monitor_idx=monitor_idx)

  • Ajouter offset au return des coords confirmées par OCR

  • Step 6: Modifier find_element_on_screen pour propager monitor_idx

Dans find_element_on_screen (signature ligne 312-317) :

  • Ajouter le paramètre monitor_idx: Optional[int] = None

  • Le passer aux 3 niveaux de cascade :

    • _grounding_ocr(target_text, anchor_bbox=anchor_bbox) → ajouter une étape qui range monitor_idx dans anchor_bbox si bbox dict, sinon créer un dict avec juste monitor_idx
    • _grounding_ui_tars(target_text, target_description, monitor_idx=monitor_idx)
    • _grounding_vlm(target_text, target_description, monitor_idx=monitor_idx)
  • Step 7: Run les tests intégration

pytest tests/integration/test_grounding_offset.py -v

Expected : 2 passed.

  • Step 8: Re-run baseline pour vérifier non-régression
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       -q

Expected : même nombre de passed que .qw-baseline.log.

  • Step 9: Commit
git add core/execution/input_handler.py tests/integration/test_grounding_offset.py
git commit -m "$(cat <<'EOF'
feat(qw1): capture par monitor + propagation offsets dans grounding cascade

_capture_screen() accepte un monitor_idx optionnel (None = composite legacy).
Index logique 0..N-1 mappé sur mss.monitors[idx+1] (mss[0] = composite).

Les 3 niveaux de grounding (OCR, UI-TARS, VLM) propagent l'offset retourné
par la capture pour traduire les coordonnées locales monitor en coordonnées
absolues écran (correct pour pyautogui.click).

find_element_on_screen() accepte monitor_idx et le forwarde aux 3 niveaux.

Backward 100% : monitor_idx=None partout → comportement strictement actuel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 9: Enrichir Agent V1 capturer côté client (events monitor_index + monitors_geometry)

Files:

  • Modify: agent_v0/agent_v1/vision/capturer.py (au moins 4 endroits utilisant sct.monitors[1])

  • Step 1: Lire le fichier en entier pour comprendre l'API

wc -l agent_v0/agent_v1/vision/capturer.py

Si > 300 lignes, lire en 2 fois pour cibler les modifs.

  • Step 2: Importer screeninfo (graceful fallback si absent)

En haut du fichier, après les imports existants :

try:
    from screeninfo import get_monitors as _screeninfo_get_monitors
    _SCREENINFO_AVAILABLE = True
except ImportError:
    _SCREENINFO_AVAILABLE = False
  • Step 3: Ajouter une fonction helper _get_monitors_geometry()
def _get_monitors_geometry():
    """Retourne la liste des monitors physiques avec leurs offsets.

    Returns:
        List[dict] : [{idx, x, y, w, h, primary}, ...]. Vide si screeninfo
        indisponible (le serveur tombera sur fallback composite).
    """
    if not _SCREENINFO_AVAILABLE:
        return []
    try:
        monitors = _screeninfo_get_monitors()
        return [
            {
                "idx": i,
                "x": int(m.x),
                "y": int(m.y),
                "w": int(m.width),
                "h": int(m.height),
                "primary": bool(getattr(m, "is_primary", False)),
            }
            for i, m in enumerate(monitors)
        ]
    except Exception:
        return []


def _get_active_monitor_index():
    """Retourne l'index logique du monitor où se trouve le curseur (focus actif).

    Returns:
        int ou None si indéterminable.
    """
    if not _SCREENINFO_AVAILABLE:
        return None
    try:
        import pyautogui
        cx, cy = pyautogui.position()
        for i, m in enumerate(_screeninfo_get_monitors()):
            if m.x <= cx < m.x + m.width and m.y <= cy < m.y + m.height:
                return i
    except Exception:
        return None
    return None
  • Step 4: Enrichir tous les payloads d'événements envoyés au serveur

Identifier les fonctions qui envoient au serveur (probablement via le feedback_bus ou via HTTP POST). Pour chacun, ajouter à l'event :

event_payload["monitor_index"] = _get_active_monitor_index()
event_payload["monitors_geometry"] = _get_monitors_geometry()

Le plus simple : créer un helper en haut du module et l'appeler à chaque endroit qui crée un payload heartbeat ou event :

def _enrich_with_monitor_info(payload: dict) -> dict:
    """Ajoute monitor_index et monitors_geometry au payload (modification in-place)."""
    payload["monitor_index"] = _get_active_monitor_index()
    payload["monitors_geometry"] = _get_monitors_geometry()
    return payload

Et appeler _enrich_with_monitor_info(payload) juste avant chaque envoi.

  • Step 5: Vérifier que l'import screeninfo est dans requirements_agent_v1.txt
grep -i screeninfo agent_v0/agent_v1/requirements*.txt

Si absent, l'ajouter à requirements_agent_v1.txt :

screeninfo>=0.8

(Le module a un fallback gracieux si le paquet n'est pas installé sur les vieux Agent V1, donc pas de blocage.)

  • Step 6: Smoke test local — vérifier que l'agent ne crashe pas

Sur la machine Linux (pas besoin de Windows pour ce smoke) :

python -c "from agent_v0.agent_v1.vision.capturer import _get_monitors_geometry, _get_active_monitor_index; print(_get_monitors_geometry()); print(_get_active_monitor_index())"

Expected : une liste de monitors (ou [] si screeninfo absent), et un int (ou None).

Task 10: Propager la modif au déploiement Windows

Files:

  • Modify: agent_v0/deploy/windows_client/agent_v1/vision/capturer.py

  • Step 1: Copier le fichier source vers le déploiement

cp agent_v0/agent_v1/vision/capturer.py agent_v0/deploy/windows_client/agent_v1/vision/capturer.py
  • Step 2: Vérifier le diff (rien d'autre ne doit changer)
git diff agent_v0/deploy/windows_client/agent_v1/vision/capturer.py | head -80
  • Step 3: Mettre à jour requirements_agent_v1.txt côté deploy aussi
grep screeninfo agent_v0/deploy/windows_client/agent_v1/requirements*.txt || \
  echo "screeninfo>=0.8" >> agent_v0/deploy/windows_client/agent_v1/requirements_agent_v1.txt

Task 11: Hook MonitorRouter dans api_stream.py

Files:

  • Modify: agent_v0/server_v1/api_stream.py (~10 lignes ajoutées dans la branche qui dispatche les actions au client)

  • Step 1: Localiser l'endroit où une action est envoyée au client

grep -n "next_action\|/replay/next\|return.*action" agent_v0/server_v1/api_stream.py | grep -v "^.*#" | head -20

L'objectif : trouver l'endpoint qui POP l'action de la queue et la renvoie à l'Agent V1 (typiquement /replay/next ou la réponse au polling client).

  • Step 2: Importer le routeur en haut du fichier
from agent_v0.server_v1.monitor_router import resolve_target_monitor
  • Step 3: Avant de renvoyer l'action au client, l'enrichir avec monitor_resolution

Dans la fonction qui prépare la réponse au client (juste avant le return) :

# QW1 — Résoudre l'écran cible et joindre l'info à l'action
session_state = {
    "monitors_geometry": session.last_window_info.get("monitors_geometry", []),
    "last_focused_monitor": session.last_window_info.get("monitor_index"),
}
target = resolve_target_monitor(action, session_state)
action["monitor_resolution"] = {
    "idx": target.idx,
    "offset_x": target.offset_x,
    "offset_y": target.offset_y,
    "w": target.w,
    "h": target.h,
    "source": target.source,
}
# Bus event d'observabilité
try:
    from agent_v0.agent_v1.network.feedback_bus import emit_server_event
    emit_server_event("lea:monitor_routed", {
        "replay_id": replay_state.get("replay_id"),
        "action_id": action.get("action_id"),
        "idx": target.idx,
        "source": target.source,
    })
except Exception:
    pass  # bus optionnel, ne jamais bloquer le replay
  • Step 4: Vérifier que last_window_info est bien rempli côté serveur

Chercher où last_window_info est mis à jour à partir des heartbeats Agent V1 :

grep -n "last_window_info" agent_v0/server_v1/*.py | head -10

Si monitor_index et monitors_geometry envoyés par l'agent ne sont pas stockés dans session.last_window_info, ajouter leur stockage dans la fonction qui consomme les heartbeats.

  • Step 5: Re-run baseline pour non-régression
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       -q

Expected : même résultat que baseline.

  • Step 6: Commit
git add agent_v0/agent_v1/vision/capturer.py \
        agent_v0/deploy/windows_client/agent_v1/vision/capturer.py \
        agent_v0/agent_v1/requirements_agent_v1.txt \
        agent_v0/deploy/windows_client/agent_v1/requirements_agent_v1.txt \
        agent_v0/server_v1/api_stream.py
git commit -m "$(cat <<'EOF'
feat(qw1): enrichissement Agent V1 (monitor_index + monitors_geometry) + hook serveur

Côté client Agent V1 :
- helper _get_monitors_geometry() via screeninfo (fallback [] si absent)
- helper _get_active_monitor_index() via position curseur
- _enrich_with_monitor_info() ajouté à chaque payload event/heartbeat
- screeninfo>=0.8 ajouté aux requirements (source + deploy)

Côté serveur api_stream.py :
- import resolve_target_monitor
- Avant chaque envoi action au client : enrichissement action.monitor_resolution
- Event bus lea:monitor_routed pour observabilité (idx, source)

Backward 100% : si geometry vide, fallback composite identique au comportement
actuel mss.monitors[0].

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 12: Smoke démo QW1 sur workflow Easily

Files: aucun (test manuel observable)

  • Step 1: Redémarrer le service streaming
./svc.sh restart streaming
sleep 2
./svc.sh status streaming
  • Step 2: Rejouer le workflow Easily Assure utilisé en Task 3

Vérifier dans les logs serveur : lea:monitor_routed apparaît avec source=focus (vu que les workflows actuels n'ont pas de monitor_index, c'est le focus actif qui est retenu).

journalctl -u rpa-streaming -f | grep monitor_routed
  • Step 3: Observer le replay

Le clic doit toujours atterrir au bon endroit (vérification visuelle, identique au smoke Task 3). Si décalage : kill-switch implicite = ré-checkout backup/pre-qw-suite-mai-2026-05-05 et investiguer.

  • Step 4: Push branche sur Gitea (backup distant après QW1)
git push gitea feature/qw-suite-mai

Section 2 — QW2 LoopDetector

Task 13: Tests unitaires test_loop_detector.py (rouges)

Files:

  • Create: tests/unit/test_loop_detector.py

  • Step 1: Créer le fichier de tests avec les 8 cas

# tests/unit/test_loop_detector.py
"""Tests unitaires pour LoopDetector composite (QW2)."""
import os
import pytest
from unittest.mock import MagicMock

from agent_v0.server_v1.loop_detector import LoopDetector, LoopVerdict


@pytest.fixture
def detector():
    """LoopDetector avec embedder mocké (signal A toujours dispo)."""
    embedder = MagicMock()
    # Par défaut : 4 embeddings tous identiques → similarity 1.0
    embedder.embed_image.return_value = [1.0, 0.0, 0.0]
    return LoopDetector(clip_embedder=embedder)


def _state(retried=0, n_screenshots=0, n_actions=0):
    return {
        "retried_actions": retried,
        "_screenshot_history": [[1.0, 0.0, 0.0]] * n_screenshots,
        "_action_history": [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * n_actions,
    }


def test_screen_static_triggers_when_n_identical_embeddings(detector):
    """Signal A : 4 captures identiques (similarity > 0.99) → detected."""
    state = _state(n_screenshots=4)
    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
    assert verdict.detected is True
    assert verdict.signal == "screen_static"


def test_screen_static_skipped_when_history_too_short(detector):
    """Signal A : moins de N captures → pas de détection."""
    state = _state(n_screenshots=2)
    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
    # Si seul A pourrait déclencher mais skip, et B/C pas remplis : detected=False
    assert verdict.detected is False


def test_action_repeat_triggers_when_n_identical_actions(detector):
    """Signal B : 3 actions consécutives identiques → detected."""
    state = _state(n_actions=3)
    verdict = detector.evaluate(state, screenshots=[], actions=state["_action_history"])
    assert verdict.detected is True
    assert verdict.signal == "action_repeat"


def test_action_repeat_skipped_when_actions_differ(detector):
    """Signal B : actions différentes → pas de détection."""
    actions = [
        {"type": "click", "x_pct": 0.1, "y_pct": 0.1},
        {"type": "click", "x_pct": 0.2, "y_pct": 0.2},
        {"type": "click", "x_pct": 0.3, "y_pct": 0.3},
    ]
    verdict = detector.evaluate(_state(), screenshots=[], actions=actions)
    assert verdict.detected is False


def test_retry_threshold_triggers_at_3(detector):
    """Signal C : retried_actions >= 3 → detected."""
    state = _state(retried=3)
    verdict = detector.evaluate(state, screenshots=[], actions=[])
    assert verdict.detected is True
    assert verdict.signal == "retry_threshold"


def test_kill_switch_disables_all_signals(monkeypatch, detector):
    """Si RPA_LOOP_DETECTOR_ENABLED=0 → toujours detected=False."""
    monkeypatch.setenv("RPA_LOOP_DETECTOR_ENABLED", "0")
    state = _state(retried=10, n_screenshots=10, n_actions=10)
    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"],
                                actions=state["_action_history"])
    assert verdict.detected is False


def test_embedder_unavailable_skips_signal_A_continues_others():
    """Si CLIP embedder None → signal A skip, B et C continuent."""
    detector = LoopDetector(clip_embedder=None)
    # Trigger signal C
    state = _state(retried=3)
    verdict = detector.evaluate(state, screenshots=[], actions=[])
    assert verdict.detected is True
    assert verdict.signal == "retry_threshold"


def test_embedder_exception_does_not_crash(detector):
    """Si embed_image lève une exception → log + verdict detected=False."""
    detector.clip_embedder.embed_image.side_effect = RuntimeError("CUDA OOM")
    state = _state(n_screenshots=4)
    # Ne doit PAS lever : signal A devient inerte
    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
    # Signal A inerte, B/C pas remplis → detected False
    assert verdict.detected is False
  • Step 2: Run pour vérifier qu'ils échouent
pytest tests/unit/test_loop_detector.py -v

Expected : ModuleNotFoundError: No module named 'agent_v0.server_v1.loop_detector'.

Task 14: Implémenter loop_detector.py

Files:

  • Create: agent_v0/server_v1/loop_detector.py

  • Step 1: Écrire le module complet

# agent_v0/server_v1/loop_detector.py
"""LoopDetector composite — détection de stagnation de Léa pendant un replay (QW2).

Trois signaux indépendants :
- screen_static : N captures consécutives avec CLIP similarity > seuil
- action_repeat : N actions consécutives identiques (type + coords)
- retry_threshold : nombre de retries cumulés >= seuil

Un seul signal positif → verdict.detected=True. Le serveur bascule alors le
replay en paused_need_help avec pause_reason explicite.

Désactivable via env var RPA_LOOP_DETECTOR_ENABLED=0.
"""

import logging
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class LoopVerdict:
    detected: bool = False
    reason: str = ""
    signal: str = ""  # "screen_static" | "action_repeat" | "retry_threshold" | ""
    evidence: Dict[str, Any] = field(default_factory=dict)


def _env_int(name: str, default: int) -> int:
    try:
        return int(os.environ.get(name, default))
    except (TypeError, ValueError):
        return default


def _env_float(name: str, default: float) -> float:
    try:
        return float(os.environ.get(name, default))
    except (TypeError, ValueError):
        return default


def _env_bool_enabled(name: str) -> bool:
    val = os.environ.get(name, "1").strip().lower()
    return val not in ("0", "false", "no", "off", "")


def _cosine_similarity(a, b) -> float:
    """Similarité cosine entre deux vecteurs (listes ou np.array). Robuste vecteur nul."""
    import numpy as np
    av = np.asarray(a, dtype=np.float32).flatten()
    bv = np.asarray(b, dtype=np.float32).flatten()
    na, nb = float(np.linalg.norm(av)), float(np.linalg.norm(bv))
    if na < 1e-8 or nb < 1e-8:
        return 0.0
    return float(np.dot(av, bv) / (na * nb))


class LoopDetector:
    def __init__(self, clip_embedder=None):
        self.clip_embedder = clip_embedder

    def evaluate(
        self,
        state: Dict[str, Any],
        screenshots: List[Any],
        actions: List[Dict[str, Any]],
    ) -> LoopVerdict:
        """Évalue les 3 signaux. Retourne le premier déclenché.

        Args:
            state: replay_state (utilisé pour retried_actions)
            screenshots: anneau d'embeddings CLIP (les N derniers)
            actions: anneau des N dernières actions exécutées
        """
        if not _env_bool_enabled("RPA_LOOP_DETECTOR_ENABLED"):
            return LoopVerdict(detected=False)

        # Signal A : screen_static
        verdict = self._check_screen_static(screenshots)
        if verdict.detected:
            return verdict

        # Signal B : action_repeat
        verdict = self._check_action_repeat(actions)
        if verdict.detected:
            return verdict

        # Signal C : retry_threshold
        verdict = self._check_retry_threshold(state)
        if verdict.detected:
            return verdict

        return LoopVerdict(detected=False)

    def _check_screen_static(self, screenshots: List[Any]) -> LoopVerdict:
        n_required = _env_int("RPA_LOOP_SCREEN_STATIC_N", 4)
        threshold = _env_float("RPA_LOOP_SCREEN_STATIC_THRESHOLD", 0.99)

        if self.clip_embedder is None or len(screenshots) < n_required:
            return LoopVerdict()

        try:
            recent = screenshots[-n_required:]
            sims = [_cosine_similarity(recent[i], recent[i + 1])
                    for i in range(len(recent) - 1)]
            min_sim = min(sims)
            if min_sim > threshold:
                return LoopVerdict(
                    detected=True,
                    reason="loop_detected",
                    signal="screen_static",
                    evidence={"min_similarity": round(min_sim, 4),
                              "n_captures": n_required,
                              "threshold": threshold},
                )
        except Exception as e:
            logger.warning("LoopDetector signal_A erreur (%s) — signal inerte ce tick", e)
        return LoopVerdict()

    def _check_action_repeat(self, actions: List[Dict[str, Any]]) -> LoopVerdict:
        n_required = _env_int("RPA_LOOP_ACTION_REPEAT_N", 3)
        if len(actions) < n_required:
            return LoopVerdict()
        recent = actions[-n_required:]

        def _signature(a: Dict[str, Any]) -> tuple:
            return (a.get("type"), a.get("x_pct"), a.get("y_pct"))

        sigs = [_signature(a) for a in recent]
        if all(s == sigs[0] for s in sigs):
            return LoopVerdict(
                detected=True,
                reason="loop_detected",
                signal="action_repeat",
                evidence={"signature": sigs[0], "count": n_required},
            )
        return LoopVerdict()

    def _check_retry_threshold(self, state: Dict[str, Any]) -> LoopVerdict:
        threshold = _env_int("RPA_LOOP_RETRY_THRESHOLD", 3)
        retried = int(state.get("retried_actions", 0))
        if retried >= threshold:
            return LoopVerdict(
                detected=True,
                reason="loop_detected",
                signal="retry_threshold",
                evidence={"retried_actions": retried, "threshold": threshold},
            )
        return LoopVerdict()
  • Step 2: Re-run les tests, ils doivent tous passer
pytest tests/unit/test_loop_detector.py -v

Expected : 8 passed.

  • Step 3: Commit
git add agent_v0/server_v1/loop_detector.py tests/unit/test_loop_detector.py
git commit -m "$(cat <<'EOF'
feat(qw2): LoopDetector composite (screen_static + action_repeat + retry)

Module isolé, 3 signaux indépendants :
- screen_static : CLIP similarity > 0.99 sur N captures consécutives
- action_repeat : N actions identiques (type+coords)
- retry_threshold : retried_actions >= seuil

Premier signal positif → LoopVerdict.detected=True (caller responsable de
la bascule en paused_need_help).

Configurable env vars : RPA_LOOP_DETECTOR_ENABLED (kill-switch),
RPA_LOOP_SCREEN_STATIC_N/THRESHOLD, RPA_LOOP_ACTION_REPEAT_N,
RPA_LOOP_RETRY_THRESHOLD.

Tests : 8 cas (chaque signal isolé, kill-switch, embedder absent, exception).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 15: Étendre replay_engine.py — anneaux d'historique dans _create_replay_state

Files:

  • Modify: agent_v0/server_v1/replay_engine.py:1452-1524 (_create_replay_state)

  • Step 1: Ajouter les deux clés à la fin du dict retourné

Dans _create_replay_state (vers la ligne 1523, juste avant la dernière ligne } du return), ajouter :

        # QW2 — Anneaux d'historique pour LoopDetector (5 derniers max)
        "_screenshot_history": [],   # embeddings CLIP des N derniers heartbeats
        "_action_history": [],       # N dernières actions exécutées (signature)

(Ils prennent place juste après la clé "variables": {}.)

  • Step 2: Vérifier qu'aucun test unitaire de replay_engine n'attend l'absence de ces clés
grep -rn "_create_replay_state\|_screenshot_history\|_action_history" tests/ | head -20

Si un test fait un assert state == {...} strict, l'adapter pour accepter les deux nouvelles clés (typiquement aucun ne le fait — c'est usage défensif).

Task 16: Hook loop_detector dans api_stream.py

Files:

  • Modify: agent_v0/server_v1/api_stream.py:3159+ (report_action_result)

  • Step 1: Importer en haut du fichier

from agent_v0.server_v1.loop_detector import LoopDetector
  • Step 2: Instancier le détecteur globalement (singleton lazy)

Près des autres globals du module (chercher où active_processor est défini) :

_loop_detector: Optional[LoopDetector] = None

def _get_loop_detector() -> LoopDetector:
    global _loop_detector
    if _loop_detector is None:
        embedder = active_processor._clip_embedder if active_processor else None
        _loop_detector = LoopDetector(clip_embedder=embedder)
    return _loop_detector
  • Step 3: Hook dans report_action_result après mise à jour de l'état

Localiser dans report_action_result (ligne 3159+) l'endroit où le replay_state est mis à jour suite au rapport d'action (juste avant le return de la fonction). Ajouter :

        # QW2 — Mise à jour des anneaux d'historique
        try:
            from PIL import Image
            ss_path = report.screenshot_path or replay_state.get("last_screenshot")
            if ss_path and os.path.isfile(ss_path) and active_processor and active_processor._clip_embedder:
                emb = active_processor._clip_embedder.embed_image(Image.open(ss_path))
                if emb is not None:
                    replay_state["_screenshot_history"].append(emb.flatten().tolist())
                    replay_state["_screenshot_history"] = replay_state["_screenshot_history"][-5:]
        except Exception as e:
            logger.debug("LoopDetector: embed historique échoué: %s", e)

        # Snapshot signature de l'action courante
        replay_state["_action_history"].append({
            "type": report.action_type if hasattr(report, "action_type") else "",
            "x_pct": report.x_pct if hasattr(report, "x_pct") else None,
            "y_pct": report.y_pct if hasattr(report, "y_pct") else None,
        })
        replay_state["_action_history"] = replay_state["_action_history"][-5:]

        # Évaluer le LoopDetector
        try:
            verdict = _get_loop_detector().evaluate(
                replay_state,
                screenshots=replay_state["_screenshot_history"],
                actions=replay_state["_action_history"],
            )
            if verdict.detected:
                replay_state["status"] = "paused_need_help"
                replay_state["pause_reason"] = "loop_detected"
                replay_state["pause_message"] = (
                    f"Léa semble bloquée — {verdict.signal} "
                    f"(détail: {verdict.evidence})"
                )
                logger.warning(
                    "LoopDetector: replay %s mis en pause — signal=%s evidence=%s",
                    replay_state["replay_id"], verdict.signal, verdict.evidence,
                )
                # Bus event
                try:
                    from agent_v0.agent_v1.network.feedback_bus import emit_server_event
                    emit_server_event("lea:loop_detected", {
                        "replay_id": replay_state["replay_id"],
                        "signal": verdict.signal,
                        "evidence": verdict.evidence,
                    })
                except Exception:
                    pass
        except Exception as e:
            logger.warning("LoopDetector: évaluation échouée (non bloquant): %s", e)
  • Step 4: Re-run baseline pour vérifier non-régression
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       -q

Expected : même nombre de passed que .qw-baseline.log.

Task 17: Tests intégration test_loop_detector_replay.py

Files:

  • Create: tests/integration/test_loop_detector_replay.py

  • Step 1: Créer le fichier

# tests/integration/test_loop_detector_replay.py
"""Tests intégration : un replay simulé qui boucle bascule en paused_need_help."""
import pytest
from unittest.mock import MagicMock, patch

from agent_v0.server_v1.loop_detector import LoopDetector


def test_replay_state_transitions_to_paused_on_screen_static():
    """Cas : 4 screenshots identiques → replay passe à paused_need_help."""
    embedder = MagicMock()
    embedder.embed_image.return_value = [1.0, 0.0, 0.0]  # constant
    detector = LoopDetector(clip_embedder=embedder)

    state = {
        "replay_id": "r_test",
        "status": "running",
        "retried_actions": 0,
        "_screenshot_history": [[1.0, 0.0, 0.0]] * 4,
        "_action_history": [
            {"type": "click", "x_pct": 0.1, "y_pct": 0.1},
            {"type": "type", "x_pct": 0.2, "y_pct": 0.2},
        ],
    }
    verdict = detector.evaluate(state, state["_screenshot_history"], state["_action_history"])

    # Simuler ce que ferait api_stream après verdict
    if verdict.detected:
        state["status"] = "paused_need_help"
        state["pause_reason"] = verdict.reason
        state["pause_message"] = f"signal={verdict.signal}"

    assert state["status"] == "paused_need_help"
    assert state["pause_reason"] == "loop_detected"
    assert "screen_static" in state["pause_message"]


def test_replay_state_transitions_on_action_repeat():
    """Cas : 3 actions identiques → paused_need_help signal action_repeat."""
    detector = LoopDetector(clip_embedder=None)
    actions = [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * 3
    state = {"replay_id": "r2", "status": "running", "retried_actions": 0,
             "_screenshot_history": [], "_action_history": actions}

    verdict = detector.evaluate(state, [], actions)
    assert verdict.detected and verdict.signal == "action_repeat"


def test_kill_switch_keeps_replay_running(monkeypatch):
    """Avec RPA_LOOP_DETECTOR_ENABLED=0 le replay continue même en boucle."""
    monkeypatch.setenv("RPA_LOOP_DETECTOR_ENABLED", "0")
    embedder = MagicMock()
    embedder.embed_image.return_value = [1.0, 0.0, 0.0]
    detector = LoopDetector(clip_embedder=embedder)

    state = {"retried_actions": 10,
             "_screenshot_history": [[1.0, 0.0, 0.0]] * 10,
             "_action_history": [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * 10}

    verdict = detector.evaluate(state, state["_screenshot_history"], state["_action_history"])
    assert verdict.detected is False
  • Step 2: Run
pytest tests/integration/test_loop_detector_replay.py -v

Expected : 3 passed.

Task 18: Commit QW2 + push + re-run baseline

  • Step 1: Re-run baseline complète
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       tests/unit/test_loop_detector.py \
       tests/integration/test_loop_detector_replay.py \
       -q

Expected : tous passed, aucun nouveau failure par rapport à .qw-baseline.log.

  • Step 2: Commit final QW2
git add agent_v0/server_v1/replay_engine.py \
        agent_v0/server_v1/api_stream.py \
        tests/integration/test_loop_detector_replay.py
git commit -m "$(cat <<'EOF'
feat(qw2): hook LoopDetector dans api_stream + extension replay_state

replay_state enrichi de _screenshot_history (5 derniers embeddings CLIP)
et _action_history (5 dernières signatures action).

report_action_result :
- met à jour les deux anneaux après chaque action
- évalue le LoopDetector (singleton lazy)
- si detected → bascule paused_need_help avec pause_reason="loop_detected"
  et bus event lea:loop_detected (signal + evidence)

Tous les chemins d'erreur (embedder absent, OOM, exception) loggent et
laissent le replay continuer — aucun blocage par la couche détection.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
  • Step 3: Push branche sur Gitea (backup distant après QW2)
git push gitea feature/qw-suite-mai

Section 3 — QW4 Safety Checks Hybrides

Task 19: Tests unitaires test_safety_checks_provider.py (rouges)

Files:

  • Create: tests/unit/test_safety_checks_provider.py

  • Step 1: Créer le fichier avec les 7 cas

# tests/unit/test_safety_checks_provider.py
"""Tests unitaires SafetyChecksProvider (QW4)."""
import json
import pytest
from unittest.mock import patch, MagicMock

from agent_v0.server_v1.safety_checks_provider import build_pause_payload, PausePayload


def _action(safety_level=None, declarative_checks=None, message="Validation"):
    params = {"message": message}
    if safety_level:
        params["safety_level"] = safety_level
    if declarative_checks is not None:
        params["safety_checks"] = declarative_checks
    return {"type": "pause_for_human", "parameters": params}


def test_only_declarative_when_no_safety_level():
    """Pas de safety_level → uniquement les checks déclaratifs, pas d'appel LLM."""
    decl = [{"id": "c1", "label": "Vérifier IPP", "required": True}]
    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks") as mock_llm:
        payload = build_pause_payload(_action(declarative_checks=decl), {}, last_screenshot=None)
    mock_llm.assert_not_called()
    assert len(payload.checks) == 1
    assert payload.checks[0]["source"] == "declarative"


def test_hybrid_appends_llm_checks_on_medical_critical(monkeypatch):
    """safety_level=medical_critical → LLM appelé, checks concaténés."""
    decl = [{"id": "c1", "label": "Vérifier IPP", "required": True}]
    llm_resp = [{"label": "Nom patient suspect à l'écran", "evidence": "vu un nom différent"}]

    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks",
               return_value=llm_resp) as mock_llm:
        payload = build_pause_payload(
            _action(safety_level="medical_critical", declarative_checks=decl),
            {}, last_screenshot="/tmp/fake.png",
        )
    mock_llm.assert_called_once()
    assert len(payload.checks) == 2
    assert payload.checks[0]["source"] == "declarative"
    assert payload.checks[1]["source"] == "llm_contextual"
    assert payload.checks[1]["evidence"] == "vu un nom différent"


def test_llm_timeout_falls_back_to_declarative_only():
    """LLM timeout → additional_checks=[], pas de crash, déclaratifs gardés."""
    decl = [{"id": "c1", "label": "Vérifier IPP", "required": True}]
    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks",
               return_value=[]) as mock_llm:
        payload = build_pause_payload(
            _action(safety_level="medical_critical", declarative_checks=decl),
            {}, last_screenshot="/tmp/fake.png",
        )
    assert len(payload.checks) == 1
    assert payload.checks[0]["source"] == "declarative"


def test_llm_invalid_response_falls_back():
    """Si _call_llm retourne [] (parse échoué en interne) → fallback safe."""
    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks",
               return_value=[]):
        payload = build_pause_payload(
            _action(safety_level="medical_critical", declarative_checks=[]),
            {}, last_screenshot="/tmp/fake.png",
        )
    assert payload.checks == []


def test_kill_switch_disables_llm_call(monkeypatch):
    """RPA_SAFETY_CHECKS_LLM_ENABLED=0 → LLM jamais appelé."""
    monkeypatch.setenv("RPA_SAFETY_CHECKS_LLM_ENABLED", "0")
    decl = [{"id": "c1", "label": "X", "required": True}]
    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks") as mock_llm:
        payload = build_pause_payload(
            _action(safety_level="medical_critical", declarative_checks=decl),
            {}, last_screenshot="/tmp/fake.png",
        )
    mock_llm.assert_not_called()
    assert len(payload.checks) == 1


def test_max_checks_respected(monkeypatch):
    """RPA_SAFETY_CHECKS_LLM_MAX_CHECKS=2 → max 2 checks LLM ajoutés."""
    monkeypatch.setenv("RPA_SAFETY_CHECKS_LLM_MAX_CHECKS", "2")
    decl = []
    llm_resp = [
        {"label": f"Check {i}", "evidence": f"e{i}"} for i in range(5)
    ]
    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks",
               return_value=llm_resp[:2]):  # provider tronque déjà
        payload = build_pause_payload(
            _action(safety_level="medical_critical", declarative_checks=decl),
            {}, last_screenshot="/tmp/fake.png",
        )
    assert len(payload.checks) == 2


def test_empty_declarative_with_llm_returns_only_llm():
    """Pas de déclaratif + LLM ajoute 2 checks → payload contient les 2."""
    llm_resp = [{"label": "Vérifier date", "evidence": "date 1900 suspecte"},
                {"label": "Vérifier devise", "evidence": "montant en USD au lieu d'EUR"}]
    with patch("agent_v0.server_v1.safety_checks_provider._call_llm_for_contextual_checks",
               return_value=llm_resp):
        payload = build_pause_payload(
            _action(safety_level="medical_critical", declarative_checks=[]),
            {}, last_screenshot="/tmp/fake.png",
        )
    assert len(payload.checks) == 2
    assert all(c["source"] == "llm_contextual" for c in payload.checks)
  • Step 2: Run pour vérifier qu'ils échouent
pytest tests/unit/test_safety_checks_provider.py -v

Expected : ModuleNotFoundError.

Task 20: Implémenter safety_checks_provider.py

Files:

  • Create: agent_v0/server_v1/safety_checks_provider.py

  • Step 1: Écrire le module complet

# agent_v0/server_v1/safety_checks_provider.py
"""SafetyChecksProvider — checks hybrides déclaratifs + LLM contextuels (QW4).

Pour une action pause_for_human :
- les checks déclaratifs (workflow) sont toujours inclus
- si safety_level == "medical_critical" et RPA_SAFETY_CHECKS_LLM_ENABLED=1,
  un appel LLM (medgemma:4b par défaut) ajoute jusqu'à N checks contextuels

Tout échec côté LLM (timeout, exception, parse) → additional_checks=[] :
le replay continue avec uniquement les déclaratifs (fallback safe).
"""

import base64
import io
import json
import logging
import os
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class PausePayload:
    checks: List[Dict[str, Any]] = field(default_factory=list)
    pause_reason: str = ""
    message: str = ""


def _env(name: str, default: str) -> str:
    return os.environ.get(name, default).strip()


def _env_int(name: str, default: int) -> int:
    try:
        return int(os.environ.get(name, default))
    except (TypeError, ValueError):
        return default


def _env_bool_enabled(name: str) -> bool:
    val = os.environ.get(name, "1").strip().lower()
    return val not in ("0", "false", "no", "off", "")


def build_pause_payload(
    action: Dict[str, Any],
    replay_state: Dict[str, Any],
    last_screenshot: Optional[str],
) -> PausePayload:
    """Construit le payload de pause enrichi pour une action pause_for_human."""
    params = action.get("parameters") or {}
    message = params.get("message", "Validation requise")
    safety_level = params.get("safety_level")
    declarative = params.get("safety_checks") or []

    # Normalisation des checks déclaratifs
    checks: List[Dict[str, Any]] = []
    for d in declarative:
        checks.append({
            "id": d.get("id") or f"decl_{uuid.uuid4().hex[:6]}",
            "label": d.get("label", "Validation"),
            "required": bool(d.get("required", True)),
            "source": "declarative",
            "evidence": None,
        })

    # Ajout LLM contextual si applicable
    if safety_level == "medical_critical" and _env_bool_enabled("RPA_SAFETY_CHECKS_LLM_ENABLED"):
        try:
            additional = _call_llm_for_contextual_checks(
                action=action,
                replay_state=replay_state,
                last_screenshot=last_screenshot,
                existing_labels=[c["label"] for c in checks],
            )
        except Exception as e:
            logger.warning("safety_checks LLM exception (%s) — fallback safe", e)
            additional = []

        for a in additional:
            checks.append({
                "id": f"llm_{uuid.uuid4().hex[:6]}",
                "label": a.get("label", ""),
                "required": False,  # checks LLM = informationnels, pas obligatoires V1
                "source": "llm_contextual",
                "evidence": a.get("evidence", ""),
            })

    return PausePayload(
        checks=checks,
        pause_reason="",
        message=message,
    )


def _call_llm_for_contextual_checks(
    action: Dict[str, Any],
    replay_state: Dict[str, Any],
    last_screenshot: Optional[str],
    existing_labels: List[str],
) -> List[Dict[str, str]]:
    """Appelle Ollama en mode JSON strict pour générer 0-N checks contextuels.

    Returns:
        List[{label, evidence}] (max RPA_SAFETY_CHECKS_LLM_MAX_CHECKS).
        [] sur tout échec (timeout, JSON invalide, exception).
    """
    import requests

    model = _env("RPA_SAFETY_CHECKS_LLM_MODEL", "medgemma:4b")
    timeout_s = _env_int("RPA_SAFETY_CHECKS_LLM_TIMEOUT_S", 5)
    max_checks = _env_int("RPA_SAFETY_CHECKS_LLM_MAX_CHECKS", 3)
    ollama_url = _env("OLLAMA_URL", "http://localhost:11434")

    params = action.get("parameters") or {}
    workflow_message = params.get("message", "")
    existing = ", ".join(existing_labels) if existing_labels else "aucun"

    prompt = f"""Tu es Léa, assistante médicale supervisée.
Avant de continuer le workflow, tu dois lister 0 à {max_checks} vérifications supplémentaires
que l'humain doit acquitter, en regardant l'écran actuel.

Contexte workflow : {workflow_message}
Checks déjà demandés : {existing}

NE répète PAS un check déjà demandé.
Si rien d'inhabituel à signaler, retourne {{"additional_checks": []}}.

Réponds UNIQUEMENT en JSON :
{{
  "additional_checks": [
    {{"label": "string court", "evidence": "ce que tu as vu d'inhabituel"}}
  ]
}}
"""

    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "format": "json",
        "options": {"temperature": 0.1, "num_predict": 200},
    }

    if last_screenshot and os.path.isfile(last_screenshot):
        try:
            with open(last_screenshot, "rb") as f:
                payload["images"] = [base64.b64encode(f.read()).decode("ascii")]
        except Exception as e:
            logger.debug("safety_checks: lecture screenshot échouée (%s) — appel sans image", e)

    try:
        response = requests.post(
            f"{ollama_url}/api/generate",
            json=payload,
            timeout=timeout_s,
        )
        if response.status_code != 200:
            logger.warning("safety_checks LLM HTTP %s", response.status_code)
            return []
        text = response.json().get("response", "").strip()
    except requests.Timeout:
        logger.warning("safety_checks LLM timeout (%ss)", timeout_s)
        return []
    except Exception as e:
        logger.warning("safety_checks LLM erreur réseau: %s", e)
        return []

    # format=json garantit normalement du JSON valide
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as e:
        logger.warning("safety_checks LLM JSON invalide (%s) — fallback safe", e)
        return []

    additional = parsed.get("additional_checks") or []
    if not isinstance(additional, list):
        return []

    # Filtre + tronc
    valid = []
    for item in additional[:max_checks]:
        if isinstance(item, dict) and item.get("label"):
            valid.append({
                "label": str(item["label"])[:200],
                "evidence": str(item.get("evidence", ""))[:300],
            })
    return valid
  • Step 2: Re-run les tests, ils doivent tous passer
pytest tests/unit/test_safety_checks_provider.py -v

Expected : 7 passed.

  • Step 3: Commit
git add agent_v0/server_v1/safety_checks_provider.py tests/unit/test_safety_checks_provider.py
git commit -m "$(cat <<'EOF'
feat(qw4): SafetyChecksProvider hybride déclaratif + LLM contextuel

build_pause_payload(action, state, last_screenshot) → PausePayload
- Toujours inclure les checks déclaratifs (workflow.parameters.safety_checks)
- Si safety_level=medical_critical ET RPA_SAFETY_CHECKS_LLM_ENABLED=1 :
    appel LLM (medgemma:4b par défaut) en format=json strict, timeout 5s,
    max 3 checks ajoutés (configurables via env vars)
- Tous les chemins d'erreur (timeout, HTTP, JSON parse, exception) loggent
  et retournent [] (fallback safe : déclaratifs seuls)

Tests : 7 cas (déclaratif seul, hybride OK, timeout, LLM invalide,
kill-switch, max_checks, déclaratif vide).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 21: Hook safety_checks_provider dans replay_engine.py

Files:

  • Modify: agent_v0/server_v1/replay_engine.py:1452-1524 (_create_replay_state) — ajouter clés

  • Modify: agent_v0/server_v1/api_stream.py (branche pause_for_human, ~line 2918)

  • Step 1: Étendre _create_replay_state avec les clés audit

Dans _create_replay_state, après les clés QW2 ajoutées en Task 15 :

        # QW4 — Safety checks et audit acquittements
        "safety_checks": [],            # liste produite par SafetyChecksProvider
        "checks_acknowledged": [],      # ids acquittés via /replay/resume (audit trail)
        "pause_reason": "",             # "loop_detected" | "" pour V1
        "pause_payload": None,          # payload complet pour debug/audit
  • Step 2: Localiser la branche pause_for_human dans api_stream.py
grep -n "pause_for_human" agent_v0/server_v1/api_stream.py | head -10

Cible : ligne 2918 (commentaire actuel : "pause_for_human ignorée (mode autonome)") et le bloc qui suit pour le mode supervisé (probablement quelques lignes plus bas).

  • Step 3: Modifier la branche supervisée pour appeler le provider

Avant la mise à jour replay_state["status"] = "paused_need_help" dans la branche supervisée :

                # QW4 — Construire le payload de pause enrichi
                from agent_v0.server_v1.safety_checks_provider import build_pause_payload
                last_screenshot = replay_state.get("last_screenshot")
                payload = build_pause_payload(action, replay_state, last_screenshot)
                replay_state["safety_checks"] = payload.checks
                replay_state["pause_payload"] = {
                    "checks": payload.checks,
                    "pause_reason": payload.pause_reason,
                    "message": payload.message,
                }
                replay_state["pause_message"] = payload.message
                # Bus event d'observabilité
                try:
                    from agent_v0.agent_v1.network.feedback_bus import emit_server_event
                    emit_server_event("lea:safety_checks_generated", {
                        "replay_id": replay_state.get("replay_id"),
                        "count": len(payload.checks),
                        "sources": [c["source"] for c in payload.checks],
                    })
                except Exception:
                    pass
  • Step 4: Re-run baseline
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       -q

Expected : même résultat baseline.

Task 22: Tests intégration test_replay_resume_acknowledgments.py

Files:

  • Create: tests/integration/test_replay_resume_acknowledgments.py

  • Step 1: Créer le fichier

# tests/integration/test_replay_resume_acknowledgments.py
"""Tests intégration : /replay/resume valide les acquittements de safety_checks (QW4)."""
import pytest


def test_resume_accepts_when_all_required_acknowledged():
    """État pause + tous required acquittés → reprise OK."""
    state = {
        "status": "paused_need_help",
        "safety_checks": [
            {"id": "c1", "label": "X", "required": True, "source": "declarative", "evidence": None},
            {"id": "c2", "label": "Y", "required": True, "source": "declarative", "evidence": None},
        ],
        "checks_acknowledged": [],
    }
    # Simuler la validation côté serveur
    acknowledged = ["c1", "c2"]
    required_ids = {c["id"] for c in state["safety_checks"] if c["required"]}
    missing = required_ids - set(acknowledged)
    assert missing == set()  # rien ne manque → reprise OK


def test_resume_rejects_when_required_missing():
    """État pause + un required non acquitté → 400 required_checks_missing."""
    state = {
        "status": "paused_need_help",
        "safety_checks": [
            {"id": "c1", "label": "X", "required": True, "source": "declarative", "evidence": None},
            {"id": "c2", "label": "Y", "required": False, "source": "llm_contextual", "evidence": "..."},
        ],
        "checks_acknowledged": [],
    }
    acknowledged = ["c2"]  # only optional
    required_ids = {c["id"] for c in state["safety_checks"] if c["required"]}
    missing = required_ids - set(acknowledged)
    assert missing == {"c1"}  # c1 manquant → resume doit retourner 400


def test_resume_audit_trail_stored():
    """checks_acknowledged contient les ids reçus (audit)."""
    state = {
        "status": "paused_need_help",
        "safety_checks": [
            {"id": "c1", "required": True, "label": "X", "source": "declarative", "evidence": None},
        ],
        "checks_acknowledged": [],
    }
    acknowledged = ["c1"]
    state["checks_acknowledged"] = acknowledged
    state["status"] = "running"
    assert state["checks_acknowledged"] == ["c1"]
    assert state["status"] == "running"
  • Step 2: Run
pytest tests/integration/test_replay_resume_acknowledgments.py -v

Expected : 3 passed.

Task 23: Modifier endpoint /replay/resume dans api_stream.py

Files:

  • Modify: agent_v0/server_v1/api_stream.py:3974-3990+ (/replay/resume)

  • Step 1: Localiser la fonction

grep -n "def.*resume.*replay\|@app.post.*resume\|/replay/resume" agent_v0/server_v1/api_stream.py | head -5

Cible attendue : ligne ~3974.

  • Step 2: Modifier la signature pour accepter acknowledged_check_ids

Soit la fonction existante :

@app.post("/replay/resume")
async def resume_replay(...): ...

Étendre le body Pydantic pour accepter optionnellement acknowledged_check_ids: List[str] = [].

Exemple : si la signature actuelle est async def resume_replay(payload: ReplayResumeRequest):, modifier le modèle ReplayResumeRequest pour ajouter acknowledged_check_ids: List[str] = [].

  • Step 3: Vérifier les acquittements avant la reprise effective

À l'intérieur de la fonction, juste après avoir confirmé state["status"] == "paused_need_help" :

    # QW4 — Vérification des safety_checks required
    safety_checks = state.get("safety_checks") or []
    if safety_checks:
        required_ids = {c["id"] for c in safety_checks if c.get("required")}
        ack_set = set(payload.acknowledged_check_ids or [])
        missing = list(required_ids - ack_set)
        if missing:
            raise HTTPException(
                status_code=400,
                detail={"error": "required_checks_missing", "missing": missing},
            )
        # Audit trail
        state["checks_acknowledged"] = list(ack_set)
  • Step 4: Re-run baseline + tests intégration
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       tests/integration/test_replay_resume_acknowledgments.py \
       -q

Expected : tous passed, baseline préservée.

  • Step 5: Commit
git add agent_v0/server_v1/replay_engine.py \
        agent_v0/server_v1/api_stream.py \
        tests/integration/test_replay_resume_acknowledgments.py
git commit -m "$(cat <<'EOF'
feat(qw4): hook safety_checks_provider + extension /replay/resume avec acquittements

replay_state enrichi de safety_checks, checks_acknowledged, pause_reason,
pause_payload (audit trail).

Branche supervisée pause_for_human :
- appel build_pause_payload() avant bascule paused_need_help
- bus event lea:safety_checks_generated (count, sources)

POST /replay/resume :
- accepte body { acknowledged_check_ids: [...] }
- vérifie tous les checks required acquittés, sinon 400 required_checks_missing
- stocke checks_acknowledged comme audit trail

Backward 100% : workflows sans safety_checks → resume sans acquittement requis.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"

Task 24: Étendre types.ts côté frontend VWB

Files:

  • Modify: visual_workflow_builder/frontend_v4/src/types.ts:46 et alentours, et le type Execution

  • Step 1: Localiser le type PauseAction

grep -n "pause_for_human\|PauseAction\|safety_checks\|Execution" visual_workflow_builder/frontend_v4/src/types.ts | head -20
  • Step 2: Ajouter les types SafetyCheck et étendre PauseAction.parameters

Au début du fichier, après les imports :

export type SafetyLevel = 'standard' | 'medical_critical';

export interface SafetyCheck {
  id: string;
  label: string;
  required: boolean;
  source: 'declarative' | 'llm_contextual';
  evidence?: string | null;
}

Étendre les params de l'action pause_for_human (chercher dans la définition ActionDef ou similaire à la ligne 135) :

{
  type: 'pause_for_human',
  label: 'Pause supervisée',
  ...,
  params: [
    { key: 'message', label: 'Message', type: 'text' },
    { key: 'safety_level', label: 'Niveau', type: 'select', options: ['standard', 'medical_critical'] },
    { key: 'safety_checks', label: 'Checks à valider', type: 'checks_editor' },
  ],
}

Étendre le type Execution pour transporter le payload de pause :

export interface Execution {
  // ... champs existants ...
  pause_reason?: string;
  pause_message?: string;
  safety_checks?: SafetyCheck[];
}
  • Step 3: Vérifier la compilation TypeScript
cd visual_workflow_builder/frontend_v4 && npx tsc --noEmit 2>&1 | head -30

Expected : aucune erreur (ou seulement les erreurs préexistantes hors de ce diff).

Task 25: Créer le composant PauseDialog.tsx

Files:

  • Create: visual_workflow_builder/frontend_v4/src/components/PauseDialog.tsx

  • Step 1: Écrire le composant

// visual_workflow_builder/frontend_v4/src/components/PauseDialog.tsx
import { useState, useMemo } from 'react';
import type { SafetyCheck } from '../types';

interface Props {
  pauseMessage: string;
  pauseReason?: string;
  safetyChecks: SafetyCheck[];
  onResume: (acknowledgedIds: string[]) => Promise<void>;
  onCancel: () => void;
}

export default function PauseDialog({
  pauseMessage,
  pauseReason,
  safetyChecks,
  onResume,
  onCancel,
}: Props) {
  const [checked, setChecked] = useState<Record<string, boolean>>({});
  const [submitting, setSubmitting] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const allRequiredOK = useMemo(() => {
    return safetyChecks
      .filter((c) => c.required)
      .every((c) => checked[c.id] === true);
  }, [safetyChecks, checked]);

  const toggle = (id: string) => {
    setChecked((prev) => ({ ...prev, [id]: !prev[id] }));
  };

  const handleResume = async () => {
    setSubmitting(true);
    setError(null);
    try {
      const acknowledgedIds = Object.entries(checked)
        .filter(([, v]) => v)
        .map(([k]) => k);
      await onResume(acknowledgedIds);
    } catch (e: any) {
      setError(e?.message || 'Erreur lors de la reprise');
    } finally {
      setSubmitting(false);
    }
  };

  // Backward compat : pas de checks → bulle simple legacy
  if (safetyChecks.length === 0) {
    return (
      <div className="pause-dialog-simple">
        <p>{pauseMessage}</p>
        {pauseReason && <small className="pause-reason">Raison : {pauseReason}</small>}
        <div className="pause-actions">
          <button onClick={() => onResume([])} disabled={submitting}>
            Continuer
          </button>
          <button onClick={onCancel} disabled={submitting}>
            Annuler
          </button>
        </div>
      </div>
    );
  }

  return (
    <div className="pause-dialog-checks">
      <h3>Pause supervisée</h3>
      <p className="pause-message">{pauseMessage}</p>
      {pauseReason && (
        <div className="pause-reason-banner">
          <strong>Raison :</strong> {pauseReason}
        </div>
      )}

      <ul className="checklist-panel">
        {safetyChecks.map((c) => (
          <li key={c.id} className={`check-item ${c.required ? 'required' : 'optional'}`}>
            <label>
              <input
                type="checkbox"
                checked={!!checked[c.id]}
                onChange={() => toggle(c.id)}
                disabled={submitting}
              />
              <span className="check-label">{c.label}</span>
              {c.required && <span className="badge badge-required">obligatoire</span>}
              {c.source === 'llm_contextual' && (
                <span className="badge badge-lea" title={c.evidence || ''}>
                  Léa
                </span>
              )}
            </label>
            {c.source === 'llm_contextual' && c.evidence && (
              <small className="check-evidence"> {c.evidence}</small>
            )}
          </li>
        ))}
      </ul>

      {error && <div className="pause-error">{error}</div>}

      <div className="pause-actions">
        <button
          onClick={handleResume}
          disabled={!allRequiredOK || submitting}
          title={!allRequiredOK ? 'Coche tous les checks obligatoires' : 'Reprendre le replay'}
        >
          {submitting ? 'Reprise...' : 'Continuer'}
        </button>
        <button onClick={onCancel} disabled={submitting}>
          Annuler
        </button>
      </div>
    </div>
  );
}
  • Step 2: Ajouter le CSS minimal (dans le fichier CSS global ou inline)

Identifier le fichier CSS actif :

ls visual_workflow_builder/frontend_v4/src/*.css

Ajouter :

.pause-dialog-checks { padding: 16px; max-width: 480px; background: #fff; border: 2px solid #f59e0b; border-radius: 8px; }
.pause-dialog-checks h3 { margin: 0 0 8px; color: #92400e; }
.pause-message { margin: 0 0 12px; }
.pause-reason-banner { background: #fef3c7; padding: 8px; margin-bottom: 12px; border-radius: 4px; }
.checklist-panel { list-style: none; padding: 0; margin: 0 0 12px; }
.check-item { padding: 6px 0; border-bottom: 1px solid #f3f4f6; }
.check-item.required { background: #fef9c3; }
.check-item label { cursor: pointer; display: flex; align-items: center; gap: 6px; }
.badge { font-size: 10px; padding: 2px 6px; border-radius: 10px; margin-left: 6px; }
.badge-required { background: #dc2626; color: #fff; }
.badge-lea { background: #2563eb; color: #fff; cursor: help; }
.check-evidence { display: block; font-style: italic; color: #6b7280; margin-left: 24px; }
.pause-error { color: #dc2626; padding: 8px; background: #fef2f2; border-radius: 4px; margin-bottom: 8px; }
.pause-actions button:disabled { opacity: 0.5; cursor: not-allowed; }
  • Step 3: Brancher le composant dans le rendu existant de la pause

Localiser où la pause est actuellement rendue :

grep -rn "pause_for_human\|paused_need_help\|Continuer\|onResume" visual_workflow_builder/frontend_v4/src/ | head -20

Remplacer le rendu existant par <PauseDialog ... /> avec les props issues du state d'exécution.

Task 26: Étendre PropertiesPanel.tsx — éditeur de safety_level + safety_checks

Files:

  • Modify: visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx:1356

  • Step 1: Localiser la branche case 'pause_for_human':

Vers la ligne 1356 (déjà repéré). Lire les ~50 lignes qui suivent pour voir le pattern d'édition existant.

  • Step 2: Ajouter les éditeurs après le champ message
// Dans la branche case 'pause_for_human':
<>
  {/* Champ message existant — ne pas toucher */}
  <label>Message</label>
  <input value={params.message || ''} onChange={...} />

  {/* QW4 — Niveau de sécurité */}
  <label>Niveau de sécurité</label>
  <select
    value={params.safety_level || 'standard'}
    onChange={(e) => updateParam('safety_level', e.target.value)}
  >
    <option value="standard">Standard (pas de LLM)</option>
    <option value="medical_critical">Médical critique (LLM contextuel)</option>
  </select>

  {/* QW4 — Liste éditable de checks déclaratifs */}
  <label>Checks à valider</label>
  {(params.safety_checks || []).map((check: any, i: number) => (
    <div key={i} className="check-editor-row">
      <input
        placeholder="ID (ex: check_ipp)"
        value={check.id}
        onChange={(e) => {
          const next = [...(params.safety_checks || [])];
          next[i] = { ...check, id: e.target.value };
          updateParam('safety_checks', next);
        }}
      />
      <input
        placeholder="Libellé"
        value={check.label}
        onChange={(e) => {
          const next = [...(params.safety_checks || [])];
          next[i] = { ...check, label: e.target.value };
          updateParam('safety_checks', next);
        }}
      />
      <label>
        <input
          type="checkbox"
          checked={!!check.required}
          onChange={(e) => {
            const next = [...(params.safety_checks || [])];
            next[i] = { ...check, required: e.target.checked };
            updateParam('safety_checks', next);
          }}
        />
        Obligatoire
      </label>
      <button onClick={() => {
        const next = (params.safety_checks || []).filter((_: any, j: number) => j !== i);
        updateParam('safety_checks', next);
      }}></button>
    </div>
  ))}
  <button onClick={() => {
    const next = [...(params.safety_checks || []), { id: '', label: '', required: true }];
    updateParam('safety_checks', next);
  }}>+ Ajouter un check</button>
</>

(Adapter updateParam au nom réel de la fonction d'édition utilisée dans le fichier — vérifier le pattern existant ligne ~1356.)

  • Step 3: Vérifier la compilation
cd visual_workflow_builder/frontend_v4 && npx tsc --noEmit 2>&1 | head -30

Task 27: Checklist compat VWB manuelle

Files: aucun (test manuel observable)

  • Step 1: Démarrer le frontend Vite
cd visual_workflow_builder/frontend_v4 && npm run dev
  • Step 2: Workflow ancien (sans safety_checks) → bulle simple

Ouvrir un workflow existant validé 30/04. Lancer le replay. Quand la pause apparaît : la bulle doit être identique à avant (Continuer, Annuler, pas de checklist).

  • Step 3: Workflow nouveau avec safety_checks déclaratifs

Créer un workflow avec une action pause_for_human ayant 2 safety_checks déclaratifs required: true. Lancer. Vérifier :

  • ChecklistPanel s'affiche

  • Bouton Continuer désactivé tant que les 2 cases ne sont pas cochées

  • Pas d'appel Ollama dans les logs serveur (vérifier journalctl -u rpa-streaming -f | grep -i ollama sur la fenêtre du replay)

  • Step 4: Workflow medical_critical avec LLM

Modifier le workflow précédent : safety_level: medical_critical. Re-lancer. Vérifier :

  • Logs serveur affichent un appel à medgemma:4b dans les 5s

  • ChecklistPanel affiche 2 checks déclaratifs + 0-3 checks [Léa] (avec evidence en tooltip)

  • Si Ollama down : pas de crash, juste 2 checks déclaratifs (kill-switch implicite)

  • Step 5: Test mauvais payload

Cocher tous les optionnels mais pas un required → Continuer reste désactivé. Force un POST direct au serveur via curl :

curl -X POST http://localhost:5005/replay/resume \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $RPA_API_TOKEN" \
  -d '{"replay_id":"...","acknowledged_check_ids":[]}'

Expected : 400 {"detail": {"error": "required_checks_missing", "missing": [...]}}.

  • Step 6: Vérifier DB workflow.db ouvre correctement
sqlite3 visual_workflow_builder/backend/instance/workflows.db ".tables"
sqlite3 visual_workflow_builder/backend/instance/workflows.db "SELECT id, name FROM workflows LIMIT 3;"

Expected : aucune erreur, schéma intact.

Task 28: Smoke démo full chain QW4 sur Easily Assure

Files: aucun (test manuel observable)

  • Step 1: Restart streaming + frontend Vite
./svc.sh restart streaming
# Frontend Vite reste actif depuis Task 27
  • Step 2: Modifier UN workflow Easily Assure existant pour ajouter une pause medical_critical

Dans VWB, sur un workflow UHCD validé : insérer une action pause_for_human avant l'étape de validation finale, avec :

  • safety_level: medical_critical

  • safety_checks: [{id:check_ipp, label:"IPP correct ?", required:true}, {id:check_diag, label:"Diagnostic confirmé ?", required:true}]

  • Step 3: Lancer le replay sur Agent V1 Windows

Vérifier la chaîne complète :

  • Workflow déroule jusqu'à la pause

  • Léa émet lea:safety_checks_generated avec checks déclaratifs + LLM

  • VWB affiche <PauseDialog> avec 2-5 checks

  • Médecin (toi) coche les checks

  • Continuer envoie le POST

  • Replay reprend, finit

  • Step 4: Vérifier audit trail dans les logs

journalctl -u rpa-streaming -n 200 | grep -E "checks_acknowledged|safety_checks_generated|safety_checks_llm_failed" | tail -10

Expected : trace propre.

Task 29: Commit final QW4 + push + re-run baseline complète

  • Step 1: Re-run baseline complète + tous les tests QW
pytest tests/test_pipeline_e2e.py \
       tests/test_phase0_integration.py \
       tests/integration/test_stream_processor.py \
       tests/unit/test_monitor_router.py \
       tests/integration/test_grounding_offset.py \
       tests/unit/test_loop_detector.py \
       tests/integration/test_loop_detector_replay.py \
       tests/unit/test_safety_checks_provider.py \
       tests/integration/test_replay_resume_acknowledgments.py \
       -q

Expected : tous passed, baseline préservée.

  • Step 2: Commit final QW4 (frontend)
git add visual_workflow_builder/frontend_v4/src/types.ts \
        visual_workflow_builder/frontend_v4/src/components/PauseDialog.tsx \
        visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx \
        visual_workflow_builder/frontend_v4/src/*.css
git commit -m "$(cat <<'EOF'
feat(vwb): PauseDialog + ChecklistPanel + extension PropertiesPanel pour safety_checks

PauseDialog (composant nouveau) :
- 2 modes selon payload : bulle simple legacy si safety_checks vide,
  ChecklistPanel sinon
- Continuer désactivé tant que required non cochés
- Badge [obligatoire] et [Léa] (avec evidence en tooltip)
- POST /replay/resume avec acknowledged_check_ids

types.ts : SafetyCheck, SafetyLevel, extension Execution.

PropertiesPanel : éditeur safety_level (dropdown standard/medical_critical)
+ liste éditable de safety_checks (id/label/required + ajout/suppression).

Backward 100% : workflows existants sans safety_checks affichent
la bulle legacy identique au comportement actuel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
  • Step 3: Push branche sur Gitea (livraison QW1+QW2+QW4 distante)
git push gitea feature/qw-suite-mai

Section 4 — Documentation & MEMORY

Task 30: Créer la doc de livraison + maj MEMORY

Files:

  • Create: docs/QW_SUITE_MAI.md

  • Modify: /home/dom/.claude/projects/-home-dom-ai-rpa-vision-v3/memory/MEMORY.md

  • Step 1: Créer docs/QW_SUITE_MAI.md

# QW Suite Mai 2026 — Synthèse de livraison

Sprint d'amélioration RPA Vision V3, branche `feature/qw-suite-mai`,
inspiré par exploration comparative de 5 frameworks computer-use
(Simular Agent-S, browser-use, OpenAI CUA, Coasty, Showlab OOTB).

## Trois quick wins livrés

- **QW1 — Multi-écrans** : capture/grounding par `monitor_index` avec fallbacks
  focus actif puis composite. Backward 100% sur workflows existants.
- **QW2 — LoopDetector composite** : détection passive de stagnation via
  3 signaux (CLIP screen_static + action_repeat + retry_threshold).
  Bascule en `paused_need_help` automatique.
- **QW4 — Safety checks hybrides** : `pause_for_human` enrichi de checks
  déclaratifs (workflow) + LLM contextuels (`medgemma:4b` local, timeout 5s,
  fallback safe). UX VWB avec ChecklistPanel acquittable.

## Kill-switches en cas de problème

```bash
systemctl edit rpa-streaming
# Ajouter :
Environment=RPA_LOOP_DETECTOR_ENABLED=0
Environment=RPA_SAFETY_CHECKS_LLM_ENABLED=0
systemctl restart rpa-streaming

Rollback complet : git checkout backup/pre-qw-suite-mai-2026-05-05.

Référence design

docs/superpowers/specs/2026-05-05-qw-suite-mai-design.md

Référence plan d'exécution

docs/superpowers/plans/2026-05-05-qw-suite-mai.md


- [ ] **Step 2: Mettre à jour MEMORY.md (ajouter une ligne d'index)**

Ajouter dans `/home/dom/.claude/projects/-home-dom-ai-rpa-vision-v3/memory/MEMORY.md`, dans une section appropriée (après les autres specs/sessions) :

```markdown
## ⭐ Sprint QW Suite Mai 2026 (multi-écrans + LoopDetector + safety_checks)
See [docs/QW_SUITE_MAI.md](../../../docs/QW_SUITE_MAI.md) — branche `feature/qw-suite-mai`,
3 modules serveur isolés + UI VWB. Kill-switches env vars sur QW2/QW4.
Spec : `docs/superpowers/specs/2026-05-05-qw-suite-mai-design.md`.
Plan : `docs/superpowers/plans/2026-05-05-qw-suite-mai.md`.
  • Step 3: Commit final docs
git add docs/QW_SUITE_MAI.md
git commit -m "$(cat <<'EOF'
docs(qw): synthèse de livraison QW suite mai 2026

Doc condensée des 3 quick wins livrés (QW1 multi-écrans, QW2 LoopDetector,
QW4 safety_checks hybrides) avec procédures kill-switch et rollback.

Pointe vers spec et plan d'exécution complets.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
git push gitea feature/qw-suite-mai

Récapitulatif des commits attendus

1.  docs(qw): spec design QW suite mai 2026 (DÉJÀ FAIT — commit 2a07d8084)
2.  feat(qw1): MonitorRouter — résolution écran cible
3.  feat(qw1): capture par monitor + propagation offsets dans grounding cascade
4.  feat(qw1): enrichissement Agent V1 + hook serveur api_stream
5.  feat(qw2): LoopDetector composite (3 signaux + kill-switch)
6.  feat(qw2): hook LoopDetector dans api_stream + extension replay_state
7.  feat(qw4): SafetyChecksProvider hybride déclaratif + LLM contextuel
8.  feat(qw4): hook safety_checks_provider + extension /replay/resume
9.  feat(vwb): PauseDialog + ChecklistPanel + extension PropertiesPanel
10. docs(qw): synthèse de livraison QW suite mai 2026

10 commits attendus (1 spec déjà fait + 9 features+docs).