feat(extraction): handler extract_dossier + pont worker→DB VWB mutualisé (brique 3)

vwb_db.py : couplage worker→DB VWB lazy (app Flask sur instance/workflows.db)
mutualisé (R1 + extraction), + persist_extracted_dossier (grille → Job/Table/Field).
replay_engine.py : handler _handle_extract_dossier_action — lit le screenshot,
extrait une grille structurée, gate qualité conservatrice (complete|needs_review),
persiste avec preuve (screenshot_ref/bbox/confidence). N'échoue JAMAIS le replay.
Données patient EN CLAIR (canal extraction, non anonymisé).

Réserve : dispatch runtime (api_stream.py) non encore branché — étape suivante,
à coordonner. Brique 3/4 de la verticale extraction dossier patient.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-29 14:18:08 +02:00
parent 9883cad012
commit 13f760a3b9
3 changed files with 467 additions and 0 deletions

View File

@@ -40,6 +40,7 @@ _ALLOWED_ACTION_TYPES = {
"pause_for_human", # Pause supervisée explicite (interceptée par /replay/next)
"extract_text", # OCR serveur sur dernier heartbeat → variable workflow
"extract_table", # OCR serveur + filtre regex → liste structurée (boucle)
"extract_dossier", # OCR grille structurée → dossier patient persisté (brique 3)
"extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions
"_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll)
"t2a_decision", # Analyse LLM facturation T2A → variable workflow
@@ -53,6 +54,7 @@ _ALLOWED_ACTION_TYPES = {
_SERVER_SIDE_ACTION_TYPES = {
"extract_text",
"extract_table",
"extract_dossier",
"t2a_decision",
"llm_generate",
"_concat_text_vars",
@@ -2216,6 +2218,146 @@ def _handle_extract_table_action(
return bool(rows)
def _resolve_screenshot_path(replay_state: Dict[str, Any]) -> Optional[str]:
"""Résout le chemin du dernier screenshot (path disque ou base64 → temp).
Calque la source utilisée par extract_text/extract_table : priorité au
``last_screenshot`` (path ou data-URI base64). Retourne None si absent.
"""
raw_screenshot = replay_state.get("last_screenshot") or ""
if not raw_screenshot:
return None
if raw_screenshot.startswith("data:"):
try:
import base64 as _b64, tempfile
header, b64data = raw_screenshot.split(",", 1)
suffix = ".jpg" if "jpeg" in header else ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp.write(_b64.b64decode(b64data))
tmp.close()
return tmp.name
except Exception as e:
logger.warning("extract_dossier: décodage base64 screenshot échoué: %s", e)
return None
if os.path.isfile(raw_screenshot):
return raw_screenshot
return None
def _gate_dossier_quality(
grid: List[List[Dict[str, Any]]],
*,
min_confidence: float,
expected_cols: Optional[int],
) -> str:
"""Gate qualité simple → 'complete' ou 'needs_review'.
'complete' SSI : grille non vide ET confiance médiane ≥ seuil ET (si
expected_cols fourni) au moins une ligne avec ce nombre de colonnes.
Sinon 'needs_review'. Volontairement conservatrice (default-review).
"""
confs = [
cell.get("confidence")
for row in grid for cell in row
if isinstance(cell.get("confidence"), (int, float))
]
if not confs:
return "needs_review"
confs.sort()
median = confs[len(confs) // 2]
if median < min_confidence:
return "needs_review"
if expected_cols is not None:
if not any(len(row) == expected_cols for row in grid):
return "needs_review"
return "complete"
def _handle_extract_dossier_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
session_id: str,
) -> bool:
"""Traite une action extract_dossier côté serveur (brique 3).
Lit le dernier screenshot, extrait une grille structurée via
``extract_grid_from_image``, applique une gate qualité, puis PERSISTE un
« dossier patient extrait » (Job/Table/Field) dans la DB VWB avec preuve
(screenshot_ref + screen_bbox + confidences). Le job_id est stocké dans
``replay_state["variables"][output_var]``.
Paramètres reconnus (action.parameters) :
output_var : nom de variable runtime (default "extracted_dossier")
patient_ref : référence patient EN CLAIR (volontaire) — non tokenisée
region : (x, y, w, h) px pour cropper avant OCR (None = plein)
min_confidence : seuil de confiance médiane pour 'complete' (default 0.6)
expected_cols : nb de colonnes attendu (optionnel) pour la gate
N'ÉCHOUE JAMAIS le replay : toute erreur → log + needs_review.
Retourne True SSI le dossier est persisté avec statut 'complete'.
"""
params = action.get("parameters") or {}
output_var = (params.get("output_var") or params.get("variable_name") or "extracted_dossier").strip()
patient_ref = params.get("patient_ref")
region = params.get("region") or None
try:
min_confidence = float(params.get("min_confidence", 0.6))
except (TypeError, ValueError):
min_confidence = 0.6
expected_cols = params.get("expected_cols")
if isinstance(expected_cols, str):
try:
expected_cols = int(expected_cols)
except ValueError:
expected_cols = None
job_id = ""
status = "needs_review"
try:
path = _resolve_screenshot_path(replay_state)
grid: List[List[Dict[str, Any]]] = []
if path:
from core.llm import extract_grid_from_image
grid = extract_grid_from_image(
path, region=tuple(region) if region else None
)
else:
logger.warning(
"extract_dossier : pas de screenshot pour session %s — needs_review",
session_id,
)
status = _gate_dossier_quality(
grid, min_confidence=min_confidence, expected_cols=expected_cols
)
from . import vwb_db
with vwb_db.vwb_app_context():
job_id = vwb_db.persist_extracted_dossier(
grid,
patient_ref=patient_ref,
source_session_id=session_id,
screenshot_ref=path,
screen_bbox=({"x": region[0], "y": region[1], "width": region[2], "height": region[3]}
if region and len(region) == 4 else None),
status=status,
)
except Exception as e:
# Ne JAMAIS échouer le replay : on log, on marque needs_review.
logger.warning(
"extract_dossier : échec persistance (%s) — needs_review, replay %s",
e, replay_state.get("replay_id", "?"),
)
status = "needs_review"
replay_state.setdefault("variables", {})[output_var] = job_id
logger.info(
"extract_dossier → variable '%s' job=%s statut=%s replay %s",
output_var, job_id or "?", status, replay_state.get("replay_id", "?"),
)
return status == "complete"
def _handle_t2a_decision_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],

View File

@@ -0,0 +1,106 @@
"""Couplage worker → DB VWB (mutualisé) + persistance « dossier patient extrait ».
Le worker/serveur streaming est un process distinct du backend VWB : il n'a
pas d'app Flask en mémoire. Ce module fournit :
- ``vwb_app_context()`` : un app-context Flask lazy (singleton module) lié au
fichier SQLite VWB ``visual_workflow_builder/backend/instance/workflows.db``,
avec ``db.init_app`` (db de ``db.models``). Réutilisable par tout module
serveur qui doit écrire dans la DB VWB (R1, extraction métier, …).
- ``persist_extracted_dossier(...)`` : depuis une grille OCR
(``List[List[cell]]``), crée ExtractionJob → ExtractedTable → ExtractedField
et commit. Suppose un app-context actif (comme le pont R1 existant).
⚠️ CANAL EXTRACTION = données patient EN CLAIR (volontaire) : aucune
tokenisation/assainissement PII ici (cf. note dans db/models.py).
"""
import sys
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional
# Ajout du backend VWB au sys.path à l'import → rend ``db.models`` importable
# (couplage worker→DB VWB mutualisé ; identique au pattern stream_processor).
_VWB_BACKEND = Path(__file__).resolve().parents[2] / "visual_workflow_builder" / "backend"
if str(_VWB_BACKEND) not in sys.path:
sys.path.insert(0, str(_VWB_BACKEND))
# App Flask lazy (singleton module) — un seul db.init_app pour tout le process.
_vwb_app = None
@contextmanager
def vwb_app_context():
"""App-context Flask VWB (lazy singleton) sur instance/workflows.db.
À utiliser via ``with vwb_app_context(): ...`` autour des appels qui
nécessitent ``db.session`` (ex. persist_extracted_dossier).
"""
global _vwb_app
if _vwb_app is None:
from flask import Flask
from db.models import db
db_path = _VWB_BACKEND / "instance" / "workflows.db"
app = Flask("worker_vwb")
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
_vwb_app = app
with _vwb_app.app_context():
yield
def persist_extracted_dossier(
grid: List[List[Dict[str, Any]]],
*,
patient_ref: Optional[str],
source_session_id: Optional[str],
screenshot_ref: Optional[str],
screen_bbox: Optional[Dict[str, Any]],
status: str,
) -> str:
"""Persiste un « dossier patient extrait » et retourne le job_id.
Crée 1 ExtractionJob → 1 ExtractedTable → N ExtractedField (une par
cellule de la grille), puis commit. Suppose un app-context VWB actif
(fourni par ``vwb_app_context()`` ou par l'appelant, comme le pont R1).
⚠️ ``patient_ref`` et ``cell["text"]`` sont stockés EN CLAIR (volontaire) :
le but est de constituer le dossier, pas d'anonymiser.
"""
from db.models import db, ExtractionJob, ExtractedTable, ExtractedField
job = ExtractionJob(
id=uuid.uuid4().hex,
patient_ref=patient_ref,
source_session_id=source_session_id,
status=status,
)
db.session.add(job)
table = ExtractedTable(
id=uuid.uuid4().hex,
job_id=job.id,
screen_bbox=screen_bbox,
screenshot_ref=screenshot_ref,
)
db.session.add(table)
for row in grid or []:
for cell in row or []:
db.session.add(ExtractedField(
id=uuid.uuid4().hex,
table_id=table.id,
row=cell.get("row"),
col=cell.get("col"),
value=cell.get("text"),
bbox=cell.get("bbox"),
confidence=cell.get("confidence"),
))
db.session.commit()
return job.id

View File

@@ -0,0 +1,219 @@
"""Tests TDD — Extraction « dossier patient » (brique 3).
Deux couches testées :
1. ``vwb_db.persist_extracted_dossier`` : depuis une grille OCR
(List[List[cell]]), crée ExtractionJob → ExtractedTable → ExtractedField
et commit. Testé sur SQLite mémoire via un app-context Flask jetable
(PAS la vraie DB VWB — isolation).
2. ``replay_engine._handle_extract_dossier_action`` : lit last_screenshot,
appelle ``extract_grid_from_image`` (mocké), applique la gate qualité
(complete / needs_review), persiste via vwb_db et n'échoue JAMAIS le
replay (grille vide → needs_review, sans lever).
⚠️ Canal extraction = données patient EN CLAIR (volontaire) : on vérifie
que les valeurs sont persistées telles quelles, sans tokenisation.
"""
import pytest
from flask import Flask
# vwb_db ajoute visual_workflow_builder/backend au sys.path à l'import →
# doit précéder l'import de db.models (couplage worker→DB VWB mutualisé).
import agent_v0.server_v1.vwb_db as vwb_db
import agent_v0.server_v1.replay_engine as replay_engine
from db.models import db, ExtractionJob, ExtractedTable, ExtractedField
# ---------------------------------------------------------------------------
# Fixtures : app Flask jetable sur SQLite mémoire (isolation totale)
# ---------------------------------------------------------------------------
@pytest.fixture
def mem_app():
"""App Flask minimale liée à une DB SQLite en mémoire."""
app = Flask("test_extract_dossier")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
with app.app_context():
db.create_all()
yield app
def _grid_2x2():
"""Grille connue 2×2 (confiances hautes)."""
return [
[
{"text": "Nom", "bbox": [[0, 0], [1, 0], [1, 1], [0, 1]], "confidence": 0.95, "row": 0, "col": 0},
{"text": "MOREL", "bbox": [[2, 0], [3, 0], [3, 1], [2, 1]], "confidence": 0.92, "row": 0, "col": 1},
],
[
{"text": "IPP", "bbox": [[0, 2], [1, 2], [1, 3], [0, 3]], "confidence": 0.90, "row": 1, "col": 0},
{"text": "25123456", "bbox": [[2, 2], [3, 2], [3, 3], [2, 3]], "confidence": 0.88, "row": 1, "col": 1},
],
]
# ---------------------------------------------------------------------------
# 1) persist_extracted_dossier
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_persist_extracted_dossier_creates_job_table_fields(mem_app):
job_id = vwb_db.persist_extracted_dossier(
_grid_2x2(),
patient_ref="MOREL Catherine",
source_session_id="sess-42",
screenshot_ref="/captures/last.png",
screen_bbox={"x": 0, "y": 0, "width": 800, "height": 600},
status="complete",
)
assert isinstance(job_id, str) and job_id
job = db.session.get(ExtractionJob, job_id)
assert job is not None
assert job.status == "complete"
assert job.patient_ref == "MOREL Catherine" # EN CLAIR, non tokenisé
assert job.source_session_id == "sess-42"
tables = ExtractedTable.query.filter_by(job_id=job_id).all()
assert len(tables) == 1
assert tables[0].screenshot_ref == "/captures/last.png"
assert tables[0].screen_bbox == {"x": 0, "y": 0, "width": 800, "height": 600}
fields = ExtractedField.query.filter_by(table_id=tables[0].id).all()
assert len(fields) == 4 # 2×2 cellules
values = {(f.row, f.col): f.value for f in fields}
assert values[(0, 1)] == "MOREL" # valeur patient EN CLAIR conservée
assert values[(1, 1)] == "25123456"
confs = {(f.row, f.col): f.confidence for f in fields}
assert confs[(0, 0)] == pytest.approx(0.95)
@pytest.mark.unit
def test_persist_extracted_dossier_empty_grid_still_creates_job(mem_app):
"""Grille vide → Job + Table sans Field (statut transmis tel quel)."""
job_id = vwb_db.persist_extracted_dossier(
[],
patient_ref=None,
source_session_id="sess-empty",
screenshot_ref="/captures/empty.png",
screen_bbox=None,
status="needs_review",
)
job = db.session.get(ExtractionJob, job_id)
assert job is not None and job.status == "needs_review"
tables = ExtractedTable.query.filter_by(job_id=job_id).all()
assert len(tables) == 1
assert ExtractedField.query.filter_by(table_id=tables[0].id).count() == 0
# ---------------------------------------------------------------------------
# 2) _handle_extract_dossier_action
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_handle_extract_dossier_complete(mem_app, monkeypatch, tmp_path):
# screenshot bidon sur disque (le mock OCR ignore le contenu)
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
# extract_grid_from_image mocké → grille 2×2 de confiance haute
monkeypatch.setattr(
"core.llm.extract_grid_from_image",
lambda *a, **k: _grid_2x2(),
)
# vwb_app_context pointé sur l'app mémoire de la fixture
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
monkeypatch.setattr(replay_engine, "vwb_db", vwb_db, raising=False)
replay_state = {
"last_screenshot": str(shot),
"variables": {},
"replay_id": "rep-1",
}
action = {
"type": "extract_dossier",
"parameters": {
"output_var": "dossier_id",
"patient_ref": "MOREL Catherine",
"expected_cols": 2,
"min_confidence": 0.5,
},
}
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-42")
assert ok is True
job_id = replay_state["variables"]["dossier_id"]
assert isinstance(job_id, str) and job_id
with mem_app.app_context():
job = db.session.get(ExtractionJob, job_id)
assert job is not None
assert job.status == "complete" # gate OK : non vide, conf ok, 2 cols
@pytest.mark.unit
def test_handle_extract_dossier_low_confidence_needs_review(mem_app, monkeypatch, tmp_path):
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
low_grid = [
[{"text": "x", "bbox": [], "confidence": 0.10, "row": 0, "col": 0}],
]
monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: low_grid)
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-2"}
action = {"type": "extract_dossier", "parameters": {"min_confidence": 0.5}}
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-low")
assert ok is False # gate a basculé en needs_review
job_id = replay_state["variables"]["extracted_dossier"]
with mem_app.app_context():
assert db.session.get(ExtractionJob, job_id).status == "needs_review"
@pytest.mark.unit
def test_handle_extract_dossier_empty_grid_no_raise(mem_app, monkeypatch, tmp_path):
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: [])
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-3"}
action = {"type": "extract_dossier", "parameters": {}}
# Ne lève jamais ; grille vide → needs_review
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-empty")
assert ok is False
job_id = replay_state["variables"]["extracted_dossier"]
with mem_app.app_context():
assert db.session.get(ExtractionJob, job_id).status == "needs_review"
@pytest.mark.unit
def test_handle_extract_dossier_persist_failure_no_raise(mem_app, monkeypatch, tmp_path):
"""Si la persistance lève, le handler log et n'échoue PAS le replay."""
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: _grid_2x2())
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
def _boom(*a, **k):
raise RuntimeError("DB down")
monkeypatch.setattr(vwb_db, "persist_extracted_dossier", _boom)
replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-4"}
action = {"type": "extract_dossier", "parameters": {}}
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-boom")
assert ok is False # jamais de raise
@pytest.mark.unit
def test_extract_dossier_declared_in_action_type_sets():
assert "extract_dossier" in replay_engine._ALLOWED_ACTION_TYPES
assert "extract_dossier" in replay_engine._SERVER_SIDE_ACTION_TYPES