feat(vwb): pont R1 import idempotent core→DB par signature trajectoire

Add import_core_workflow_to_db() — create-or-update par signature de
trajectoire (décision produit Dom 23/06). Les workflows source='manual'
sont exclus du filtre de fusion. Inclut test TDD idempotent (ré-import
2× → toujours 1 seul workflow).
This commit is contained in:
Dom
2026-07-02 13:01:33 +02:00
parent 9a8242add5
commit ebed4d7546
3 changed files with 395 additions and 0 deletions

View File

@@ -295,6 +295,175 @@ def convert_learned_to_vwb_steps(
return workflow_meta, steps, warnings
# ---------------------------------------------------------------------------
# Pont R1 — import IDEMPOTENT d'un workflow core en DB VWB (create-or-update)
# ---------------------------------------------------------------------------
# Marqueur stable de signature de trajectoire embarqué dans `Workflow.description`.
# Le modèle `Workflow` n'a PAS (encore) de colonne dédiée ; on réutilise donc le
# même mécanisme que la route GET /learned-workflows existante, qui détecte les
# imports via `description.contains(...)`. La clé d'idempotence est la SIGNATURE
# DE TRAJECTOIRE (cf. core.execution.trajectory_signature), pas le workflow_id de
# session (qui change à chaque ré-apprentissage du même parcours).
_TRAJ_SIG_MARKER = "[traj_sig:"
def _trajectory_signature_marker(signature: str) -> str:
"""Marqueur texte stable à embarquer dans la description."""
return f"{_TRAJ_SIG_MARKER}{signature}]"
def _find_existing_learned_workflow(db_session, signature: str):
"""Cherche un Workflow `source='learned_import'` de MÊME signature de trajectoire.
Ne considère QUE les imports appris : les workflows `source='manual'`
(démo Urgence_aiva, etc.) sont volontairement exclus du filtre et donc
jamais candidats à la mise à jour.
"""
from db.models import Workflow # import paresseux (modèles liés au runtime VWB)
marker = _trajectory_signature_marker(signature)
return (
db_session.query(Workflow)
.filter(
Workflow.source == "learned_import",
Workflow.description.contains(marker),
)
.first()
)
def import_core_workflow_to_db(
core_dict: Dict[str, Any],
*,
machine_id: str,
source_session_id: str,
db_session,
) -> Dict[str, Any]:
"""Importe un workflow core (JSON appris par Léa) en DB VWB, de façon IDEMPOTENTE.
Fusion par **signature de trajectoire** (décision produit Dom 23/06) :
1. calcule `sig = workflow_trajectory_signature(core_dict)` ;
2. cherche un `Workflow` `source='learned_import'` de même signature ;
3. si trouvé → **skip** (pas de doublon, le workflow existant fait foi) ;
sinon → crée `Workflow` + `Step`(s) via `convert_learned_to_vwb_steps`.
Le nouveau workflow est marqué `source='learned_import'`,
`review_status='pending_review'`. Les workflows `source='manual'` ne sont
JAMAIS touchés (cf. `_find_existing_learned_workflow`).
Args:
core_dict: workflow core (dict JSON) tel qu'appris/sauvegardé.
machine_id: poste d'origine (traçabilité, stocké en tag/description).
source_session_id: session ayant produit ce workflow (traçabilité).
db_session: session SQLAlchemy (l'app appelante détient le contexte).
Returns:
dict {created: bool, workflow_id: str, signature: str, warnings: list}.
`created=False` quand un workflow de même trajectoire existait déjà.
Note (non-wiring) : cette unité n'est PAS branchée au worker live ni à la
route HTTP existante ; voir le rapport de câblage R1.
"""
# Imports paresseux : garde le module léger et évite un import core/DB au load.
from core.execution.trajectory_signature import workflow_trajectory_signature
from db.models import Workflow, Step
signature = workflow_trajectory_signature(core_dict)
# --- Idempotence : même trajectoire déjà importée ? → skip (pas de doublon) ---
existing = _find_existing_learned_workflow(db_session, signature)
if existing is not None:
logger.info(
"Workflow appris déjà présent (signature %s…) → import ignoré, "
"réutilisation de %s",
signature[:12],
existing.id,
)
return {
"created": False,
"workflow_id": existing.id,
"signature": signature,
"warnings": [],
}
# --- Création : conversion core → steps VWB, puis écriture DB ---
wf_meta, steps_list, warnings = convert_learned_to_vwb_steps(core_dict)
current_name = (wf_meta.get("name") or "").strip()
if current_name.lower() in {"", "unnamed workflow", "workflow importé"}:
# Réutilise la dérivation de nom de la route HTTP si disponible.
try:
from api_v3.learned_workflows import _derive_default_name
wf_meta["name"] = _derive_default_name(core_dict)
except Exception: # pragma: no cover - fallback minimal
wf_meta["name"] = f"Léa import — {datetime.now():%Y-%m-%d %H:%M}"
wf_id = f"wf_{uuid.uuid4().hex[:12]}"
# La signature est embarquée dans la description (clé d'idempotence) + une
# ligne de traçabilité (workflow core d'origine).
base_desc = (wf_meta.get("description") or "").strip()
description = "\n\n".join(
part
for part in (
base_desc,
f"[Importé depuis workflow appris: {core_dict.get('workflow_id', '')}]",
_trajectory_signature_marker(signature),
)
if part
)
workflow = Workflow(
id=wf_id,
name=wf_meta["name"],
description=description,
source="learned_import",
review_status="pending_review",
)
# Tags : conserver ceux du workflow + traçabilité machine/session.
tags = list(wf_meta.get("tags") or [])
tags.extend([f"machine:{machine_id}", f"session:{source_session_id}"])
workflow.tags = tags
db_session.add(workflow)
for step_data in steps_list:
step = Step(
id=f"step_{uuid.uuid4().hex[:12]}",
workflow_id=wf_id,
action_type=step_data["action_type"],
order=step_data["order"],
position_x=step_data.get("position_x", 0),
position_y=step_data.get("position_y", 0),
label=step_data.get("label", step_data["action_type"]),
)
params = dict(step_data.get("parameters", {}))
# L'image d'ancre (_anchor_image_base64) est laissée dans params : la
# persistance d'ancre (VisualAnchor + fichier) reste pilotée par la route
# HTTP existante. Cette unité se concentre sur l'idempotence Workflow/Step.
step.parameters = params
db_session.add(step)
db_session.commit()
logger.info(
"Workflow appris importé (R1) : %s (signature %s…, %d étapes, machine %s)",
wf_id,
signature[:12],
len(steps_list),
machine_id,
)
return {
"created": True,
"workflow_id": wf_id,
"signature": signature,
"warnings": warnings,
}
def _convert_compound_substep(
sub_type: str, sub: Dict[str, Any], parent_target: Dict[str, Any]
) -> Tuple[str, Dict[str, Any]]:

View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Test TDD — pont R1 : `import_core_workflow_to_db` IDEMPOTENT.
Objectif chantier R1 : une session auto-apprise (workflow core JSON) doit pouvoir
être (ré)importée en DB VWB **sans créer de doublon**. La fusion se fait par
**signature de trajectoire** (cf. `core.execution.trajectory_signature`) — décision
produit Dom 23/06 : create-or-update, pas create-only.
Coeur du test (b) : ré-importer le MÊME core_dict 2× → toujours UN seul workflow.
Ce module est volontairement isolé du chemin live :
- il ne démarre PAS l'app Flask complète (`app.py`) ;
- il lie le `db` partagé (`db.models.db`) à une SQLite **en mémoire** via une
app Flask minimale, même pattern que `tests/conftest.py` mais sans dépendances
lourdes (pas de socketio, pas de blueprints).
"""
import sys
from pathlib import Path
import pytest
from flask import Flask
# --- Chemins : racine projet (pour core.*) + backend (pour db.models, services.*) ---
_BACKEND = Path(__file__).resolve().parent.parent.parent # .../visual_workflow_builder/backend
_ROOT = _BACKEND.parent.parent # .../rpa_vision_v3
for p in (str(_ROOT), str(_BACKEND)):
if p not in sys.path:
sys.path.insert(0, p)
from db.models import db, Workflow, Step # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures DB en mémoire (app Flask minimale, db partagé)
# ---------------------------------------------------------------------------
@pytest.fixture
def db_app():
"""App Flask minimale liée à une SQLite en mémoire, schéma créé."""
app = Flask("test_import_core_workflow")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
with app.app_context():
db.create_all()
yield app
db.session.remove()
db.drop_all()
# ---------------------------------------------------------------------------
# Fixtures de workflows core (format JSON appris par Léa)
# ---------------------------------------------------------------------------
def _core_workflow_bloc_notes() -> dict:
"""Workflow core minimal : ouvrir Bloc-notes et saisir du texte."""
return {
"workflow_id": "wf_sess_bloc_notes_001",
"name": "Léa Bloc-notes",
"entry_nodes": ["n1"],
"nodes": [
{"node_id": "n1", "name": "Bureau"},
{"node_id": "n2", "name": "Bloc-notes ouvert"},
],
"edges": [
{
"edge_id": "e1",
"from_node": "n1",
"to_node": "n2",
"action": {
"type": "mouse_click",
"target": {"by_text": "Bloc-notes", "by_role": "ocr"},
"parameters": {"button": "left"},
},
},
{
"edge_id": "e2",
"from_node": "n2",
"to_node": "n2",
"action": {
"type": "text_input",
"target": {"by_text": "zone de saisie"},
"parameters": {"text": "bonjour"},
},
},
],
}
def _core_workflow_calculatrice() -> dict:
"""Workflow core d'une trajectoire DIFFÉRENTE (calculatrice)."""
return {
"workflow_id": "wf_sess_calc_002",
"name": "Léa Calculatrice",
"entry_nodes": ["n1"],
"nodes": [
{"node_id": "n1", "name": "Bureau"},
{"node_id": "n2", "name": "Calculatrice"},
],
"edges": [
{
"edge_id": "e1",
"from_node": "n1",
"to_node": "n2",
"action": {
"type": "mouse_click",
"target": {"by_text": "Calculatrice", "by_role": "ocr"},
"parameters": {"button": "left"},
},
},
],
}
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_import_creates_workflow_with_steps(db_app):
"""(a) Un core_dict → 1 workflow VWB créé, avec ses steps."""
from services.learned_workflow_bridge import import_core_workflow_to_db
with db_app.app_context():
result = import_core_workflow_to_db(
_core_workflow_bloc_notes(),
machine_id="DESKTOP-TEST_windows",
source_session_id="sess_bloc_notes_001",
db_session=db.session,
)
assert result["created"] is True
wf_id = result["workflow_id"]
assert wf_id
wf = Workflow.query.get(wf_id)
assert wf is not None
assert wf.source == "learned_import"
assert wf.review_status == "pending_review"
steps = Step.query.filter_by(workflow_id=wf_id).all()
assert len(steps) >= 1, "le workflow importé doit avoir au moins une étape"
def test_reimport_same_workflow_is_idempotent(db_app):
"""(b) COEUR — ré-importer le MÊME core_dict 2× → toujours 1 seul workflow."""
from services.learned_workflow_bridge import import_core_workflow_to_db
with db_app.app_context():
first = import_core_workflow_to_db(
_core_workflow_bloc_notes(),
machine_id="DESKTOP-TEST_windows",
source_session_id="sess_bloc_notes_001",
db_session=db.session,
)
second = import_core_workflow_to_db(
_core_workflow_bloc_notes(),
machine_id="DESKTOP-TEST_windows",
source_session_id="sess_bloc_notes_001_rerun",
db_session=db.session,
)
# UN seul workflow en DB malgré deux imports
assert Workflow.query.count() == 1, "ré-import du même parcours = pas de doublon"
# Le second pointe vers le même workflow, marqué non-créé
assert first["workflow_id"] == second["workflow_id"]
assert second["created"] is False
def test_different_trajectories_create_two_workflows(db_app):
"""(c) Deux trajectoires différentes → 2 workflows distincts."""
from services.learned_workflow_bridge import import_core_workflow_to_db
with db_app.app_context():
r1 = import_core_workflow_to_db(
_core_workflow_bloc_notes(),
machine_id="DESKTOP-TEST_windows",
source_session_id="sess_a",
db_session=db.session,
)
r2 = import_core_workflow_to_db(
_core_workflow_calculatrice(),
machine_id="DESKTOP-TEST_windows",
source_session_id="sess_b",
db_session=db.session,
)
assert Workflow.query.count() == 2
assert r1["workflow_id"] != r2["workflow_id"]
def test_manual_workflow_is_never_touched(db_app):
"""(d) Un workflow source='manual' préexistant n'est jamais modifié.
Même si, par construction, il partageait la signature d'un parcours importé,
la fonction ne doit cibler QUE les workflows source='learned_import'.
"""
from services.learned_workflow_bridge import import_core_workflow_to_db
with db_app.app_context():
# Workflow manuel préexistant (démo Urgence_aiva) — intouchable
manual = Workflow(
id="wf_manual_demo",
name="Urgence_aiva_demo",
description="Démo manuelle critique",
source="manual",
review_status="approved",
)
db.session.add(manual)
db.session.commit()
manual_name_before = manual.name
manual_review_before = manual.review_status
import_core_workflow_to_db(
_core_workflow_bloc_notes(),
machine_id="DESKTOP-TEST_windows",
source_session_id="sess_x",
db_session=db.session,
)
manual_after = Workflow.query.get("wf_manual_demo")
assert manual_after.name == manual_name_before
assert manual_after.review_status == manual_review_before
assert manual_after.source == "manual"