diff --git a/agent_v0/server_v1/replay_engine.py b/agent_v0/server_v1/replay_engine.py index 6a3cb9220..3c08b2eaf 100644 --- a/agent_v0/server_v1/replay_engine.py +++ b/agent_v0/server_v1/replay_engine.py @@ -40,6 +40,7 @@ _ALLOWED_ACTION_TYPES = { "pause_for_human", # Pause supervisée explicite (interceptée par /replay/next) "extract_text", # OCR serveur sur dernier heartbeat → variable workflow "extract_table", # OCR serveur + filtre regex → liste structurée (boucle) + "extract_dossier", # OCR grille structurée → dossier patient persisté (brique 3) "extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions "_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll) "t2a_decision", # Analyse LLM facturation T2A → variable workflow @@ -53,6 +54,7 @@ _ALLOWED_ACTION_TYPES = { _SERVER_SIDE_ACTION_TYPES = { "extract_text", "extract_table", + "extract_dossier", "t2a_decision", "llm_generate", "_concat_text_vars", @@ -2216,6 +2218,146 @@ def _handle_extract_table_action( return bool(rows) +def _resolve_screenshot_path(replay_state: Dict[str, Any]) -> Optional[str]: + """Résout le chemin du dernier screenshot (path disque ou base64 → temp). + + Calque la source utilisée par extract_text/extract_table : priorité au + ``last_screenshot`` (path ou data-URI base64). Retourne None si absent. + """ + raw_screenshot = replay_state.get("last_screenshot") or "" + if not raw_screenshot: + return None + if raw_screenshot.startswith("data:"): + try: + import base64 as _b64, tempfile + header, b64data = raw_screenshot.split(",", 1) + suffix = ".jpg" if "jpeg" in header else ".png" + tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) + tmp.write(_b64.b64decode(b64data)) + tmp.close() + return tmp.name + except Exception as e: + logger.warning("extract_dossier: décodage base64 screenshot échoué: %s", e) + return None + if os.path.isfile(raw_screenshot): + return raw_screenshot + return None + + +def _gate_dossier_quality( + grid: List[List[Dict[str, Any]]], + *, + min_confidence: float, + expected_cols: Optional[int], +) -> str: + """Gate qualité simple → 'complete' ou 'needs_review'. + + 'complete' SSI : grille non vide ET confiance médiane ≥ seuil ET (si + expected_cols fourni) au moins une ligne avec ce nombre de colonnes. + Sinon 'needs_review'. Volontairement conservatrice (default-review). + """ + confs = [ + cell.get("confidence") + for row in grid for cell in row + if isinstance(cell.get("confidence"), (int, float)) + ] + if not confs: + return "needs_review" + confs.sort() + median = confs[len(confs) // 2] + if median < min_confidence: + return "needs_review" + if expected_cols is not None: + if not any(len(row) == expected_cols for row in grid): + return "needs_review" + return "complete" + + +def _handle_extract_dossier_action( + action: Dict[str, Any], + replay_state: Dict[str, Any], + session_id: str, +) -> bool: + """Traite une action extract_dossier côté serveur (brique 3). + + Lit le dernier screenshot, extrait une grille structurée via + ``extract_grid_from_image``, applique une gate qualité, puis PERSISTE un + « dossier patient extrait » (Job/Table/Field) dans la DB VWB avec preuve + (screenshot_ref + screen_bbox + confidences). Le job_id est stocké dans + ``replay_state["variables"][output_var]``. + + Paramètres reconnus (action.parameters) : + output_var : nom de variable runtime (default "extracted_dossier") + patient_ref : référence patient EN CLAIR (volontaire) — non tokenisée + region : (x, y, w, h) px pour cropper avant OCR (None = plein) + min_confidence : seuil de confiance médiane pour 'complete' (default 0.6) + expected_cols : nb de colonnes attendu (optionnel) pour la gate + + N'ÉCHOUE JAMAIS le replay : toute erreur → log + needs_review. + Retourne True SSI le dossier est persisté avec statut 'complete'. + """ + params = action.get("parameters") or {} + output_var = (params.get("output_var") or params.get("variable_name") or "extracted_dossier").strip() + patient_ref = params.get("patient_ref") + region = params.get("region") or None + try: + min_confidence = float(params.get("min_confidence", 0.6)) + except (TypeError, ValueError): + min_confidence = 0.6 + expected_cols = params.get("expected_cols") + if isinstance(expected_cols, str): + try: + expected_cols = int(expected_cols) + except ValueError: + expected_cols = None + + job_id = "" + status = "needs_review" + try: + path = _resolve_screenshot_path(replay_state) + grid: List[List[Dict[str, Any]]] = [] + if path: + from core.llm import extract_grid_from_image + grid = extract_grid_from_image( + path, region=tuple(region) if region else None + ) + else: + logger.warning( + "extract_dossier : pas de screenshot pour session %s — needs_review", + session_id, + ) + + status = _gate_dossier_quality( + grid, min_confidence=min_confidence, expected_cols=expected_cols + ) + + from . import vwb_db + with vwb_db.vwb_app_context(): + job_id = vwb_db.persist_extracted_dossier( + grid, + patient_ref=patient_ref, + source_session_id=session_id, + screenshot_ref=path, + screen_bbox=({"x": region[0], "y": region[1], "width": region[2], "height": region[3]} + if region and len(region) == 4 else None), + status=status, + ) + except Exception as e: + # Ne JAMAIS échouer le replay : on log, on marque needs_review. + logger.warning( + "extract_dossier : échec persistance (%s) — needs_review, replay %s", + e, replay_state.get("replay_id", "?"), + ) + status = "needs_review" + + replay_state.setdefault("variables", {})[output_var] = job_id + logger.info( + "extract_dossier → variable '%s' job=%s statut=%s replay %s", + output_var, job_id or "?", status, replay_state.get("replay_id", "?"), + ) + return status == "complete" + + def _handle_t2a_decision_action( action: Dict[str, Any], replay_state: Dict[str, Any], diff --git a/agent_v0/server_v1/vwb_db.py b/agent_v0/server_v1/vwb_db.py new file mode 100644 index 000000000..83cd91a5a --- /dev/null +++ b/agent_v0/server_v1/vwb_db.py @@ -0,0 +1,106 @@ +"""Couplage worker → DB VWB (mutualisé) + persistance « dossier patient extrait ». + +Le worker/serveur streaming est un process distinct du backend VWB : il n'a +pas d'app Flask en mémoire. Ce module fournit : + +- ``vwb_app_context()`` : un app-context Flask lazy (singleton module) lié au + fichier SQLite VWB ``visual_workflow_builder/backend/instance/workflows.db``, + avec ``db.init_app`` (db de ``db.models``). Réutilisable par tout module + serveur qui doit écrire dans la DB VWB (R1, extraction métier, …). + +- ``persist_extracted_dossier(...)`` : depuis une grille OCR + (``List[List[cell]]``), crée ExtractionJob → ExtractedTable → ExtractedField + et commit. Suppose un app-context actif (comme le pont R1 existant). + +⚠️ CANAL EXTRACTION = données patient EN CLAIR (volontaire) : aucune +tokenisation/assainissement PII ici (cf. note dans db/models.py). +""" + +import sys +import uuid +from contextlib import contextmanager +from pathlib import Path +from typing import Any, Dict, List, Optional + +# Ajout du backend VWB au sys.path à l'import → rend ``db.models`` importable +# (couplage worker→DB VWB mutualisé ; identique au pattern stream_processor). +_VWB_BACKEND = Path(__file__).resolve().parents[2] / "visual_workflow_builder" / "backend" +if str(_VWB_BACKEND) not in sys.path: + sys.path.insert(0, str(_VWB_BACKEND)) + +# App Flask lazy (singleton module) — un seul db.init_app pour tout le process. +_vwb_app = None + + +@contextmanager +def vwb_app_context(): + """App-context Flask VWB (lazy singleton) sur instance/workflows.db. + + À utiliser via ``with vwb_app_context(): ...`` autour des appels qui + nécessitent ``db.session`` (ex. persist_extracted_dossier). + """ + global _vwb_app + if _vwb_app is None: + from flask import Flask + from db.models import db + + db_path = _VWB_BACKEND / "instance" / "workflows.db" + app = Flask("worker_vwb") + app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}" + app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + db.init_app(app) + _vwb_app = app + with _vwb_app.app_context(): + yield + + +def persist_extracted_dossier( + grid: List[List[Dict[str, Any]]], + *, + patient_ref: Optional[str], + source_session_id: Optional[str], + screenshot_ref: Optional[str], + screen_bbox: Optional[Dict[str, Any]], + status: str, +) -> str: + """Persiste un « dossier patient extrait » et retourne le job_id. + + Crée 1 ExtractionJob → 1 ExtractedTable → N ExtractedField (une par + cellule de la grille), puis commit. Suppose un app-context VWB actif + (fourni par ``vwb_app_context()`` ou par l'appelant, comme le pont R1). + + ⚠️ ``patient_ref`` et ``cell["text"]`` sont stockés EN CLAIR (volontaire) : + le but est de constituer le dossier, pas d'anonymiser. + """ + from db.models import db, ExtractionJob, ExtractedTable, ExtractedField + + job = ExtractionJob( + id=uuid.uuid4().hex, + patient_ref=patient_ref, + source_session_id=source_session_id, + status=status, + ) + db.session.add(job) + + table = ExtractedTable( + id=uuid.uuid4().hex, + job_id=job.id, + screen_bbox=screen_bbox, + screenshot_ref=screenshot_ref, + ) + db.session.add(table) + + for row in grid or []: + for cell in row or []: + db.session.add(ExtractedField( + id=uuid.uuid4().hex, + table_id=table.id, + row=cell.get("row"), + col=cell.get("col"), + value=cell.get("text"), + bbox=cell.get("bbox"), + confidence=cell.get("confidence"), + )) + + db.session.commit() + return job.id diff --git a/tests/unit/test_extract_dossier.py b/tests/unit/test_extract_dossier.py new file mode 100644 index 000000000..cdfd0d15b --- /dev/null +++ b/tests/unit/test_extract_dossier.py @@ -0,0 +1,219 @@ +"""Tests TDD — Extraction « dossier patient » (brique 3). + +Deux couches testées : + +1. ``vwb_db.persist_extracted_dossier`` : depuis une grille OCR + (List[List[cell]]), crée ExtractionJob → ExtractedTable → ExtractedField + et commit. Testé sur SQLite mémoire via un app-context Flask jetable + (PAS la vraie DB VWB — isolation). + +2. ``replay_engine._handle_extract_dossier_action`` : lit last_screenshot, + appelle ``extract_grid_from_image`` (mocké), applique la gate qualité + (complete / needs_review), persiste via vwb_db et n'échoue JAMAIS le + replay (grille vide → needs_review, sans lever). + +⚠️ Canal extraction = données patient EN CLAIR (volontaire) : on vérifie +que les valeurs sont persistées telles quelles, sans tokenisation. +""" +import pytest +from flask import Flask + +# vwb_db ajoute visual_workflow_builder/backend au sys.path à l'import → +# doit précéder l'import de db.models (couplage worker→DB VWB mutualisé). +import agent_v0.server_v1.vwb_db as vwb_db +import agent_v0.server_v1.replay_engine as replay_engine + +from db.models import db, ExtractionJob, ExtractedTable, ExtractedField + + +# --------------------------------------------------------------------------- +# Fixtures : app Flask jetable sur SQLite mémoire (isolation totale) +# --------------------------------------------------------------------------- +@pytest.fixture +def mem_app(): + """App Flask minimale liée à une DB SQLite en mémoire.""" + app = Flask("test_extract_dossier") + app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" + app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + db.init_app(app) + with app.app_context(): + db.create_all() + yield app + + +def _grid_2x2(): + """Grille connue 2×2 (confiances hautes).""" + return [ + [ + {"text": "Nom", "bbox": [[0, 0], [1, 0], [1, 1], [0, 1]], "confidence": 0.95, "row": 0, "col": 0}, + {"text": "MOREL", "bbox": [[2, 0], [3, 0], [3, 1], [2, 1]], "confidence": 0.92, "row": 0, "col": 1}, + ], + [ + {"text": "IPP", "bbox": [[0, 2], [1, 2], [1, 3], [0, 3]], "confidence": 0.90, "row": 1, "col": 0}, + {"text": "25123456", "bbox": [[2, 2], [3, 2], [3, 3], [2, 3]], "confidence": 0.88, "row": 1, "col": 1}, + ], + ] + + +# --------------------------------------------------------------------------- +# 1) persist_extracted_dossier +# --------------------------------------------------------------------------- +@pytest.mark.unit +def test_persist_extracted_dossier_creates_job_table_fields(mem_app): + job_id = vwb_db.persist_extracted_dossier( + _grid_2x2(), + patient_ref="MOREL Catherine", + source_session_id="sess-42", + screenshot_ref="/captures/last.png", + screen_bbox={"x": 0, "y": 0, "width": 800, "height": 600}, + status="complete", + ) + + assert isinstance(job_id, str) and job_id + + job = db.session.get(ExtractionJob, job_id) + assert job is not None + assert job.status == "complete" + assert job.patient_ref == "MOREL Catherine" # EN CLAIR, non tokenisé + assert job.source_session_id == "sess-42" + + tables = ExtractedTable.query.filter_by(job_id=job_id).all() + assert len(tables) == 1 + assert tables[0].screenshot_ref == "/captures/last.png" + assert tables[0].screen_bbox == {"x": 0, "y": 0, "width": 800, "height": 600} + + fields = ExtractedField.query.filter_by(table_id=tables[0].id).all() + assert len(fields) == 4 # 2×2 cellules + values = {(f.row, f.col): f.value for f in fields} + assert values[(0, 1)] == "MOREL" # valeur patient EN CLAIR conservée + assert values[(1, 1)] == "25123456" + confs = {(f.row, f.col): f.confidence for f in fields} + assert confs[(0, 0)] == pytest.approx(0.95) + + +@pytest.mark.unit +def test_persist_extracted_dossier_empty_grid_still_creates_job(mem_app): + """Grille vide → Job + Table sans Field (statut transmis tel quel).""" + job_id = vwb_db.persist_extracted_dossier( + [], + patient_ref=None, + source_session_id="sess-empty", + screenshot_ref="/captures/empty.png", + screen_bbox=None, + status="needs_review", + ) + job = db.session.get(ExtractionJob, job_id) + assert job is not None and job.status == "needs_review" + tables = ExtractedTable.query.filter_by(job_id=job_id).all() + assert len(tables) == 1 + assert ExtractedField.query.filter_by(table_id=tables[0].id).count() == 0 + + +# --------------------------------------------------------------------------- +# 2) _handle_extract_dossier_action +# --------------------------------------------------------------------------- +@pytest.mark.unit +def test_handle_extract_dossier_complete(mem_app, monkeypatch, tmp_path): + # screenshot bidon sur disque (le mock OCR ignore le contenu) + shot = tmp_path / "shot.png" + shot.write_bytes(b"\x89PNG") + + # extract_grid_from_image mocké → grille 2×2 de confiance haute + monkeypatch.setattr( + "core.llm.extract_grid_from_image", + lambda *a, **k: _grid_2x2(), + ) + # vwb_app_context pointé sur l'app mémoire de la fixture + monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context()) + monkeypatch.setattr(replay_engine, "vwb_db", vwb_db, raising=False) + + replay_state = { + "last_screenshot": str(shot), + "variables": {}, + "replay_id": "rep-1", + } + action = { + "type": "extract_dossier", + "parameters": { + "output_var": "dossier_id", + "patient_ref": "MOREL Catherine", + "expected_cols": 2, + "min_confidence": 0.5, + }, + } + + ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-42") + assert ok is True + + job_id = replay_state["variables"]["dossier_id"] + assert isinstance(job_id, str) and job_id + with mem_app.app_context(): + job = db.session.get(ExtractionJob, job_id) + assert job is not None + assert job.status == "complete" # gate OK : non vide, conf ok, 2 cols + + +@pytest.mark.unit +def test_handle_extract_dossier_low_confidence_needs_review(mem_app, monkeypatch, tmp_path): + shot = tmp_path / "shot.png" + shot.write_bytes(b"\x89PNG") + + low_grid = [ + [{"text": "x", "bbox": [], "confidence": 0.10, "row": 0, "col": 0}], + ] + monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: low_grid) + monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context()) + + replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-2"} + action = {"type": "extract_dossier", "parameters": {"min_confidence": 0.5}} + + ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-low") + assert ok is False # gate a basculé en needs_review + job_id = replay_state["variables"]["extracted_dossier"] + with mem_app.app_context(): + assert db.session.get(ExtractionJob, job_id).status == "needs_review" + + +@pytest.mark.unit +def test_handle_extract_dossier_empty_grid_no_raise(mem_app, monkeypatch, tmp_path): + shot = tmp_path / "shot.png" + shot.write_bytes(b"\x89PNG") + + monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: []) + monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context()) + + replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-3"} + action = {"type": "extract_dossier", "parameters": {}} + + # Ne lève jamais ; grille vide → needs_review + ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-empty") + assert ok is False + job_id = replay_state["variables"]["extracted_dossier"] + with mem_app.app_context(): + assert db.session.get(ExtractionJob, job_id).status == "needs_review" + + +@pytest.mark.unit +def test_handle_extract_dossier_persist_failure_no_raise(mem_app, monkeypatch, tmp_path): + """Si la persistance lève, le handler log et n'échoue PAS le replay.""" + shot = tmp_path / "shot.png" + shot.write_bytes(b"\x89PNG") + + monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: _grid_2x2()) + monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context()) + + def _boom(*a, **k): + raise RuntimeError("DB down") + monkeypatch.setattr(vwb_db, "persist_extracted_dossier", _boom) + + replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-4"} + action = {"type": "extract_dossier", "parameters": {}} + + ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-boom") + assert ok is False # jamais de raise + + +@pytest.mark.unit +def test_extract_dossier_declared_in_action_type_sets(): + assert "extract_dossier" in replay_engine._ALLOWED_ACTION_TYPES + assert "extract_dossier" in replay_engine._SERVER_SIDE_ACTION_TYPES