From c82829f2bb153e109b37ae3b5c93ae24d5118cdd Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 29 Jun 2026 17:44:24 +0200 Subject: [PATCH] =?UTF-8?q?feat(server):=20R1=20=E2=80=94=20import=20auto?= =?UTF-8?q?=20du=20workflow=20appris=20vers=20la=20DB=20VWB=20(gated)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit finalize_session appelle _maybe_import_to_vwb : si RPA_R1_AUTO_IMPORT (OFF par défaut), le workflow appris est assaini (sanitize_workflow_dict) puis importé en DB VWB rejouable via le pont idempotent (import_core_workflow_to_db), dans un app-context VWB lazy mutualisé (vwb_db). NON bloquant : un échec n'interrompt jamais la finalisation. Rend l'appris rejouable sans geste manuel (R1). Tests : câblage du seam + gating du flag + non-régression. Co-Authored-By: Claude Opus 4.8 (1M context) --- agent_v0/server_v1/stream_processor.py | 41 ++++ ..._worker_imports_learned_workflow_to_vwb.py | 215 ++++++++++++++++++ 2 files changed, 256 insertions(+) create mode 100644 tests/integration/test_worker_imports_learned_workflow_to_vwb.py diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py index 898396ab6..8ccd82ba4 100644 --- a/agent_v0/server_v1/stream_processor.py +++ b/agent_v0/server_v1/stream_processor.py @@ -3066,6 +3066,8 @@ class StreamProcessor: saved_path = self._persist_workflow(workflow, session_id, machine_id=machine_id) # Stocker le machine_id dans le workflow pour le filtrage workflow._machine_id = machine_id + # R1 : import auto en DB VWB (rejouable) — gated RPA_R1_AUTO_IMPORT, non bloquant. + self._maybe_import_to_vwb(workflow, session_id, machine_id) # Récupérer les métadonnées applicatives de la session session_state = self.session_manager.get_session(session_id) @@ -4444,6 +4446,45 @@ class StreamProcessor: logger.error(f"Erreur sauvegarde workflow {session_id}: {e}") return None + def _import_workflow_to_vwb(self, workflow, session_id: str, machine_id: str) -> Dict[str, Any]: + """Importer le workflow appris dans la DB VWB rejouable (Maillon A / R1). + + Rend l'appris rejouable sans geste manuel, de façon idempotente (fusion + par signature de trajectoire). Suppose un app-context VWB actif fournissant + ``db.session`` (créé par l'appelant côté worker). + """ + from .pii_sanitizer import sanitize_workflow_dict + from services.learned_workflow_bridge import import_core_workflow_to_db + from db.models import db + # Assainir la PII (cibles OCR `by_text`, noms) avant dépôt en DB VWB. + core_dict = sanitize_workflow_dict(workflow.to_dict()) + return import_core_workflow_to_db( + core_dict, + machine_id=machine_id, + source_session_id=session_id, + db_session=db.session, + ) + + def _vwb_app_context(self): + """Couplage worker→DB VWB mutualisé (un seul pont, cf. vwb_db). + + Délègue au helper module ``vwb_db.vwb_app_context`` partagé entre R1 et + l'extraction métier — pas de duplication de l'app Flask/init_app. + """ + from .vwb_db import vwb_app_context + return vwb_app_context() + + def _maybe_import_to_vwb(self, workflow, session_id: str, machine_id: str) -> None: + """Import auto de l'appris en DB VWB, gated par RPA_R1_AUTO_IMPORT (OFF + par défaut) et NON bloquant : un échec ne casse jamais la finalisation.""" + if os.environ.get("RPA_R1_AUTO_IMPORT", "false").lower() not in ("true", "1", "yes"): + return + try: + with self._vwb_app_context(): + self._import_workflow_to_vwb(workflow, session_id, machine_id) + except Exception as e: + logger.warning("[R1] import VWB auto échoué (non bloquant): %s", e) + def _build_raw_session_fallback(self, session, raw_dict): """Construire un RawSession manuellement si from_dict échoue.""" from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext diff --git a/tests/integration/test_worker_imports_learned_workflow_to_vwb.py b/tests/integration/test_worker_imports_learned_workflow_to_vwb.py new file mode 100644 index 000000000..8200d734f --- /dev/null +++ b/tests/integration/test_worker_imports_learned_workflow_to_vwb.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +"""Test RED — Maillon A (R1) : câblage worker → DB VWB rejouable. + +Invariant ciblé (le VRAI trou du chantier apprentissage) : + quand le worker `finalize_session` produit un workflow appris, ce workflow + doit devenir **rejouable** en atterrissant dans la DB VWB, **sans geste + manuel** — et un 2e passage de la MÊME trajectoire ne crée PAS de doublon. + +État vérifié au moment d'écrire ce test : +- le pont `import_core_workflow_to_db` (services.learned_workflow_bridge) EXISTE + et est vert en isolation (idempotence par signature de trajectoire) ; +- MAIS le worker (`agent_v0/server_v1/stream_processor.py`) ne l'appelle JAMAIS : + `_persist_workflow` écrit le JSON sur disque, puis rien ne l'importe en DB VWB. + → les deux mondes (JSON appris ↔ DB VWB rejouable) restent disjoints. + +Ce test cible le **seam de câblage** manquant côté worker, sans exécuter le +chemin lourd de `finalize_session` (GraphBuilder / CLIP) : il appelle la méthode +de pont attendue `StreamProcessor._import_workflow_to_vwb(workflow, session_id, +machine_id)`. Cette méthode N'EXISTE PAS encore → le test échoue (RED) pour la +bonne raison : le câblage worker→VWB est absent. + +Câblage minimal proposé (NON appliqué ici) : + dans `finalize_session`, juste après `_persist_workflow` (≈ ligne 3066), ajouter + self._import_workflow_to_vwb(workflow, session_id, machine_id) + où `_import_workflow_to_vwb` : + 1. sérialise `workflow.to_dict()` ; + 2. ouvre un app-context VWB (db.session) ; + 3. délègue à `import_core_workflow_to_db(core_dict, machine_id=..., + source_session_id=..., db_session=db.session)`. +""" + +import sys +from pathlib import Path + +import pytest +from flask import Flask + +# --- Chemins : racine projet (core.*, agent_v0.*) + backend VWB (db.models, services.*) --- +_ROOT = Path(__file__).resolve().parents[2] # .../rpa_vision_v3 +_BACKEND = _ROOT / "visual_workflow_builder" / "backend" +for _p in (str(_ROOT), str(_BACKEND)): + if _p not in sys.path: + sys.path.insert(0, _p) + +from db.models import db, Workflow # noqa: E402 (modèles ORM VWB) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def vwb_db_app(): + """App Flask minimale liée à une SQLite VWB en mémoire (schéma créé).""" + app = Flask("test_worker_import_to_vwb") + app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" + app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + db.init_app(app) + with app.app_context(): + db.create_all() + yield app + db.session.remove() + db.drop_all() + + +class _FakeCoreWorkflow: + """Stub léger d'un workflow core produit par le worker. + + Seul le **contrat** importe ici : le worker détient un objet exposant + `workflow_id` et `to_dict()` (cf. `core.models.workflow_graph.Workflow`, + déjà sérialisé par `_persist_workflow` via `save_to_file`). On reproduit ce + contrat sans dépendre du constructeur dataclass core (constraints/ + post_conditions obligatoires) — la cible du test est le câblage, pas la + construction d'objet. Le dict renvoyé est exactement la forme que le pont + `convert_learned_to_vwb_steps` consomme (validé en isolation). + """ + + def __init__(self): + self.workflow_id = "wf_sess_bloc_notes_worker" + + def to_dict(self): + return { + "workflow_id": self.workflow_id, + # Nom porteur de PII clinique : l'import en DB VWB doit l'assainir + # (logiciel métier réel en préfixe, nom clinique structuré ensuite). + "name": "Gxd5diag - VIOLA (VIOLA) Liliane", + "entry_nodes": ["n1"], + "nodes": [ + {"node_id": "n1", "name": "Bureau"}, + {"node_id": "n2", "name": "Bloc-notes ouvert"}, + ], + "edges": [ + { + "edge_id": "e1", + "from_node": "n1", + "to_node": "n2", + "action": { + "type": "mouse_click", + "target": {"by_text": "Bloc-notes", "by_role": "ocr"}, + "parameters": {"button": "left"}, + }, + }, + ], + } + + +def _build_core_workflow(): + """Workflow core tel que vu par le worker (contrat `workflow_id` + `to_dict`).""" + return _FakeCoreWorkflow() + + +def _make_processor(): + """Instancie un StreamProcessor sans déclencher l'init lourde (CLIP/FAISS). + + On crée l'objet via __new__ : le test n'exerce QUE la méthode de câblage, + pas le pipeline complet. + """ + from agent_v0.server_v1.stream_processor import StreamProcessor + return StreamProcessor.__new__(StreamProcessor) + + +# --------------------------------------------------------------------------- +# Test RED — le câblage worker→VWB +# --------------------------------------------------------------------------- + +def test_finalized_workflow_becomes_replayable_in_vwb_db(vwb_db_app): + """Un workflow appris par le worker devient rejouable en DB VWB, + et un 2e import de la même trajectoire ne crée pas de doublon (idempotence).""" + processor = _make_processor() + workflow = _build_core_workflow() + + # --- Seam de câblage attendu (à implémenter côté worker) --- + # _import_workflow_to_vwb(workflow, session_id, machine_id) doit : + # - sérialiser workflow.to_dict() + # - importer en DB VWB via import_core_workflow_to_db (idempotent) + assert hasattr(processor, "_import_workflow_to_vwb"), ( + "Câblage R1 absent : StreamProcessor n'expose pas de pont vers la DB VWB. " + "Le workflow appris reste sur disque (JSON) et n'est jamais rejouable." + ) + + with vwb_db_app.app_context(): + first = processor._import_workflow_to_vwb( + workflow, + session_id="sess_bloc_notes_worker", + machine_id="DESKTOP-TEST_windows", + ) + # 1er import → workflow rejouable créé en DB VWB + assert Workflow.query.count() == 1 + created = Workflow.query.first() + assert created.source == "learned_import" + assert created.review_status == "pending_review" + assert (first or {}).get("created") is True + # PII : le nom patient ne doit jamais atterrir en clair dans la DB VWB + assert "VIOLA" not in created.name, created.name + + # 2e import de la MÊME trajectoire → pas de doublon (idempotence) + second = processor._import_workflow_to_vwb( + workflow, + session_id="sess_bloc_notes_worker_rerun", + machine_id="DESKTOP-TEST_windows", + ) + assert Workflow.query.count() == 1, "ré-import du même parcours = pas de doublon" + assert (second or {}).get("created") is False + assert (first or {}).get("workflow_id") == (second or {}).get("workflow_id") + + +# --------------------------------------------------------------------------- +# Activation prod (couplage worker→DB VWB) : gating par feature-flag +# --------------------------------------------------------------------------- + +def test_maybe_import_gated_off_par_defaut(monkeypatch): + """Sans RPA_R1_AUTO_IMPORT, l'import auto NE doit PAS se déclencher + (R1 reste inactif tant que le sanitizer n'est pas validé / GO Dom).""" + monkeypatch.delenv("RPA_R1_AUTO_IMPORT", raising=False) + processor = _make_processor() + appels = [] + monkeypatch.setattr(processor, "_import_workflow_to_vwb", + lambda *a, **k: appels.append(a), raising=False) + + processor._maybe_import_to_vwb(_build_core_workflow(), "sess", "machine") + + assert appels == [] # gated OFF : aucun import + + +def test_maybe_import_actif_si_flag(monkeypatch): + """Avec RPA_R1_AUTO_IMPORT=true, l'import est appelé dans l'app-context VWB.""" + import contextlib + monkeypatch.setenv("RPA_R1_AUTO_IMPORT", "true") + processor = _make_processor() + appels = [] + monkeypatch.setattr(processor, "_import_workflow_to_vwb", + lambda w, s, m: appels.append((s, m)), raising=False) + # neutralise la création réelle de l'app-context (testée au runtime) + monkeypatch.setattr(processor, "_vwb_app_context", + lambda: contextlib.nullcontext(), raising=False) + + processor._maybe_import_to_vwb(_build_core_workflow(), "sess-x", "machine-y") + + assert appels == [("sess-x", "machine-y")] + + +def test_maybe_import_ne_casse_pas_la_finalisation(monkeypatch): + """Un échec d'import VWB ne doit JAMAIS faire échouer la finalisation worker.""" + import contextlib + monkeypatch.setenv("RPA_R1_AUTO_IMPORT", "true") + processor = _make_processor() + monkeypatch.setattr(processor, "_vwb_app_context", + lambda: contextlib.nullcontext(), raising=False) + + def _boom(*a, **k): + raise RuntimeError("DB VWB indisponible") + monkeypatch.setattr(processor, "_import_workflow_to_vwb", _boom, raising=False) + + # ne doit pas lever + processor._maybe_import_to_vwb(_build_core_workflow(), "sess", "machine")