10 Commits

Author SHA1 Message Date
Dom
c82829f2bb feat(server): R1 — import auto du workflow appris vers la DB VWB (gated)
Some checks failed
tests / Lint (ruff + black) (push) Failing after 1m44s
tests / Tests unitaires (sans GPU) (push) Failing after 1m49s
tests / Tests sécurité (critique) (push) Has been skipped
finalize_session appelle _maybe_import_to_vwb : si RPA_R1_AUTO_IMPORT (OFF par
défaut), le workflow appris est assaini (sanitize_workflow_dict) puis importé en
DB VWB rejouable via le pont idempotent (import_core_workflow_to_db), dans un
app-context VWB lazy mutualisé (vwb_db). NON bloquant : un échec n'interrompt
jamais la finalisation. Rend l'appris rejouable sans geste manuel (R1).
Tests : câblage du seam + gating du flag + non-régression.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 17:44:24 +02:00
Dom
6075717353 feat(server): durcissement sanitizer PII (chevauchements + GXD5 + workflow_dict)
- Résolution des chevauchements par priorité de détecteur + longueur : corrige le
  FN où, sur 'Dossier/Patient NOM (NAISSANCE) Prénom', le nom de naissance fuyait. (Qwen)
- RE_GXD5_DIAG : tokenise le numéro de dossier ([DOSSIER_n]) ET le nom ([NOM_n]) dans
  'GXD5 Diagnostics - <num> - NOM PRENOM' — 3 patients fuyaient en prod clinique, 0 FP. (Qwen)
- sanitize_workflow_dict : assainit les champs texte d'un workflow appris (by_text, noms)
  avant import en DB VWB (canal apprentissage). Utilisé par R1. (Claude)
14 tests verts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 17:44:24 +02:00
Dom
13f760a3b9 feat(extraction): handler extract_dossier + pont worker→DB VWB mutualisé (brique 3)
vwb_db.py : couplage worker→DB VWB lazy (app Flask sur instance/workflows.db)
mutualisé (R1 + extraction), + persist_extracted_dossier (grille → Job/Table/Field).
replay_engine.py : handler _handle_extract_dossier_action — lit le screenshot,
extrait une grille structurée, gate qualité conservatrice (complete|needs_review),
persiste avec preuve (screenshot_ref/bbox/confidence). N'échoue JAMAIS le replay.
Données patient EN CLAIR (canal extraction, non anonymisé).

Réserve : dispatch runtime (api_stream.py) non encore branché — étape suivante,
à coordonner. Brique 3/4 de la verticale extraction dossier patient.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 14:18:08 +02:00
Dom
9883cad012 feat(extraction): modèle DB dossier patient extrait (Job/Table/Field)
ExtractionJob -> ExtractedTable -> ExtractedField (SQLAlchemy, cascade), avec
preuve par cellule (bbox + confidence) réutilisant la sémantique VWBEvidence,
et statut dossier needs_review|complete. Brique 2 de la verticale extraction.
Documenté : ce canal conserve les données patient EN CLAIR (≠ canal
apprentissage anonymisé) — aucune anonymisation ne doit cibler ces colonnes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 12:47:03 +02:00
Dom
5ed5ae2d4b feat(extraction): lecture de tableau structurée (grille bbox+confiance)
Nouvelle extract_grid_from_image() : reconstruit une grille List[List[cell]]
(lignes ET colonnes par clustering des centres y/x des tokens EasyOCR), en
conservant bbox + confiance + (row,col) par cellule. Contrairement à
extract_table_from_image (liste plate, coordonnée x jetée) — laissé intact.
Brique 1 de la verticale extraction dossier patient.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 12:46:48 +02:00
Dom
7fb58195fb fix(workflow): conserve machine_id au round-trip to_dict/from_dict
Les workflows rechargés du disque retombaient sur machine_id='default' :
to_dict ne sérialisait pas l'attribut d'instance _machine_id et from_dict ne
le reposait pas (il dormait dans metadata['machine_id']). to_dict le sérialise
si présent (pas de 'default' parasite) ; from_dict le restaure depuis le champ
explicite ou metadata (rétrocompat des workflows déjà sur disque).
Test de non-régression round-trip.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 11:05:10 +02:00
Dom
fccc06e4a2 feat(server): floute aussi les focus_* (blind spot PII)
Les screenshots focus_* (plein écran, ~1440 fichiers/350 Mo) contenaient des
titres PII non floutés. La condition de blur serveur les inclut désormais,
au même titre que shot_*_full et heartbeat_*. Brut conservé, version _blurred
produite en parallèle. (blind spot relevé par Qwen, revue 28/06)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 11:05:10 +02:00
Dom
6461f0a21b feat(server): câble sanitize_event au chokepoint stream_event (PII)
Assainissement PII appliqué une seule fois à l'entrée de stream_event(),
avec un mapping de tokens par session (cohérence intra-session). Les chemins
de persistance et de traitement (jsonl, worker.process_event_direct,
shadow_observe_event, enrichissement SOM) consomment tous la copie assainie
au lieu de l'event brut — plus aucune PII patient en clair côté serveur.

Test de non-régression du câblage: stream_event ne doit jamais écrire de PII
brute (IPP/contenu saisi) dans live_events.jsonl ni la propager au worker/shadow.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 10:39:27 +02:00
Dom
e84cdee393 fix(server): durcissement sanitizer PII suite revue adversariale Qwen
- FN-1/2/3 : ajout RE_PRENOM_NOM (« Prénom NOM » inversé sans parens/crochets,
  ex. « Alix DATTIN ») ; 2e mot tout-majuscules -> 0 FP sur « Mozilla Firefox ».
- FN-4 (majeur, 228 events) : sanitize_event scanne désormais les titres
  RÉCURSIVEMENT (vision_info.window_capture.window_title et tout titre imbriqué),
  au lieu de 3 clés top-level hardcodées.
2 correctifs issus de la revue croisée Qwen. 11 tests verts, 0 FP.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 20:24:52 +02:00
Dom
30d8f65e9a feat(server): sanitize_event — assainissement PII au niveau event
sanitize_event(event, mapping) applique le principe « Léa apprend l'interface,
pas la donnée » (décision Dom 28/06) avant persistance :
- text_input -> contenu (text + raw_keys) remplacé par [SAISIE] (option b) :
  résout la fuite la plus grave (contenu médical) SANS NER ni détection ;
- titres de fenêtre (active_window_title + window/to/from.title) : identité
  patient tokenisée (anonymize_text), app/écran gardés ; cohérence par mapping.
Copie défensive (ne mute pas l'event d'origine). 4 tests (9 au total) verts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 19:53:09 +02:00
16 changed files with 1507 additions and 11 deletions

View File

@@ -27,6 +27,7 @@ from fastapi import BackgroundTasks, Depends, FastAPI, File, HTTPException, Requ
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from pydantic import BaseModel
from .pii_sanitizer import sanitize_event
from .replay_failure_logger import log_replay_failure from .replay_failure_logger import log_replay_failure
from .replay_verifier import ReplayVerifier, VerificationResult from .replay_verifier import ReplayVerifier, VerificationResult
from .replay_learner import ReplayLearner from .replay_learner import ReplayLearner
@@ -1922,6 +1923,11 @@ async def stream_event(data: StreamEvent):
# Auto-enregistrer la session si inconnue (robustesse au redémarrage serveur) # Auto-enregistrer la session si inconnue (robustesse au redémarrage serveur)
_ensure_session_registered(session_id, machine_id=machine_id) _ensure_session_registered(session_id, machine_id=machine_id)
# ── Assainissement PII : sanitize une fois, les 3 chemins reçoivent la copie ──
sanitized_event = sanitize_event(
data.event, mapping=_session_pii_mapping[session_id]
)
# Persister sur disque (journal JSONL, dans un sous-dossier par machine si multi-machine) # Persister sur disque (journal JSONL, dans un sous-dossier par machine si multi-machine)
if machine_id and machine_id != "default": if machine_id and machine_id != "default":
session_path = LIVE_SESSIONS_DIR / machine_id / session_id session_path = LIVE_SESSIONS_DIR / machine_id / session_id
@@ -1930,21 +1936,26 @@ async def stream_event(data: StreamEvent):
session_path.mkdir(parents=True, exist_ok=True) session_path.mkdir(parents=True, exist_ok=True)
event_file = session_path / "live_events.jsonl" event_file = session_path / "live_events.jsonl"
with open(event_file, "a", encoding="utf-8") as f: with open(event_file, "a", encoding="utf-8") as f:
f.write(json.dumps(data.dict()) + "\n") f.write(json.dumps({
"session_id": data.session_id,
"timestamp": data.timestamp,
"event": sanitized_event,
"machine_id": machine_id,
}) + "\n")
# Traitement direct via StreamProcessor # Traitement direct via StreamProcessor
result = worker.process_event_direct(session_id, data.event) result = worker.process_event_direct(session_id, sanitized_event)
# ── Observation Shadow (si mode Shadow activé pour cette session) ── # ── Observation Shadow (si mode Shadow activé pour cette session) ──
# L'appel est protégé et non bloquant : si l'observer n'est pas # L'appel est protégé et non bloquant : si l'observer n'est pas
# actif, ou s'il lève, la capture continue normalement. # actif, ou s'il lève, la capture continue normalement.
shadow_observe_event(session_id, data.event) shadow_observe_event(session_id, sanitized_event)
# ── Enrichissement SomEngine temps réel pour les mouse_click ── # ── Enrichissement SomEngine temps réel pour les mouse_click ──
# Après l'enregistrement de l'event, tenter l'enrichissement si le # Après l'enregistrement de l'event, tenter l'enrichissement si le
# screenshot est déjà arrivé. Sinon, l'event est mis en attente et # screenshot est déjà arrivé. Sinon, l'event est mis en attente et
# sera enrichi quand le screenshot arrivera (voir stream_image). # sera enrichi quand le screenshot arrivera (voir stream_image).
event = data.event event = sanitized_event
if event.get("type") == "mouse_click" and event.get("screenshot_id"): if event.get("type") == "mouse_click" and event.get("screenshot_id"):
session = processor.session_manager.get_session(session_id) session = processor.session_manager.get_session(session_id)
if session: if session:
@@ -1962,6 +1973,9 @@ async def stream_event(data: StreamEvent):
# ========================================================================= # =========================================================================
# Ensemble des screenshots déjà analysés (évite les doublons de retry) # Ensemble des screenshots déjà analysés (évite les doublons de retry)
# Mapping PII par session — tokens cohérents intra-session (même patient → même [NOM_1])
_session_pii_mapping: Dict[str, Dict] = defaultdict(dict)
_analyzed_shots: Dict[str, set] = defaultdict(set) _analyzed_shots: Dict[str, set] = defaultdict(set)
# Hash du dernier screenshot analysé par session (déduplication par similarité) # Hash du dernier screenshot analysé par session (déduplication par similarité)
@@ -2358,9 +2372,12 @@ async def stream_image(
# Le fichier brut (shot_XXXX_full.png) reste intact pour le replay, # Le fichier brut (shot_XXXX_full.png) reste intact pour le replay,
# le grounding VLM et l'entraînement. La version floutée est écrite en # le grounding VLM et l'entraînement. La version floutée est écrite en
# parallèle sous shot_XXXX_full_blurred.png. # parallèle sous shot_XXXX_full_blurred.png.
# focus_* : plein écran avec PII dans les titres (blind spot Qwen 28/06,
# 1440 fichiers/350 Mo non floutés) — désormais inclus dans le blur.
if _PII_BLUR_ENABLED and _blur_pii_on_image is not None and ( if _PII_BLUR_ENABLED and _blur_pii_on_image is not None and (
("_full" in shot_id and shot_id.startswith("shot_")) ("_full" in shot_id and shot_id.startswith("shot_"))
or shot_id.startswith("heartbeat_") or shot_id.startswith("heartbeat_")
or shot_id.startswith("focus_")
): ):
_pii_blur_executor.submit(_produce_blurred_version, file_path_str, shot_id) _pii_blur_executor.submit(_produce_blurred_version, file_path_str, shot_id)

View File

@@ -16,6 +16,7 @@ Branche feat/push-log-dgx — assainissement PII clinique.
from __future__ import annotations from __future__ import annotations
import copy
import re import re
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
@@ -37,17 +38,32 @@ RE_NOM_NAISSANCE = re.compile(
RE_NOM_BRACKET = re.compile( RE_NOM_BRACKET = re.compile(
rf"\[((?:[{_MAJ}][\w{_MIN}'\-]*\s+){{1,3}}[{_MAJ}][\w{_MIN}'\-]*)\]" rf"\[((?:[{_MAJ}][\w{_MIN}'\-]*\s+){{1,3}}[{_MAJ}][\w{_MIN}'\-]*)\]"
) )
# « Prénom NOM » inversé, sans parenthèses ni crochets (ex. « Alix DATTIN »).
# 2e mot tout en MAJUSCULES → faible risque de FP (« Mozilla Firefox » ne matche pas).
RE_PRENOM_NOM = re.compile(rf"\b[{_MAJ}][{_MIN}]+\s+[{_MAJ}][{_MAJ}\-']+\b")
# GXD5 Diagnostics : numéro de dossier + nom patient tout-majuscules.
# Format réel : « GXD5 Diagnostics - 128008 - BENVENISTE MARIE-LAURENCE »
# Le numéro (128008) = ID dossier patient (PII). Le nom = PII.
# 2 groupes de capture : (1)=numéro, (2)=nom complet.
RE_GXD5_DIAG = re.compile(
rf"GXD5\s+Diagnostics\s*-\s*(\d+)\s*-\s*([{_MAJ}][{_MAJ}\-' ]+)"
)
# Ordre = priorité ; group = portion à remplacer (0 = match entier). # Ordre = priorité ; group = portion à remplacer (0 = match entier).
_DETECTORS: List[Tuple[re.Pattern, str, int]] = [ _DETECTORS: List[Tuple[re.Pattern, str, int]] = [
(RE_NOM_NAISSANCE, "NOM", 0), (RE_NOM_NAISSANCE, "NOM", 0),
(RE_NOM_BRACKET, "NOM", 0), (RE_NOM_BRACKET, "NOM", 0),
(RE_GXD5_DIAG, "DOSSIER", 1), # numéro de dossier
(RE_PRENOM_NOM, "NOM", 0),
(RE_EMAIL, "EMAIL", 0), (RE_EMAIL, "EMAIL", 0),
(RE_NIR, "NIR", 0), (RE_NIR, "NIR", 0),
(RE_IPP, "IPP", 1), (RE_IPP, "IPP", 1),
(RE_TEL, "TEL", 0), (RE_TEL, "TEL", 0),
(RE_AGE, "AGE", 0), (RE_AGE, "AGE", 0),
] ]
# GXD5 nom (groupe 2) traité séparément — même regex, priorité juste après.
_DETECTORS.append((RE_GXD5_DIAG, "NOM", 2))
# Anti-faux-positifs : termes logiciels/UI à ne jamais prendre pour un nom. # Anti-faux-positifs : termes logiciels/UI à ne jamais prendre pour un nom.
# (Sous-ensemble inline ; les gazetteers complets arrivent avec la couche NER.) # (Sous-ensemble inline ; les gazetteers complets arrivent avec la couche NER.)
@@ -111,14 +127,21 @@ def anonymize_text(
continue continue
spans.append((start, end, etype, value)) spans.append((start, end, etype, value))
# 2) résolution des chevauchements (priorité = ordre des détecteurs, puis position) # 2) résolution des chevauchements (priorité = rang détecteur, puis -longueur)
spans.sort(key=lambda s: (s[0], s[1])) # _DETECTORS est ordonné par priorité ; le rang dans cette liste détermine
# qui gagne quand deux patterns chevauchent. Plus prioritaire + plus long
# = accepté en premier, les plus courts/moins prioritaires sont éliminés.
# Fix FN « Dossier VIOLA (VIOLA) Liliane » : RE_PRENOM_NOM captait
# « Dossier VIOLA » (rang 2) et bloquait RE_NOM_NAISSANCE « VIOLA (VIOLA)
# Liliane » (rang 0, plus prioritaire et plus long).
det_rank = {p: i for i, (p, _, _) in enumerate(_DETECTORS)}
spans.sort(key=lambda s: (det_rank.get(s[2], 999), -(s[1] - s[0]), s[0]))
occupied: List[Tuple[int, int]] = []
accepted: List[Tuple[int, int, str, str]] = [] accepted: List[Tuple[int, int, str, str]] = []
last_end = -1
for start, end, etype, value in spans: for start, end, etype, value in spans:
if start >= last_end: if all(start >= oe or end <= os for os, oe in occupied):
accepted.append((start, end, etype, value)) accepted.append((start, end, etype, value))
last_end = end occupied.append((start, end))
# 3) substitution (de droite à gauche pour préserver les indices) # 3) substitution (de droite à gauche pour préserver les indices)
entities: List[Dict] = [] entities: List[Dict] = []
@@ -131,3 +154,86 @@ def anonymize_text(
) )
entities.reverse() entities.reverse()
return out, entities return out, entities
# Clés portant un titre de fenêtre, où qu'elles soient imbriquées dans l'event
# (top-level `active_window_title`, `window/to/from.title`, et surtout
# `vision_info.window_capture.window_title` — blind spot signalé par Qwen).
_TITLE_KEYS = ("title", "window_title", "active_window_title")
_PLACEHOLDER_SAISIE = "[SAISIE]"
def _walk_titles(obj, mapping: Dict) -> None:
"""Parcourt récursivement l'event et assainit toute valeur de titre de fenêtre."""
if isinstance(obj, dict):
for k, v in obj.items():
if k in _TITLE_KEYS and isinstance(v, str):
obj[k] = anonymize_text(v, mapping=mapping)[0]
else:
_walk_titles(v, mapping)
elif isinstance(obj, list):
for item in obj:
_walk_titles(item, mapping)
def sanitize_event(event: Dict, *, mapping: Optional[Dict] = None) -> Dict:
"""Assainit un event capturé avant persistance (copie, ne mute pas l'original).
Principe « Léa apprend l'interface, pas la donnée » (décision Dom 28/06) :
- `text_input` : le **contenu tapé** (`text`, `raw_keys`) = donnée de santé →
remplacé par `[SAISIE]` (on garde le champ, pas la valeur — option b) ;
- **titres de fenêtre** (`active_window_title`, et `title` dans `window`/`to`/
`from`) : l'**identité patient** est tokenisée, l'app/écran est gardé
(contexte d'apprentissage), via `anonymize_text` + `mapping` partagé (cohérence).
"""
if mapping is None:
mapping = {}
ev = copy.deepcopy(event)
# text_input : on ne garde pas le contenu
if ev.get("type") == "text_input":
for k in ("text", "raw_keys"):
if ev.get(k) not in (None, ""):
ev[k] = _PLACEHOLDER_SAISIE
# tous les titres de fenêtre, où qu'ils soient imbriqués
# (active_window_title, window/to/from.title, vision_info.window_capture.window_title…)
_walk_titles(ev, mapping)
return ev
# Clés d'un workflow core portant du texte potentiellement PII : cible OCR
# (`by_text`), noms d'écrans/labels dérivés des titres. Le contenu saisi est
# déjà neutralisé à la source (sanitize_event → [SAISIE]).
_WORKFLOW_TEXT_KEYS = ("by_text", "name", "label")
def _walk_workflow_text(obj, mapping: Dict) -> None:
"""Parcourt un workflow core et tokenise la PII des champs texte (cibles, noms)."""
if isinstance(obj, dict):
for k, v in obj.items():
if k in _WORKFLOW_TEXT_KEYS and isinstance(v, str) and v:
obj[k] = anonymize_text(v, mapping=mapping)[0]
else:
_walk_workflow_text(v, mapping)
elif isinstance(obj, list):
for item in obj:
_walk_workflow_text(item, mapping)
def sanitize_workflow_dict(workflow_dict: Dict, *, mapping: Optional[Dict] = None) -> Dict:
"""Assainit un workflow core (JSON appris) avant import/persistance en DB VWB.
Tokenise la PII des champs texte (cible OCR `by_text`, noms d'écrans, labels)
via `anonymize_text`, en gardant l'interface intacte (« Léa apprend
l'interface, pas la donnée »). Copie — l'original n'est pas muté.
Limite (couche 1) : ne capte que la PII structurée (IPP, NOM clinique…) ;
les noms libres relèvent de la couche 2 NER.
"""
if mapping is None:
mapping = {}
wf = copy.deepcopy(workflow_dict)
_walk_workflow_text(wf, mapping)
return wf

View File

@@ -40,6 +40,7 @@ _ALLOWED_ACTION_TYPES = {
"pause_for_human", # Pause supervisée explicite (interceptée par /replay/next) "pause_for_human", # Pause supervisée explicite (interceptée par /replay/next)
"extract_text", # OCR serveur sur dernier heartbeat → variable workflow "extract_text", # OCR serveur sur dernier heartbeat → variable workflow
"extract_table", # OCR serveur + filtre regex → liste structurée (boucle) "extract_table", # OCR serveur + filtre regex → liste structurée (boucle)
"extract_dossier", # OCR grille structurée → dossier patient persisté (brique 3)
"extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions "extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions
"_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll) "_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll)
"t2a_decision", # Analyse LLM facturation T2A → variable workflow "t2a_decision", # Analyse LLM facturation T2A → variable workflow
@@ -53,6 +54,7 @@ _ALLOWED_ACTION_TYPES = {
_SERVER_SIDE_ACTION_TYPES = { _SERVER_SIDE_ACTION_TYPES = {
"extract_text", "extract_text",
"extract_table", "extract_table",
"extract_dossier",
"t2a_decision", "t2a_decision",
"llm_generate", "llm_generate",
"_concat_text_vars", "_concat_text_vars",
@@ -2216,6 +2218,146 @@ def _handle_extract_table_action(
return bool(rows) return bool(rows)
def _resolve_screenshot_path(replay_state: Dict[str, Any]) -> Optional[str]:
"""Résout le chemin du dernier screenshot (path disque ou base64 → temp).
Calque la source utilisée par extract_text/extract_table : priorité au
``last_screenshot`` (path ou data-URI base64). Retourne None si absent.
"""
raw_screenshot = replay_state.get("last_screenshot") or ""
if not raw_screenshot:
return None
if raw_screenshot.startswith("data:"):
try:
import base64 as _b64, tempfile
header, b64data = raw_screenshot.split(",", 1)
suffix = ".jpg" if "jpeg" in header else ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp.write(_b64.b64decode(b64data))
tmp.close()
return tmp.name
except Exception as e:
logger.warning("extract_dossier: décodage base64 screenshot échoué: %s", e)
return None
if os.path.isfile(raw_screenshot):
return raw_screenshot
return None
def _gate_dossier_quality(
grid: List[List[Dict[str, Any]]],
*,
min_confidence: float,
expected_cols: Optional[int],
) -> str:
"""Gate qualité simple → 'complete' ou 'needs_review'.
'complete' SSI : grille non vide ET confiance médiane ≥ seuil ET (si
expected_cols fourni) au moins une ligne avec ce nombre de colonnes.
Sinon 'needs_review'. Volontairement conservatrice (default-review).
"""
confs = [
cell.get("confidence")
for row in grid for cell in row
if isinstance(cell.get("confidence"), (int, float))
]
if not confs:
return "needs_review"
confs.sort()
median = confs[len(confs) // 2]
if median < min_confidence:
return "needs_review"
if expected_cols is not None:
if not any(len(row) == expected_cols for row in grid):
return "needs_review"
return "complete"
def _handle_extract_dossier_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
session_id: str,
) -> bool:
"""Traite une action extract_dossier côté serveur (brique 3).
Lit le dernier screenshot, extrait une grille structurée via
``extract_grid_from_image``, applique une gate qualité, puis PERSISTE un
« dossier patient extrait » (Job/Table/Field) dans la DB VWB avec preuve
(screenshot_ref + screen_bbox + confidences). Le job_id est stocké dans
``replay_state["variables"][output_var]``.
Paramètres reconnus (action.parameters) :
output_var : nom de variable runtime (default "extracted_dossier")
patient_ref : référence patient EN CLAIR (volontaire) — non tokenisée
region : (x, y, w, h) px pour cropper avant OCR (None = plein)
min_confidence : seuil de confiance médiane pour 'complete' (default 0.6)
expected_cols : nb de colonnes attendu (optionnel) pour la gate
N'ÉCHOUE JAMAIS le replay : toute erreur → log + needs_review.
Retourne True SSI le dossier est persisté avec statut 'complete'.
"""
params = action.get("parameters") or {}
output_var = (params.get("output_var") or params.get("variable_name") or "extracted_dossier").strip()
patient_ref = params.get("patient_ref")
region = params.get("region") or None
try:
min_confidence = float(params.get("min_confidence", 0.6))
except (TypeError, ValueError):
min_confidence = 0.6
expected_cols = params.get("expected_cols")
if isinstance(expected_cols, str):
try:
expected_cols = int(expected_cols)
except ValueError:
expected_cols = None
job_id = ""
status = "needs_review"
try:
path = _resolve_screenshot_path(replay_state)
grid: List[List[Dict[str, Any]]] = []
if path:
from core.llm import extract_grid_from_image
grid = extract_grid_from_image(
path, region=tuple(region) if region else None
)
else:
logger.warning(
"extract_dossier : pas de screenshot pour session %s — needs_review",
session_id,
)
status = _gate_dossier_quality(
grid, min_confidence=min_confidence, expected_cols=expected_cols
)
from . import vwb_db
with vwb_db.vwb_app_context():
job_id = vwb_db.persist_extracted_dossier(
grid,
patient_ref=patient_ref,
source_session_id=session_id,
screenshot_ref=path,
screen_bbox=({"x": region[0], "y": region[1], "width": region[2], "height": region[3]}
if region and len(region) == 4 else None),
status=status,
)
except Exception as e:
# Ne JAMAIS échouer le replay : on log, on marque needs_review.
logger.warning(
"extract_dossier : échec persistance (%s) — needs_review, replay %s",
e, replay_state.get("replay_id", "?"),
)
status = "needs_review"
replay_state.setdefault("variables", {})[output_var] = job_id
logger.info(
"extract_dossier → variable '%s' job=%s statut=%s replay %s",
output_var, job_id or "?", status, replay_state.get("replay_id", "?"),
)
return status == "complete"
def _handle_t2a_decision_action( def _handle_t2a_decision_action(
action: Dict[str, Any], action: Dict[str, Any],
replay_state: Dict[str, Any], replay_state: Dict[str, Any],

View File

@@ -3066,6 +3066,8 @@ class StreamProcessor:
saved_path = self._persist_workflow(workflow, session_id, machine_id=machine_id) saved_path = self._persist_workflow(workflow, session_id, machine_id=machine_id)
# Stocker le machine_id dans le workflow pour le filtrage # Stocker le machine_id dans le workflow pour le filtrage
workflow._machine_id = machine_id workflow._machine_id = machine_id
# R1 : import auto en DB VWB (rejouable) — gated RPA_R1_AUTO_IMPORT, non bloquant.
self._maybe_import_to_vwb(workflow, session_id, machine_id)
# Récupérer les métadonnées applicatives de la session # Récupérer les métadonnées applicatives de la session
session_state = self.session_manager.get_session(session_id) session_state = self.session_manager.get_session(session_id)
@@ -4444,6 +4446,45 @@ class StreamProcessor:
logger.error(f"Erreur sauvegarde workflow {session_id}: {e}") logger.error(f"Erreur sauvegarde workflow {session_id}: {e}")
return None return None
def _import_workflow_to_vwb(self, workflow, session_id: str, machine_id: str) -> Dict[str, Any]:
"""Importer le workflow appris dans la DB VWB rejouable (Maillon A / R1).
Rend l'appris rejouable sans geste manuel, de façon idempotente (fusion
par signature de trajectoire). Suppose un app-context VWB actif fournissant
``db.session`` (créé par l'appelant côté worker).
"""
from .pii_sanitizer import sanitize_workflow_dict
from services.learned_workflow_bridge import import_core_workflow_to_db
from db.models import db
# Assainir la PII (cibles OCR `by_text`, noms) avant dépôt en DB VWB.
core_dict = sanitize_workflow_dict(workflow.to_dict())
return import_core_workflow_to_db(
core_dict,
machine_id=machine_id,
source_session_id=session_id,
db_session=db.session,
)
def _vwb_app_context(self):
"""Couplage worker→DB VWB mutualisé (un seul pont, cf. vwb_db).
Délègue au helper module ``vwb_db.vwb_app_context`` partagé entre R1 et
l'extraction métier — pas de duplication de l'app Flask/init_app.
"""
from .vwb_db import vwb_app_context
return vwb_app_context()
def _maybe_import_to_vwb(self, workflow, session_id: str, machine_id: str) -> None:
"""Import auto de l'appris en DB VWB, gated par RPA_R1_AUTO_IMPORT (OFF
par défaut) et NON bloquant : un échec ne casse jamais la finalisation."""
if os.environ.get("RPA_R1_AUTO_IMPORT", "false").lower() not in ("true", "1", "yes"):
return
try:
with self._vwb_app_context():
self._import_workflow_to_vwb(workflow, session_id, machine_id)
except Exception as e:
logger.warning("[R1] import VWB auto échoué (non bloquant): %s", e)
def _build_raw_session_fallback(self, session, raw_dict): def _build_raw_session_fallback(self, session, raw_dict):
"""Construire un RawSession manuellement si from_dict échoue.""" """Construire un RawSession manuellement si from_dict échoue."""
from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext

View File

@@ -0,0 +1,106 @@
"""Couplage worker → DB VWB (mutualisé) + persistance « dossier patient extrait ».
Le worker/serveur streaming est un process distinct du backend VWB : il n'a
pas d'app Flask en mémoire. Ce module fournit :
- ``vwb_app_context()`` : un app-context Flask lazy (singleton module) lié au
fichier SQLite VWB ``visual_workflow_builder/backend/instance/workflows.db``,
avec ``db.init_app`` (db de ``db.models``). Réutilisable par tout module
serveur qui doit écrire dans la DB VWB (R1, extraction métier, …).
- ``persist_extracted_dossier(...)`` : depuis une grille OCR
(``List[List[cell]]``), crée ExtractionJob → ExtractedTable → ExtractedField
et commit. Suppose un app-context actif (comme le pont R1 existant).
⚠️ CANAL EXTRACTION = données patient EN CLAIR (volontaire) : aucune
tokenisation/assainissement PII ici (cf. note dans db/models.py).
"""
import sys
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional
# Ajout du backend VWB au sys.path à l'import → rend ``db.models`` importable
# (couplage worker→DB VWB mutualisé ; identique au pattern stream_processor).
_VWB_BACKEND = Path(__file__).resolve().parents[2] / "visual_workflow_builder" / "backend"
if str(_VWB_BACKEND) not in sys.path:
sys.path.insert(0, str(_VWB_BACKEND))
# App Flask lazy (singleton module) — un seul db.init_app pour tout le process.
_vwb_app = None
@contextmanager
def vwb_app_context():
"""App-context Flask VWB (lazy singleton) sur instance/workflows.db.
À utiliser via ``with vwb_app_context(): ...`` autour des appels qui
nécessitent ``db.session`` (ex. persist_extracted_dossier).
"""
global _vwb_app
if _vwb_app is None:
from flask import Flask
from db.models import db
db_path = _VWB_BACKEND / "instance" / "workflows.db"
app = Flask("worker_vwb")
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
_vwb_app = app
with _vwb_app.app_context():
yield
def persist_extracted_dossier(
grid: List[List[Dict[str, Any]]],
*,
patient_ref: Optional[str],
source_session_id: Optional[str],
screenshot_ref: Optional[str],
screen_bbox: Optional[Dict[str, Any]],
status: str,
) -> str:
"""Persiste un « dossier patient extrait » et retourne le job_id.
Crée 1 ExtractionJob → 1 ExtractedTable → N ExtractedField (une par
cellule de la grille), puis commit. Suppose un app-context VWB actif
(fourni par ``vwb_app_context()`` ou par l'appelant, comme le pont R1).
⚠️ ``patient_ref`` et ``cell["text"]`` sont stockés EN CLAIR (volontaire) :
le but est de constituer le dossier, pas d'anonymiser.
"""
from db.models import db, ExtractionJob, ExtractedTable, ExtractedField
job = ExtractionJob(
id=uuid.uuid4().hex,
patient_ref=patient_ref,
source_session_id=source_session_id,
status=status,
)
db.session.add(job)
table = ExtractedTable(
id=uuid.uuid4().hex,
job_id=job.id,
screen_bbox=screen_bbox,
screenshot_ref=screenshot_ref,
)
db.session.add(table)
for row in grid or []:
for cell in row or []:
db.session.add(ExtractedField(
id=uuid.uuid4().hex,
table_id=table.id,
row=cell.get("row"),
col=cell.get("col"),
value=cell.get("text"),
bbox=cell.get("bbox"),
confidence=cell.get("confidence"),
))
db.session.commit()
return job.id

View File

@@ -8,6 +8,7 @@ from .t2a_decision import (
) )
from .ocr_extractor import ( from .ocr_extractor import (
extract_digits_tesseract_from_image, extract_digits_tesseract_from_image,
extract_grid_from_image,
extract_table_from_image, extract_table_from_image,
extract_text_from_image, extract_text_from_image,
) )
@@ -19,5 +20,6 @@ __all__ = [
"build_dpi_enriched", "build_dpi_enriched",
"extract_text_from_image", "extract_text_from_image",
"extract_table_from_image", "extract_table_from_image",
"extract_grid_from_image",
"extract_digits_tesseract_from_image", "extract_digits_tesseract_from_image",
] ]

View File

@@ -243,3 +243,107 @@ def extract_table_from_image(
except Exception as e: except Exception as e:
logger.warning("extract_table échoué sur %s : %s", image_path, e) logger.warning("extract_table échoué sur %s : %s", image_path, e)
return [] return []
def _cluster_1d(centers: List[float], tol: float) -> List[Tuple[float, int]]:
"""Regroupe des positions 1D par proximité (centres triés, gap > tol = nouveau cluster).
Retourne, pour chaque centre d'entrée (ordre d'origine), un couple
(centre_du_cluster, index_du_cluster), les clusters étant indexés dans
l'ordre croissant. Permet de mapper lignes (y) et colonnes (x).
"""
order = sorted(range(len(centers)), key=lambda i: centers[i])
cluster_of = [0] * len(centers)
cluster_centers: List[List[float]] = []
prev = None
idx = -1
for i in order:
c = centers[i]
if prev is None or (c - prev) > tol:
idx += 1
cluster_centers.append([])
cluster_centers[idx].append(c)
cluster_of[i] = idx
prev = c
means = [sum(g) / len(g) for g in cluster_centers]
return [(means[cluster_of[i]], cluster_of[i]) for i in range(len(centers))]
def extract_grid_from_image(
image_path: str,
region: Optional[Tuple[int, int, int, int]] = None,
row_tol: float = 12.0,
col_tol: float = 25.0,
) -> List[List[dict]]:
"""Extrait un tableau STRUCTURÉ (lignes ET colonnes) via OCR EasyOCR.
Contrairement à `extract_table_from_image` (liste plate triée par y, x jeté),
on conserve la coordonnée x pour reconstruire une grille. Clustering :
lignes par proximité du centre y, colonnes par proximité du centre x.
Args:
image_path: chemin du PNG sur disque.
region: (x, y, w, h) pour cropper avant OCR. None = image entière.
row_tol: écart vertical max (px) entre 2 tokens d'une même ligne.
col_tol: écart horizontal max (px) entre 2 tokens d'une même colonne.
Returns:
Grille `List[List[cell]]`, lignes top→bottom, colonnes left→right.
`cell = {"text", "bbox", "confidence", "row", "col"}`.
En cas d'erreur ou d'absence de tokens, retourne [].
"""
path = Path(image_path)
if not path.exists():
logger.warning("extract_grid: fichier introuvable %s", image_path)
return []
try:
from PIL import Image
import numpy as np
img = Image.open(path)
if region:
x, y, w, h = region
img = img.crop((x, y, x + w, y + h))
reader = _get_reader()
results = reader.readtext(np.array(img), detail=1, paragraph=False)
toks = []
for bbox, text, conf in results:
t = str(text).strip()
if not t:
continue
xs = [p[0] for p in bbox]
ys = [p[1] for p in bbox]
toks.append({
"text": t,
"bbox": bbox,
"confidence": conf,
"xc": sum(xs) / len(xs),
"yc": sum(ys) / len(ys),
})
if not toks:
return []
rows_cl = _cluster_1d([tk["yc"] for tk in toks], row_tol)
cols_cl = _cluster_1d([tk["xc"] for tk in toks], col_tol)
for tk, (_yc, r), (_xc, c) in zip(toks, rows_cl, cols_cl):
tk["row"], tk["col"] = r, c
n_rows = max(tk["row"] for tk in toks) + 1
grid: List[List[dict]] = [[] for _ in range(n_rows)]
for tk in toks:
grid[tk["row"]].append({
"text": tk["text"],
"bbox": tk["bbox"],
"confidence": tk["confidence"],
"row": tk["row"],
"col": tk["col"],
})
for row in grid:
row.sort(key=lambda cell: cell["col"])
return grid
except Exception as e:
logger.warning("extract_grid échoué sur %s : %s", image_path, e)
return []

View File

@@ -1250,12 +1250,16 @@ class Workflow:
} }
if self.chain_config: if self.chain_config:
result["chain_config"] = self.chain_config.to_dict() if hasattr(self.chain_config, 'to_dict') else self.chain_config result["chain_config"] = self.chain_config.to_dict() if hasattr(self.chain_config, 'to_dict') else self.chain_config
# machine_id : attribut d'instance posé au runtime (pas un champ dataclass)
machine_id = getattr(self, "_machine_id", None)
if machine_id:
result["machine_id"] = machine_id
return result return result
@classmethod @classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Workflow': def from_dict(cls, data: Dict[str, Any]) -> 'Workflow':
"""Désérialiser depuis JSON""" """Désérialiser depuis JSON"""
return cls( wf = cls(
workflow_id=data["workflow_id"], workflow_id=data["workflow_id"],
name=data.get("name", data["workflow_id"]), name=data.get("name", data["workflow_id"]),
description=data.get("description", ""), description=data.get("description", ""),
@@ -1277,7 +1281,13 @@ class Workflow:
references=data.get("references", []), references=data.get("references", []),
chain_config=data.get("chain_config") chain_config=data.get("chain_config")
) )
# Reposer machine_id (attribut d'instance) : priorité au champ explicite,
# sinon depuis metadata['machine_id'] (rétrocompat des workflows déjà sur disque)
machine_id = data.get("machine_id") or (wf.metadata or {}).get("machine_id")
if machine_id:
wf._machine_id = machine_id
return wf
def to_json(self) -> str: def to_json(self) -> str:
"""Sérialiser en JSON string""" """Sérialiser en JSON string"""
return json.dumps(self.to_dict(), indent=2) return json.dumps(self.to_dict(), indent=2)

View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""Test RED — Maillon A (R1) : câblage worker → DB VWB rejouable.
Invariant ciblé (le VRAI trou du chantier apprentissage) :
quand le worker `finalize_session` produit un workflow appris, ce workflow
doit devenir **rejouable** en atterrissant dans la DB VWB, **sans geste
manuel** — et un 2e passage de la MÊME trajectoire ne crée PAS de doublon.
État vérifié au moment d'écrire ce test :
- le pont `import_core_workflow_to_db` (services.learned_workflow_bridge) EXISTE
et est vert en isolation (idempotence par signature de trajectoire) ;
- MAIS le worker (`agent_v0/server_v1/stream_processor.py`) ne l'appelle JAMAIS :
`_persist_workflow` écrit le JSON sur disque, puis rien ne l'importe en DB VWB.
→ les deux mondes (JSON appris ↔ DB VWB rejouable) restent disjoints.
Ce test cible le **seam de câblage** manquant côté worker, sans exécuter le
chemin lourd de `finalize_session` (GraphBuilder / CLIP) : il appelle la méthode
de pont attendue `StreamProcessor._import_workflow_to_vwb(workflow, session_id,
machine_id)`. Cette méthode N'EXISTE PAS encore → le test échoue (RED) pour la
bonne raison : le câblage worker→VWB est absent.
Câblage minimal proposé (NON appliqué ici) :
dans `finalize_session`, juste après `_persist_workflow` (≈ ligne 3066), ajouter
self._import_workflow_to_vwb(workflow, session_id, machine_id)
où `_import_workflow_to_vwb` :
1. sérialise `workflow.to_dict()` ;
2. ouvre un app-context VWB (db.session) ;
3. délègue à `import_core_workflow_to_db(core_dict, machine_id=...,
source_session_id=..., db_session=db.session)`.
"""
import sys
from pathlib import Path
import pytest
from flask import Flask
# --- Chemins : racine projet (core.*, agent_v0.*) + backend VWB (db.models, services.*) ---
_ROOT = Path(__file__).resolve().parents[2] # .../rpa_vision_v3
_BACKEND = _ROOT / "visual_workflow_builder" / "backend"
for _p in (str(_ROOT), str(_BACKEND)):
if _p not in sys.path:
sys.path.insert(0, _p)
from db.models import db, Workflow # noqa: E402 (modèles ORM VWB)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def vwb_db_app():
"""App Flask minimale liée à une SQLite VWB en mémoire (schéma créé)."""
app = Flask("test_worker_import_to_vwb")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
with app.app_context():
db.create_all()
yield app
db.session.remove()
db.drop_all()
class _FakeCoreWorkflow:
"""Stub léger d'un workflow core produit par le worker.
Seul le **contrat** importe ici : le worker détient un objet exposant
`workflow_id` et `to_dict()` (cf. `core.models.workflow_graph.Workflow`,
déjà sérialisé par `_persist_workflow` via `save_to_file`). On reproduit ce
contrat sans dépendre du constructeur dataclass core (constraints/
post_conditions obligatoires) — la cible du test est le câblage, pas la
construction d'objet. Le dict renvoyé est exactement la forme que le pont
`convert_learned_to_vwb_steps` consomme (validé en isolation).
"""
def __init__(self):
self.workflow_id = "wf_sess_bloc_notes_worker"
def to_dict(self):
return {
"workflow_id": self.workflow_id,
# Nom porteur de PII clinique : l'import en DB VWB doit l'assainir
# (logiciel métier réel en préfixe, nom clinique structuré ensuite).
"name": "Gxd5diag - VIOLA (VIOLA) Liliane",
"entry_nodes": ["n1"],
"nodes": [
{"node_id": "n1", "name": "Bureau"},
{"node_id": "n2", "name": "Bloc-notes ouvert"},
],
"edges": [
{
"edge_id": "e1",
"from_node": "n1",
"to_node": "n2",
"action": {
"type": "mouse_click",
"target": {"by_text": "Bloc-notes", "by_role": "ocr"},
"parameters": {"button": "left"},
},
},
],
}
def _build_core_workflow():
"""Workflow core tel que vu par le worker (contrat `workflow_id` + `to_dict`)."""
return _FakeCoreWorkflow()
def _make_processor():
"""Instancie un StreamProcessor sans déclencher l'init lourde (CLIP/FAISS).
On crée l'objet via __new__ : le test n'exerce QUE la méthode de câblage,
pas le pipeline complet.
"""
from agent_v0.server_v1.stream_processor import StreamProcessor
return StreamProcessor.__new__(StreamProcessor)
# ---------------------------------------------------------------------------
# Test RED — le câblage worker→VWB
# ---------------------------------------------------------------------------
def test_finalized_workflow_becomes_replayable_in_vwb_db(vwb_db_app):
"""Un workflow appris par le worker devient rejouable en DB VWB,
et un 2e import de la même trajectoire ne crée pas de doublon (idempotence)."""
processor = _make_processor()
workflow = _build_core_workflow()
# --- Seam de câblage attendu (à implémenter côté worker) ---
# _import_workflow_to_vwb(workflow, session_id, machine_id) doit :
# - sérialiser workflow.to_dict()
# - importer en DB VWB via import_core_workflow_to_db (idempotent)
assert hasattr(processor, "_import_workflow_to_vwb"), (
"Câblage R1 absent : StreamProcessor n'expose pas de pont vers la DB VWB. "
"Le workflow appris reste sur disque (JSON) et n'est jamais rejouable."
)
with vwb_db_app.app_context():
first = processor._import_workflow_to_vwb(
workflow,
session_id="sess_bloc_notes_worker",
machine_id="DESKTOP-TEST_windows",
)
# 1er import → workflow rejouable créé en DB VWB
assert Workflow.query.count() == 1
created = Workflow.query.first()
assert created.source == "learned_import"
assert created.review_status == "pending_review"
assert (first or {}).get("created") is True
# PII : le nom patient ne doit jamais atterrir en clair dans la DB VWB
assert "VIOLA" not in created.name, created.name
# 2e import de la MÊME trajectoire → pas de doublon (idempotence)
second = processor._import_workflow_to_vwb(
workflow,
session_id="sess_bloc_notes_worker_rerun",
machine_id="DESKTOP-TEST_windows",
)
assert Workflow.query.count() == 1, "ré-import du même parcours = pas de doublon"
assert (second or {}).get("created") is False
assert (first or {}).get("workflow_id") == (second or {}).get("workflow_id")
# ---------------------------------------------------------------------------
# Activation prod (couplage worker→DB VWB) : gating par feature-flag
# ---------------------------------------------------------------------------
def test_maybe_import_gated_off_par_defaut(monkeypatch):
"""Sans RPA_R1_AUTO_IMPORT, l'import auto NE doit PAS se déclencher
(R1 reste inactif tant que le sanitizer n'est pas validé / GO Dom)."""
monkeypatch.delenv("RPA_R1_AUTO_IMPORT", raising=False)
processor = _make_processor()
appels = []
monkeypatch.setattr(processor, "_import_workflow_to_vwb",
lambda *a, **k: appels.append(a), raising=False)
processor._maybe_import_to_vwb(_build_core_workflow(), "sess", "machine")
assert appels == [] # gated OFF : aucun import
def test_maybe_import_actif_si_flag(monkeypatch):
"""Avec RPA_R1_AUTO_IMPORT=true, l'import est appelé dans l'app-context VWB."""
import contextlib
monkeypatch.setenv("RPA_R1_AUTO_IMPORT", "true")
processor = _make_processor()
appels = []
monkeypatch.setattr(processor, "_import_workflow_to_vwb",
lambda w, s, m: appels.append((s, m)), raising=False)
# neutralise la création réelle de l'app-context (testée au runtime)
monkeypatch.setattr(processor, "_vwb_app_context",
lambda: contextlib.nullcontext(), raising=False)
processor._maybe_import_to_vwb(_build_core_workflow(), "sess-x", "machine-y")
assert appels == [("sess-x", "machine-y")]
def test_maybe_import_ne_casse_pas_la_finalisation(monkeypatch):
"""Un échec d'import VWB ne doit JAMAIS faire échouer la finalisation worker."""
import contextlib
monkeypatch.setenv("RPA_R1_AUTO_IMPORT", "true")
processor = _make_processor()
monkeypatch.setattr(processor, "_vwb_app_context",
lambda: contextlib.nullcontext(), raising=False)
def _boom(*a, **k):
raise RuntimeError("DB VWB indisponible")
monkeypatch.setattr(processor, "_import_workflow_to_vwb", _boom, raising=False)
# ne doit pas lever
processor._maybe_import_to_vwb(_build_core_workflow(), "sess", "machine")

View File

@@ -0,0 +1,219 @@
"""Tests TDD — Extraction « dossier patient » (brique 3).
Deux couches testées :
1. ``vwb_db.persist_extracted_dossier`` : depuis une grille OCR
(List[List[cell]]), crée ExtractionJob → ExtractedTable → ExtractedField
et commit. Testé sur SQLite mémoire via un app-context Flask jetable
(PAS la vraie DB VWB — isolation).
2. ``replay_engine._handle_extract_dossier_action`` : lit last_screenshot,
appelle ``extract_grid_from_image`` (mocké), applique la gate qualité
(complete / needs_review), persiste via vwb_db et n'échoue JAMAIS le
replay (grille vide → needs_review, sans lever).
⚠️ Canal extraction = données patient EN CLAIR (volontaire) : on vérifie
que les valeurs sont persistées telles quelles, sans tokenisation.
"""
import pytest
from flask import Flask
# vwb_db ajoute visual_workflow_builder/backend au sys.path à l'import →
# doit précéder l'import de db.models (couplage worker→DB VWB mutualisé).
import agent_v0.server_v1.vwb_db as vwb_db
import agent_v0.server_v1.replay_engine as replay_engine
from db.models import db, ExtractionJob, ExtractedTable, ExtractedField
# ---------------------------------------------------------------------------
# Fixtures : app Flask jetable sur SQLite mémoire (isolation totale)
# ---------------------------------------------------------------------------
@pytest.fixture
def mem_app():
"""App Flask minimale liée à une DB SQLite en mémoire."""
app = Flask("test_extract_dossier")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
with app.app_context():
db.create_all()
yield app
def _grid_2x2():
"""Grille connue 2×2 (confiances hautes)."""
return [
[
{"text": "Nom", "bbox": [[0, 0], [1, 0], [1, 1], [0, 1]], "confidence": 0.95, "row": 0, "col": 0},
{"text": "MOREL", "bbox": [[2, 0], [3, 0], [3, 1], [2, 1]], "confidence": 0.92, "row": 0, "col": 1},
],
[
{"text": "IPP", "bbox": [[0, 2], [1, 2], [1, 3], [0, 3]], "confidence": 0.90, "row": 1, "col": 0},
{"text": "25123456", "bbox": [[2, 2], [3, 2], [3, 3], [2, 3]], "confidence": 0.88, "row": 1, "col": 1},
],
]
# ---------------------------------------------------------------------------
# 1) persist_extracted_dossier
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_persist_extracted_dossier_creates_job_table_fields(mem_app):
job_id = vwb_db.persist_extracted_dossier(
_grid_2x2(),
patient_ref="MOREL Catherine",
source_session_id="sess-42",
screenshot_ref="/captures/last.png",
screen_bbox={"x": 0, "y": 0, "width": 800, "height": 600},
status="complete",
)
assert isinstance(job_id, str) and job_id
job = db.session.get(ExtractionJob, job_id)
assert job is not None
assert job.status == "complete"
assert job.patient_ref == "MOREL Catherine" # EN CLAIR, non tokenisé
assert job.source_session_id == "sess-42"
tables = ExtractedTable.query.filter_by(job_id=job_id).all()
assert len(tables) == 1
assert tables[0].screenshot_ref == "/captures/last.png"
assert tables[0].screen_bbox == {"x": 0, "y": 0, "width": 800, "height": 600}
fields = ExtractedField.query.filter_by(table_id=tables[0].id).all()
assert len(fields) == 4 # 2×2 cellules
values = {(f.row, f.col): f.value for f in fields}
assert values[(0, 1)] == "MOREL" # valeur patient EN CLAIR conservée
assert values[(1, 1)] == "25123456"
confs = {(f.row, f.col): f.confidence for f in fields}
assert confs[(0, 0)] == pytest.approx(0.95)
@pytest.mark.unit
def test_persist_extracted_dossier_empty_grid_still_creates_job(mem_app):
"""Grille vide → Job + Table sans Field (statut transmis tel quel)."""
job_id = vwb_db.persist_extracted_dossier(
[],
patient_ref=None,
source_session_id="sess-empty",
screenshot_ref="/captures/empty.png",
screen_bbox=None,
status="needs_review",
)
job = db.session.get(ExtractionJob, job_id)
assert job is not None and job.status == "needs_review"
tables = ExtractedTable.query.filter_by(job_id=job_id).all()
assert len(tables) == 1
assert ExtractedField.query.filter_by(table_id=tables[0].id).count() == 0
# ---------------------------------------------------------------------------
# 2) _handle_extract_dossier_action
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_handle_extract_dossier_complete(mem_app, monkeypatch, tmp_path):
# screenshot bidon sur disque (le mock OCR ignore le contenu)
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
# extract_grid_from_image mocké → grille 2×2 de confiance haute
monkeypatch.setattr(
"core.llm.extract_grid_from_image",
lambda *a, **k: _grid_2x2(),
)
# vwb_app_context pointé sur l'app mémoire de la fixture
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
monkeypatch.setattr(replay_engine, "vwb_db", vwb_db, raising=False)
replay_state = {
"last_screenshot": str(shot),
"variables": {},
"replay_id": "rep-1",
}
action = {
"type": "extract_dossier",
"parameters": {
"output_var": "dossier_id",
"patient_ref": "MOREL Catherine",
"expected_cols": 2,
"min_confidence": 0.5,
},
}
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-42")
assert ok is True
job_id = replay_state["variables"]["dossier_id"]
assert isinstance(job_id, str) and job_id
with mem_app.app_context():
job = db.session.get(ExtractionJob, job_id)
assert job is not None
assert job.status == "complete" # gate OK : non vide, conf ok, 2 cols
@pytest.mark.unit
def test_handle_extract_dossier_low_confidence_needs_review(mem_app, monkeypatch, tmp_path):
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
low_grid = [
[{"text": "x", "bbox": [], "confidence": 0.10, "row": 0, "col": 0}],
]
monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: low_grid)
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-2"}
action = {"type": "extract_dossier", "parameters": {"min_confidence": 0.5}}
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-low")
assert ok is False # gate a basculé en needs_review
job_id = replay_state["variables"]["extracted_dossier"]
with mem_app.app_context():
assert db.session.get(ExtractionJob, job_id).status == "needs_review"
@pytest.mark.unit
def test_handle_extract_dossier_empty_grid_no_raise(mem_app, monkeypatch, tmp_path):
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: [])
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-3"}
action = {"type": "extract_dossier", "parameters": {}}
# Ne lève jamais ; grille vide → needs_review
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-empty")
assert ok is False
job_id = replay_state["variables"]["extracted_dossier"]
with mem_app.app_context():
assert db.session.get(ExtractionJob, job_id).status == "needs_review"
@pytest.mark.unit
def test_handle_extract_dossier_persist_failure_no_raise(mem_app, monkeypatch, tmp_path):
"""Si la persistance lève, le handler log et n'échoue PAS le replay."""
shot = tmp_path / "shot.png"
shot.write_bytes(b"\x89PNG")
monkeypatch.setattr("core.llm.extract_grid_from_image", lambda *a, **k: _grid_2x2())
monkeypatch.setattr(vwb_db, "vwb_app_context", lambda: mem_app.app_context())
def _boom(*a, **k):
raise RuntimeError("DB down")
monkeypatch.setattr(vwb_db, "persist_extracted_dossier", _boom)
replay_state = {"last_screenshot": str(shot), "variables": {}, "replay_id": "rep-4"}
action = {"type": "extract_dossier", "parameters": {}}
ok = replay_engine._handle_extract_dossier_action(action, replay_state, "sess-boom")
assert ok is False # jamais de raise
@pytest.mark.unit
def test_extract_dossier_declared_in_action_type_sets():
assert "extract_dossier" in replay_engine._ALLOWED_ACTION_TYPES
assert "extract_dossier" in replay_engine._SERVER_SIDE_ACTION_TYPES

View File

@@ -0,0 +1,79 @@
"""Tests pour extract_grid_from_image — lecture de tableau STRUCTURÉE.
Contrairement à extract_table_from_image (qui jette x et retourne une liste
plate triée par y), extract_grid_from_image reconstruit une vraie grille
List[List[cell]] : clustering des lignes par proximité y, des colonnes par
proximité x. bbox + confiance conservées par cellule.
Les tokens OCR sont injectés (mock du reader EasyOCR) → pas de PNG réel,
pas de GPU.
"""
from pathlib import Path
from types import SimpleNamespace
from PIL import Image
import core.llm.ocr_extractor as ocr_extractor
def _blank_png(path: Path) -> None:
Image.new("RGB", (300, 120), "white").save(path)
def _bbox(x0: float, y0: float, x1: float, y1: float):
"""bbox EasyOCR = 4 points [tl, tr, br, bl], chaque point (x, y)."""
return [[x0, y0], [x1, y0], [x1, y1], [x0, y1]]
def _fake_reader(tokens):
"""Reader factice : readtext() renvoie la liste (bbox, text, conf) fournie."""
return SimpleNamespace(readtext=lambda *a, **k: tokens)
def test_extract_grid_2x3(tmp_path, monkeypatch):
image_path = tmp_path / "table.png"
_blank_png(image_path)
# 2 lignes (y≈10 et y≈60) × 3 colonnes (x≈10, x≈110, x≈210).
# Volontairement mélangées dans l'ordre OCR pour vérifier le tri.
tokens = [
(_bbox(110, 58, 160, 78), "B2", 0.97),
(_bbox(10, 10, 60, 30), "A1", 0.91),
(_bbox(210, 12, 260, 32), "C1", 0.88),
(_bbox(210, 60, 260, 80), "C2", 0.95),
(_bbox(10, 60, 60, 80), "A2", 0.90),
(_bbox(110, 8, 160, 28), "B1", 0.93),
]
monkeypatch.setattr(ocr_extractor, "_get_reader", lambda: _fake_reader(tokens))
grid = ocr_extractor.extract_grid_from_image(str(image_path))
# Grille 2×3 ordonnée
assert len(grid) == 2, "doit détecter 2 lignes"
assert all(len(row) == 3 for row in grid), "chaque ligne doit avoir 3 colonnes"
texts = [[cell["text"] for cell in row] for row in grid]
assert texts == [["A1", "B1", "C1"], ["A2", "B2", "C2"]]
# Métadonnées conservées + indices row/col cohérents
cell = grid[0][2]
assert cell["text"] == "C1"
assert cell["confidence"] == 0.88
assert cell["bbox"] == _bbox(210, 12, 260, 32)
assert cell["row"] == 0
assert cell["col"] == 2
assert grid[1][0]["row"] == 1 and grid[1][0]["col"] == 0
def test_extract_grid_empty_when_no_tokens(tmp_path, monkeypatch):
image_path = tmp_path / "blank.png"
_blank_png(image_path)
monkeypatch.setattr(ocr_extractor, "_get_reader", lambda: _fake_reader([]))
grid = ocr_extractor.extract_grid_from_image(str(image_path))
assert grid == []
def test_extract_grid_missing_file_returns_empty():
grid = ocr_extractor.extract_grid_from_image("/no/such/file.png")
assert grid == []

View File

@@ -79,3 +79,158 @@ def test_texte_sans_pii_inchange():
out, ents = anonymize_text(t) out, ents = anonymize_text(t)
assert out == t assert out == t
assert ents == [] assert ents == []
# --- sanitize_event : assainissement au niveau event (option b pour text_input) ---
def test_sanitize_text_input_remplace_contenu_par_saisie():
"""Option b (Dom) : le contenu tapé n'est pas gardé -> [SAISIE]."""
from agent_v0.server_v1.pii_sanitizer import sanitize_event
ev = {
"type": "text_input",
"text": "hemorragie post-operatoire saignement", # contenu médical
"raw_keys": ["h", "e", "m"],
"window": {"title": "VIOLA (VIOLA) Liliane 90 ans - IPP: 168246 - Firefox",
"app_name": "firefox.exe"},
}
out = sanitize_event(ev)
assert out["text"] == "[SAISIE]"
assert out["raw_keys"] == "[SAISIE]"
# le titre de la fenêtre est assaini (identité tokenisée, app gardée)
assert "168246" not in out["window"]["title"]
assert "VIOLA" not in out["window"]["title"]
assert "[IPP_1]" in out["window"]["title"] and "Firefox" in out["window"]["title"]
# l'event d'origine n'est PAS muté
assert ev["text"].startswith("hemorragie")
def test_sanitize_heartbeat_titre_direct():
from agent_v0.server_v1.pii_sanitizer import sanitize_event
ev = {"type": "heartbeat",
"active_window_title": "GXD5 Pacs CIM ARES - [DATTIN Alix] - Firefox"}
out = sanitize_event(ev)
assert "DATTIN" not in out["active_window_title"]
assert "[NOM_1]" in out["active_window_title"] and "Pacs" in out["active_window_title"]
def test_sanitize_focus_change_to_from_window():
from agent_v0.server_v1.pii_sanitizer import sanitize_event
ev = {"type": "window_focus_change",
"from": None,
"to": {"title": "LAVAL (BARTHELEMY) Nicole 86 ans - Expert Sante", "app_name": "firefox.exe"},
"window": {"title": "LAVAL (BARTHELEMY) Nicole 86 ans - Expert Sante"}}
out = sanitize_event(ev)
assert out["from"] is None # null géré
assert "LAVAL" not in out["to"]["title"]
assert "[NOM_1]" in out["to"]["title"]
# cohérence : même patient dans to et window -> même token
assert out["window"]["title"] == out["to"]["title"]
def test_sanitize_action_result_inchange():
from agent_v0.server_v1.pii_sanitizer import sanitize_event
ev = {"type": "action_result", "base_shot_id": "shot_0003", "image": "x.png"}
assert sanitize_event(ev) == ev
def test_prenom_nom_inverse():
"""FN-1/2/3 (Qwen) : « Prénom NOM » inversé (sans parens/crochets)."""
from agent_v0.server_v1.pii_sanitizer import anonymize_text
m: dict = {}
for s, leak in [("Alix DATTIN - Mozilla Firefox", "DATTIN"),
("Agathe RONDOT - PACS CIM ARES", "RONDOT"),
("Marie FLANDINETTE - Mozilla Firefox", "FLANDINETTE")]:
out, _ = anonymize_text(s, mapping=m)
assert leak not in out, out
assert "[NOM_" in out
# pas de faux positif sur les logiciels (2e mot non capitalisé tout en majuscules)
out, ents = anonymize_text("Mozilla Firefox - Expert Sante - Consultation")
assert out == "Mozilla Firefox - Expert Sante - Consultation"
assert ents == []
def test_sanitize_event_titre_imbrique_vision_info():
"""FN-4 (Qwen) : titre PII imbriqué dans vision_info.window_capture (228 events)."""
from agent_v0.server_v1.pii_sanitizer import sanitize_event
titre = "VIOLA (VIOLA) Liliane 90 ans - IPP: 168246 - Firefox"
ev = {
"type": "mouse_click",
"window": {"title": titre, "app_name": "firefox.exe"},
"vision_info": {"window_capture": {"window_title": titre, "app_name": "firefox.exe"}},
}
out = sanitize_event(ev)
wc = out["vision_info"]["window_capture"]["window_title"]
assert "168246" not in wc and "VIOLA" not in wc, wc
assert "[IPP_1]" in wc
# cohérence : même titre dans window et vision_info -> même token
assert out["window"]["title"] == wc
def test_sanitize_workflow_dict_tokenise_by_text_garde_ui():
"""R1/PII : un workflow appris ne doit pas porter de PII brute dans ses cibles
(by_text) ni ses noms avant import en DB VWB ; l'interface est préservée."""
import json
from agent_v0.server_v1.pii_sanitizer import sanitize_workflow_dict
wf = {
"name": "Dossier patient",
"nodes": [{"node_id": "n1", "name": "VIOLA (VIOLA) Liliane 90 ans"}],
"edges": [{
"edge_id": "e1",
"action": {
"type": "mouse_click",
"target": {"by_text": "Valider", "by_role": "ocr"},
},
}],
}
out = sanitize_workflow_dict(wf)
s = json.dumps(out, ensure_ascii=False)
assert "VIOLA" not in s # nom clinique tokenisé (dans un node name)
assert "[NOM_1]" in s
assert "90 ans" not in s # âge tokenisé
assert "Valider" in s # cible UI préservée (by_text)
assert "VIOLA" in json.dumps(wf, ensure_ascii=False) # original non muté
def test_chevauchement_prefix_capitalise():
"""FN bloquant (Claude R1) : mot capitalisé avant NOM (NAISSANCE) Prénom
-> RE_PRENOM_NOM captait « Dossier VIOLA » et bloquait RE_NOM_NAISSANCE
« VIOLA (VIOLA) Liliane ». Fix : résolution par priorité détecteur + longueur."""
from agent_v0.server_v1.pii_sanitizer import anonymize_text
m: dict = {}
for titre, leak in [("Dossier VIOLA (VIOLA) Liliane", "VIOLA"),
("Patient ROSSIGNOL (SOUBIE) Pierrette", "ROSSIGNOL"),
("Fenetre LAVAL (BARTHELEMY) Nicole", "LAVAL")]:
out, _ = anonymize_text(titre, mapping=m)
assert leak not in out, f"FN: {leak} still visible in '{out}'"
# contrôle : sans préfixe, toujours OK
out, _ = anonymize_text("VIOLA (VIOLA) Liliane", mapping=m)
assert "VIOLA" not in out
def test_gxd5_diagnostics_numero_et_nom():
"""GXD5 Diagnostics — numéro de dossier + nom tout-majuscules (3 patients prod)."""
from agent_v0.server_v1.pii_sanitizer import anonymize_text
m: dict = {}
for titre, num_leak, nom_leak in [
("GXD5 Diagnostics - 128008 - BENVENISTE MARIE-LAURENCE", "128008", "BENVENISTE"),
("GXD5 Diagnostics - 272223 - LEMOINE ERIC", "272223", "LEMOINE"),
("GXD5 Diagnostics - 153442 - ROSELIER MATHEO", "153442", "ROSELIER"),
]:
out, ents = anonymize_text(titre, mapping=m)
assert num_leak not in out, f"FN: numéro {num_leak} visible dans '{out}'"
assert nom_leak not in out, f"FN: nom {nom_leak} visible dans '{out}'"
types = {e["type"] for e in ents}
assert "DOSSIER" in types, f"Pas de token DOSSIER dans {ents}"
assert "NOM" in types, f"Pas de token NOM dans {ents}"

View File

@@ -0,0 +1,68 @@
"""Non-régression sécurité : câblage PII au chokepoint ``stream_event``.
Invariant : un event contenant de la PII patient (titre de fenêtre + contenu
saisi) passé à ``stream_event`` ne doit JAMAIS écrire la PII brute dans le
journal ``live_events.jsonl``, ni la propager au worker ou au shadow observer.
L'assainissement a lieu une seule fois, en amont des chemins de
persistance/traitement (``api_stream.py``, hook ``sanitize_event``).
"""
import asyncio
import json
import os
# Le module serveur refuse de se charger sans token (sécurité prod) ;
# en test unitaire on désactive l'auth pour pouvoir importer le module.
os.environ.setdefault("RPA_AUTH_DISABLED", "true")
import agent_v0.server_v1.api_stream as api
def _event_avec_pii():
# PII captée par la couche 1 : IPP (structurel) + contenu saisi.
# Contexte = logiciel métier réel du POC (pas la maquette Easily abandonnée).
# (Les noms libres sans marqueur relèvent de la couche 2 NER — hors scope ici.)
return {
"type": "text_input",
"text": "anticoagulant 75mg matin",
"active_window_title": "Gxd5diag - Recherche dossier (IPP: 123456)",
}
def test_stream_event_assainit_et_propage_sur_les_chemins(tmp_path, monkeypatch):
"""Le chokepoint applique sanitize_event UNE fois et tous les chemins
(jsonl, worker, shadow) reçoivent la copie assainie — pas la valeur brute."""
captured = {}
monkeypatch.setattr(api, "_ensure_session_registered", lambda *a, **k: None)
monkeypatch.setattr(
api.worker,
"process_event_direct",
lambda sid, ev: (captured.__setitem__("worker", ev), {})[1],
)
monkeypatch.setattr(
api, "shadow_observe_event", lambda sid, ev: captured.__setitem__("shadow", ev)
)
monkeypatch.setattr(api, "LIVE_SESSIONS_DIR", tmp_path)
api._session_pii_mapping.pop("sess_pii", None)
se = api.StreamEvent(
session_id="sess_pii",
machine_id="lea-test",
timestamp=1000.0,
event=_event_avec_pii(),
)
asyncio.run(api.stream_event(se))
# 1. le journal sur disque ne contient ni l'IPP brut ni le contenu saisi
jsonl = (tmp_path / "lea-test" / "sess_pii" / "live_events.jsonl").read_text(
encoding="utf-8"
)
assert "123456" not in jsonl
assert "anticoagulant 75mg" not in jsonl
# 2. contenu saisi masqué + IPP tokenisé (preuve que le titre est traité)
assert "[SAISIE]" in jsonl
assert "[IPP_1]" in jsonl
# 3. worker et shadow reçoivent l'event assaini, pas la valeur brute
assert captured["worker"]["text"] == "[SAISIE]"
assert "123456" not in json.dumps(captured["worker"], ensure_ascii=False)
assert "123456" not in json.dumps(captured["shadow"], ensure_ascii=False)

View File

@@ -0,0 +1,44 @@
"""
Test de non-régression : conservation du machine_id au round-trip to_dict/from_dict.
Bug : les workflows listés via /api/v1/traces/stream/workflows étaient tous
attribués à machine_id="default" alors que les sessions portaient le bon
machine_id (lea-*). Cause : to_dict ne sérialisait pas l'attribut d'instance
`_machine_id` et from_dict ne le reposait pas (il dormait dans
metadata['machine_id']). list_workflows tombait alors sur le fallback "default".
"""
from datetime import datetime
from core.models.workflow_graph import Workflow
def _make_minimal_workflow(machine_id: str) -> Workflow:
"""Construit un workflow minimal portant un machine_id dans ses métadonnées."""
now = datetime.now().isoformat()
return Workflow.from_dict({
"workflow_id": "wf-test",
"name": "wf-test",
"nodes": [],
"edges": [],
"safety_rules": {},
"stats": {},
"learning": {},
"entry_nodes": [],
"end_nodes": [],
"created_at": now,
"updated_at": now,
"metadata": {"machine_id": machine_id},
})
def test_machine_id_preserved_after_to_dict_from_dict_round_trip():
"""Un workflow doit conserver son machine_id après un round-trip de (dé)sérialisation."""
wf = _make_minimal_workflow("lea-poste-3")
# Simule l'étiquetage runtime fait par le stream_processor
wf._machine_id = "lea-poste-3"
restored = Workflow.from_dict(wf.to_dict())
# Invariant : le machine_id survit au round-trip (comme le fait list_workflows)
assert getattr(restored, "_machine_id", "default") == "lea-poste-3"

View File

@@ -321,6 +321,70 @@ class ExecutionStep(db.Model):
} }
# ---------------------------------------------------------------------------
# Extraction — « dossier patient extrait » (brique 2)
#
# ⚠️ CANAL EXTRACTION ≠ canal apprentissage. Ces tables conservent les
# VRAIES données patient (patient_ref, ExtractedField.value) : c'est le but,
# constituer le dossier. Elles NE doivent PAS être anonymisées/tokenisées
# (à l'inverse du canal apprentissage, cf. pii_sanitizer). Aucun appel
# d'assainissement PII ne doit cibler ces colonnes.
#
# Sémantique de preuve réutilisée de contracts/evidence.py (VWBEvidence) :
# screenshot_ref ≈ screenshot, screen_bbox/bbox ≈ highlight_box, confidence
# ≈ confidence_score, created_at ≈ timestamp.
# ---------------------------------------------------------------------------
class ExtractionJob(db.Model):
"""Dossier patient extrait — racine d'une session d'extraction."""
__tablename__ = 'extraction_jobs'
id = db.Column(db.String(64), primary_key=True)
patient_ref = db.Column(db.String(255), nullable=True) # donnée patient EN CLAIR (volontaire)
source_session_id = db.Column(db.String(64), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# status: 'needs_review' (revue humaine requise) | 'complete' (validé)
status = db.Column(db.String(32), default='needs_review')
tables = db.relationship('ExtractedTable', backref='job', lazy='dynamic',
cascade='all, delete-orphan')
def __repr__(self):
return f'<ExtractionJob {self.id}: {self.status}>'
class ExtractedTable(db.Model):
"""Tableau extrait d'un écran (preuve : screenshot_ref + screen_bbox)."""
__tablename__ = 'extracted_tables'
id = db.Column(db.String(64), primary_key=True)
job_id = db.Column(db.String(64), db.ForeignKey('extraction_jobs.id'), nullable=False)
screen_bbox = db.Column(db.JSON, nullable=True) # {x, y, width, height}
screenshot_ref = db.Column(db.String(512), nullable=True)
fields = db.relationship('ExtractedField', backref='table', lazy='dynamic',
cascade='all, delete-orphan')
def __repr__(self):
return f'<ExtractedTable {self.id}>'
class ExtractedField(db.Model):
"""Cellule extraite (donnée patient EN CLAIR) + preuve bbox/confidence."""
__tablename__ = 'extracted_fields'
id = db.Column(db.String(64), primary_key=True)
table_id = db.Column(db.String(64), db.ForeignKey('extracted_tables.id'), nullable=False)
row = db.Column(db.Integer, nullable=True)
col = db.Column(db.Integer, nullable=True)
value = db.Column(db.Text, nullable=True) # valeur patient EN CLAIR (volontaire)
bbox = db.Column(db.JSON, nullable=True) # {x, y, width, height}
confidence = db.Column(db.Float, nullable=True)
def __repr__(self):
return f'<ExtractedField {self.id}: r{self.row}c{self.col}>'
# Session active (en mémoire, pas en DB) # Session active (en mémoire, pas en DB)
class SessionState: class SessionState:
"""État de la session utilisateur (en mémoire)""" """État de la session utilisateur (en mémoire)"""

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Test TDD — Extraction (brique 2) : modèle « dossier patient extrait ».
Objectif : valider les 3 modèles métier d'extraction (absents avant cette brique) :
ExtractionJob → ExtractedTable → ExtractedField
avec leurs relations, cascade, et le `status` ∈ {complete, needs_review}.
⚠️ CANAL EXTRACTION ≠ canal apprentissage : ici on conserve les **vraies
données patient** (le but est de constituer le dossier). Pas d'anonymisation.
Le test pose donc une valeur patient en clair et vérifie qu'elle est restituée
telle quelle.
Isolation (même pattern que test_import_core_workflow_to_db.py) :
- pas d'app Flask complète (`app.py`), pas de socketio/blueprints ;
- `db` partagé (`db.models.db`) lié à une SQLite **en mémoire**.
"""
import sys
from datetime import datetime
from pathlib import Path
import pytest
from flask import Flask
_BACKEND = Path(__file__).resolve().parent.parent.parent # .../visual_workflow_builder/backend
_ROOT = _BACKEND.parent.parent # .../rpa_vision_v3
for p in (str(_ROOT), str(_BACKEND)):
if p not in sys.path:
sys.path.insert(0, p)
from db.models import db # noqa: E402
@pytest.fixture
def db_app():
"""App Flask minimale liée à une SQLite en mémoire, schéma créé."""
app = Flask("test_extraction_models")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
with app.app_context():
db.create_all()
yield app
db.session.remove()
db.drop_all()
def test_extraction_job_table_field_chain(db_app):
"""Chaîne complète Job → Table → Field, relations + status par défaut."""
from db.models import ExtractionJob, ExtractedTable, ExtractedField
with db_app.app_context():
job = ExtractionJob(
id="job_001",
patient_ref="MOREL Catherine", # donnée patient EN CLAIR (canal extraction)
source_session_id="sess_extract_001",
)
table = ExtractedTable(
id="tbl_001",
job=job,
screen_bbox={"x": 10, "y": 20, "width": 300, "height": 120},
screenshot_ref="data/extract/sess_extract_001/screen_0.png",
)
field = ExtractedField(
id="fld_001",
table=table,
row=0,
col=1,
value="1975-04-12",
bbox={"x": 110, "y": 22, "width": 80, "height": 18},
confidence=0.94,
)
db.session.add(job)
db.session.commit()
# status par défaut appliqué à l'INSERT = needs_review (revue humaine requise)
assert job.status == "needs_review"
# Relations descendantes
assert job.tables.count() == 1
assert job.tables.first().fields.count() == 1
# Relations remontantes
f = ExtractedField.query.get("fld_001")
assert f.table.job.patient_ref == "MOREL Catherine" # patient conservé en clair
assert f.value == "1975-04-12"
assert f.bbox["width"] == 80
assert f.confidence == pytest.approx(0.94)
assert f.table.screen_bbox["height"] == 120
def test_status_complete_is_accepted(db_app):
"""`status` accepte 'complete' (extraction validée)."""
from db.models import ExtractionJob
with db_app.app_context():
job = ExtractionJob(id="job_ok", patient_ref="DUPONT Jean", status="complete")
db.session.add(job)
db.session.commit()
assert ExtractionJob.query.get("job_ok").status == "complete"
assert job.created_at is not None and isinstance(job.created_at, datetime)
def test_cascade_delete_removes_children(db_app):
"""Supprimer le Job supprime tables + fields (cascade, pas d'orphelins)."""
from db.models import ExtractionJob, ExtractedTable, ExtractedField
with db_app.app_context():
job = ExtractionJob(id="job_del", patient_ref="X")
table = ExtractedTable(id="tbl_del", job=job, screen_bbox={}, screenshot_ref="s.png")
ExtractedField(id="fld_del", table=table, row=0, col=0, value="v",
bbox={}, confidence=0.5)
db.session.add(job)
db.session.commit()
db.session.delete(job)
db.session.commit()
assert ExtractionJob.query.count() == 0
assert ExtractedTable.query.count() == 0
assert ExtractedField.query.count() == 0