Compare commits
5 Commits
c82829f2bb
...
3ed9798f06
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ed9798f06 | ||
|
|
b65710ae43 | ||
|
|
509a026cfc | ||
|
|
a62b720144 | ||
|
|
14b1bf844a |
@@ -82,6 +82,17 @@ BLUR_SENSITIVE = os.environ.get("RPA_BLUR_SENSITIVE", "true").lower() in ("true"
|
||||
# Configurable via variable d'environnement pour permettre l'ajustement
|
||||
LOG_RETENTION_DAYS = int(os.environ.get("RPA_LOG_RETENTION_DAYS", "180"))
|
||||
|
||||
# Remontée automatique des logs vers le serveur (push-log-DGX).
|
||||
# Diagnostic des postes clinique SANS AnyDesk : les logs (déjà écrits sur disque)
|
||||
# sont poussés au serveur, rangés par machine_id, consultables au dashboard.
|
||||
# Défaut PRUDENT = désactivé : on l'active poste par poste via config.txt /
|
||||
# variable d'environnement, sans rebuild de l'installateur.
|
||||
LOG_SHIP_ENABLED = os.environ.get("RPA_LOG_SHIP_ENABLED", "false").lower() in (
|
||||
"true", "1", "yes",
|
||||
)
|
||||
# Intervalle de flush du buffer de logs (secondes).
|
||||
LOG_SHIP_INTERVAL_S = float(os.environ.get("RPA_LOG_SHIP_INTERVAL_S", "30"))
|
||||
|
||||
# Monitoring
|
||||
PERF_MONITOR_INTERVAL_S = 30
|
||||
LOGS_DIR = BASE_DIR / "logs"
|
||||
|
||||
@@ -17,7 +17,7 @@ import threading
|
||||
from .config import (
|
||||
SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID, LOG_RETENTION_DAYS, LOG_FILE,
|
||||
SCREEN_RESOLUTION, DPI_SCALE, OS_THEME, API_TOKEN, MAX_SESSION_DURATION_S,
|
||||
STREAMING_ENDPOINT,
|
||||
STREAMING_ENDPOINT, LOG_SHIP_ENABLED, LOG_SHIP_INTERVAL_S,
|
||||
)
|
||||
from .core.captor import EventCaptorV1
|
||||
from .core.executor import ActionExecutorV1
|
||||
@@ -62,6 +62,26 @@ except Exception:
|
||||
for _noisy in ("urllib3", "requests.packages.urllib3", "PIL", "mss"):
|
||||
logging.getLogger(_noisy).setLevel(logging.WARNING)
|
||||
|
||||
# push-log-DGX : remontée automatique des logs vers le serveur (diagnostic des
|
||||
# postes SANS AnyDesk). GARDÉ derrière RPA_LOG_SHIP_ENABLED (défaut désactivé) —
|
||||
# activable poste par poste via config.txt, sans rebuild. Le handler est attaché
|
||||
# au logger racine APRÈS setup_logging (les logs partent aussi dans le fichier).
|
||||
_log_shipper = None
|
||||
if LOG_SHIP_ENABLED:
|
||||
try:
|
||||
from .network.log_shipper import LogShipper
|
||||
_log_shipper = LogShipper(
|
||||
machine_id=MACHINE_ID,
|
||||
max_batch=int(os.environ.get("RPA_AGENT_LOGS_MAX_BATCH", "1000")),
|
||||
flush_interval_s=LOG_SHIP_INTERVAL_S,
|
||||
)
|
||||
logging.getLogger().addHandler(_log_shipper.handler)
|
||||
_log_shipper.start()
|
||||
except Exception as _e:
|
||||
# Ne JAMAIS empêcher Léa de démarrer pour un problème de remontée de logs.
|
||||
logging.getLogger(__name__).warning("Log shipper non démarré : %s", _e)
|
||||
_log_shipper = None
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Intervalle de polling replay (secondes)
|
||||
|
||||
317
agent_v0/agent_v1/network/log_shipper.py
Normal file
317
agent_v0/agent_v1/network/log_shipper.py
Normal file
@@ -0,0 +1,317 @@
|
||||
# agent_v1/network/log_shipper.py
|
||||
"""Remontée AUTOMATIQUE des logs du client Léa vers le serveur (push-log-DGX).
|
||||
|
||||
But : diagnostiquer les postes Windows clinique SANS AnyDesk. Les logs déjà
|
||||
écrits sur disque par `logging_setup.py` (rotation quotidienne, rétention 180 j,
|
||||
Règlement IA Art. 12) sont en plus poussés au serveur, rangés par `machine_id`,
|
||||
consultables au dashboard.
|
||||
|
||||
Serveur (déjà prêt — NE PAS toucher) :
|
||||
POST /api/v1/agents/logs
|
||||
body = {machine_id: str, logs: [{ts, level, logger, message}]}
|
||||
borne RPA_AGENT_LOGS_MAX_BATCH (défaut 1000) — 413 si dépassée.
|
||||
|
||||
Conception :
|
||||
- `LogShipperHandler(logging.Handler)` : sur `emit(record)`, formate au
|
||||
schéma EXACT `{ts, level, logger, message}`, applique un assainissement
|
||||
PII au message (défense en profondeur — la discipline `log_safe` à la
|
||||
source logue déjà des hashes/longueurs, pas du contenu brut), puis
|
||||
empile dans un buffer borné.
|
||||
- `LogShipper` : flush par BATCH (≤ max_batch) via un `sender` callable
|
||||
INJECTABLE `(machine_id, logs) -> bool`. Défaut = POST réel Bearer
|
||||
(pattern `streamer.py`).
|
||||
- Résilience (ZÉRO perte) : si `sender` renvoie False ou lève, les logs
|
||||
RESTENT dans le buffer et sont rejoués au flush suivant. Le fichier de
|
||||
log local reste de toute façon la source durable (survit au crash) ; le
|
||||
buffer RAM est un best-effort de remontée, volontairement NON persisté en
|
||||
SQLite (le `PersistentBuffer` est session/event-scoped — y mêler des logs
|
||||
polluerait la DB d'events). Borne mémoire = `max_buffer` (drop des plus
|
||||
VIEUX au-delà — un log récent vaut mieux qu'un vieux pour le diagnostic).
|
||||
|
||||
Pattern d'import PII : on tente `anonymize_text` (server_v1.pii_sanitizer,
|
||||
source de vérité des tokens typés) via le même import paresseux tolérant que
|
||||
`ui/messages.py`. Sur un vrai poste (sans server_v1), on retombe sur l'identité :
|
||||
acceptable car la PII de message est déjà neutralisée à la source par la
|
||||
discipline `log_safe`. Le sanitizer reste INJECTABLE pour les tests/évolutions.
|
||||
|
||||
Branche feat/push-log-dgx.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from typing import Callable, Deque, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Schéma d'une entrée de log poussée au serveur.
|
||||
# ts : epoch (float) — l'heure de l'évènement
|
||||
# level : nom du niveau ("INFO", "WARNING"...)
|
||||
# logger : nom du logger (record.name)
|
||||
# message : message formaté (args interpolés) ET assaini PII
|
||||
|
||||
# Défaut aligné sur la borne serveur RPA_AGENT_LOGS_MAX_BATCH (api_stream.py).
|
||||
DEFAULT_MAX_BATCH = 1000
|
||||
|
||||
# Borne mémoire du buffer : au-delà, on droppe les plus VIEUX (diagnostic =
|
||||
# on préfère les logs récents). Quelques milliers d'entrées = quelques Mo RAM.
|
||||
DEFAULT_MAX_BUFFER = 5000
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Assainissement PII du message (défense en profondeur)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _default_message_sanitizer(text: str) -> str:
|
||||
"""Sanitizer par défaut côté client = identité.
|
||||
|
||||
Le **rempart PII des logs est le SERVEUR** : `sanitize_log_entries`
|
||||
ré-assainit chaque message à la réception (`/api/v1/agents/logs`), via le
|
||||
même `anonymize_text` que les events. Tenter un import de `server_v1` côté
|
||||
poste à CHAQUE ligne de log est inutile (absent du bundle client) et coûteux
|
||||
(exception attrapée par emit). La discipline `log_safe` neutralise déjà la
|
||||
PII à la source. Reste INJECTABLE pour tests/évolutions.
|
||||
"""
|
||||
return text
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Handler — empile les LogRecords dans un buffer partagé
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class LogShipperHandler(logging.Handler):
|
||||
"""Handler logging qui sérialise chaque record et l'empile pour envoi.
|
||||
|
||||
Ne fait AUCUN réseau : il alimente seulement le buffer du `LogShipper`.
|
||||
L'envoi est piloté par `LogShipper.flush()` (thread dédié périodique).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
buffer: Deque[Dict],
|
||||
lock: threading.Lock,
|
||||
message_sanitizer: Callable[[str], str],
|
||||
max_buffer: int = DEFAULT_MAX_BUFFER,
|
||||
level=logging.NOTSET,
|
||||
):
|
||||
super().__init__(level=level)
|
||||
self._buffer = buffer
|
||||
self._lock = lock
|
||||
self._sanitize = message_sanitizer
|
||||
self._max_buffer = max_buffer
|
||||
|
||||
def _format_record(self, record: logging.LogRecord) -> Dict:
|
||||
"""Construit l'entrée au schéma EXACT {ts, level, logger, message}.
|
||||
|
||||
`record.getMessage()` interpole les args (%s...). Le message est ensuite
|
||||
passé au sanitizer PII. Tolérant : un message non formatable ne doit pas
|
||||
faire perdre l'entrée.
|
||||
"""
|
||||
try:
|
||||
message = record.getMessage()
|
||||
except Exception:
|
||||
message = str(record.msg)
|
||||
try:
|
||||
message = self._sanitize(message)
|
||||
except Exception:
|
||||
# Le sanitizer ne doit jamais casser le logging.
|
||||
pass
|
||||
return {
|
||||
"ts": record.created,
|
||||
"level": record.levelname,
|
||||
"logger": record.name,
|
||||
"message": message,
|
||||
}
|
||||
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
"""Sérialise et empile le record (best-effort, ne lève jamais)."""
|
||||
try:
|
||||
entry = self._format_record(record)
|
||||
with self._lock:
|
||||
# deque(maxlen) droppe automatiquement le plus VIEUX au-delà
|
||||
# de la borne — pas de croissance mémoire non bornée.
|
||||
self._buffer.append(entry)
|
||||
except Exception:
|
||||
# handleError respecte logging.raiseExceptions (silencieux en prod).
|
||||
self.handleError(record)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shipper — flush périodique par batch via un sender injectable
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class LogShipper:
|
||||
"""Orchestre la remontée des logs : buffer + flush par batch.
|
||||
|
||||
Args:
|
||||
machine_id : identifiant du poste (config.MACHINE_ID en prod).
|
||||
sender : callable INJECTABLE `(machine_id, logs) -> bool`. True =
|
||||
accusé de réception serveur. Défaut = POST réel Bearer.
|
||||
max_batch : taille max d'un batch (≤ borne serveur). Défaut 1000.
|
||||
max_buffer : borne mémoire du buffer (drop des plus vieux au-delà).
|
||||
message_sanitizer : assainissement PII du message. Défaut = pii_sanitizer
|
||||
si disponible, sinon identité.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
machine_id: str,
|
||||
sender: Optional[Callable[[str, List[Dict]], bool]] = None,
|
||||
max_batch: int = DEFAULT_MAX_BATCH,
|
||||
max_buffer: int = DEFAULT_MAX_BUFFER,
|
||||
message_sanitizer: Optional[Callable[[str], str]] = None,
|
||||
flush_interval_s: float = 30.0,
|
||||
):
|
||||
self.machine_id = machine_id
|
||||
self.max_batch = max(1, int(max_batch))
|
||||
self.flush_interval_s = flush_interval_s
|
||||
self._sender = sender if sender is not None else self._default_sender
|
||||
self._sanitize = message_sanitizer or _default_message_sanitizer
|
||||
self._lock = threading.Lock()
|
||||
self._buffer: Deque[Dict] = deque(maxlen=max_buffer)
|
||||
self.handler = LogShipperHandler(
|
||||
buffer=self._buffer,
|
||||
lock=self._lock,
|
||||
message_sanitizer=self._sanitize,
|
||||
max_buffer=max_buffer,
|
||||
)
|
||||
self._running = False
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Introspection (diagnostic / tests)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def peek_buffer(self) -> List[Dict]:
|
||||
"""Copie des entrées en attente (lecture seule, pour diagnostic/tests)."""
|
||||
with self._lock:
|
||||
return list(self._buffer)
|
||||
|
||||
def pending(self) -> int:
|
||||
with self._lock:
|
||||
return len(self._buffer)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Flush — envoie le buffer par batches ≤ max_batch
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def flush(self) -> int:
|
||||
"""Envoie le buffer par batches successifs. Retourne le nb de logs ACK.
|
||||
|
||||
Résilience ZÉRO perte : on retire un batch du buffer, on tente l'envoi.
|
||||
- Succès → les entrées sont définitivement consommées.
|
||||
- Échec (False ou exception) → on REMET les entrées en tête du buffer
|
||||
et on ARRÊTE la passe (serveur probablement down) ; rejeu au flush
|
||||
suivant. Les entrées non encore extraites restent en place.
|
||||
"""
|
||||
sent = 0
|
||||
while True:
|
||||
with self._lock:
|
||||
if not self._buffer:
|
||||
break
|
||||
batch: List[Dict] = []
|
||||
for _ in range(min(self.max_batch, len(self._buffer))):
|
||||
batch.append(self._buffer.popleft())
|
||||
|
||||
try:
|
||||
ok = self._sender(self.machine_id, batch)
|
||||
except Exception as e:
|
||||
ok = False
|
||||
logger.debug("Log shipper sender a levé : %s", e)
|
||||
|
||||
if ok:
|
||||
sent += len(batch)
|
||||
continue
|
||||
|
||||
# Échec : on remet le batch en tête (ordre préservé) et on arrête.
|
||||
with self._lock:
|
||||
self._buffer.extendleft(reversed(batch))
|
||||
break
|
||||
|
||||
return sent
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Sender réel — POST Bearer (pattern streamer.py)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _auth_headers() -> dict:
|
||||
"""Headers Bearer (pattern streamer.py)."""
|
||||
try:
|
||||
from ..config import API_TOKEN
|
||||
except Exception:
|
||||
API_TOKEN = ""
|
||||
if API_TOKEN:
|
||||
return {"Authorization": f"Bearer {API_TOKEN}"}
|
||||
return {}
|
||||
|
||||
def _default_sender(self, machine_id: str, logs: List[Dict]) -> bool:
|
||||
"""POST réel vers /api/v1/agents/logs. True si HTTP 2xx.
|
||||
|
||||
Best-effort : tout échec réseau/serveur → False (logs conservés,
|
||||
rejoués). Aucune exception ne remonte au-delà du sender.
|
||||
"""
|
||||
try:
|
||||
import requests
|
||||
|
||||
from ..config import SERVER_URL
|
||||
|
||||
url = f"{SERVER_URL}/agents/logs"
|
||||
resp = requests.post(
|
||||
url,
|
||||
json={"machine_id": machine_id, "logs": logs},
|
||||
headers=self._auth_headers(),
|
||||
timeout=5,
|
||||
allow_redirects=False,
|
||||
)
|
||||
return bool(resp.ok)
|
||||
except Exception as e:
|
||||
logger.debug("Log shipper POST échoué : %s", e)
|
||||
return False
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Boucle de flush périodique (thread daemon)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def start(self) -> None:
|
||||
"""Démarre le thread de flush périodique (idempotent)."""
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
self._thread = threading.Thread(
|
||||
target=self._flush_loop, daemon=True, name="lea-log-shipper"
|
||||
)
|
||||
self._thread.start()
|
||||
logger.info(
|
||||
"Log shipper démarré (machine_id=%s, intervalle=%.0fs, batch≤%d)",
|
||||
self.machine_id, self.flush_interval_s, self.max_batch,
|
||||
)
|
||||
|
||||
def stop(self, final_flush: bool = True) -> None:
|
||||
"""Arrête la boucle et tente un dernier flush (best-effort)."""
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=2.0)
|
||||
if final_flush:
|
||||
try:
|
||||
self.flush()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _flush_loop(self) -> None:
|
||||
while self._running:
|
||||
# Découpe l'attente pour réagir vite à stop().
|
||||
waited = 0.0
|
||||
step = 0.5
|
||||
while self._running and waited < self.flush_interval_s:
|
||||
time.sleep(step)
|
||||
waited += step
|
||||
if not self._running:
|
||||
break
|
||||
try:
|
||||
self.flush()
|
||||
except Exception as e:
|
||||
logger.debug("Log shipper flush loop : %s", e)
|
||||
@@ -27,7 +27,7 @@ from fastapi import BackgroundTasks, Depends, FastAPI, File, HTTPException, Requ
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .pii_sanitizer import sanitize_event
|
||||
from .pii_sanitizer import sanitize_event, sanitize_log_entries
|
||||
from .replay_failure_logger import log_replay_failure
|
||||
from .replay_verifier import ReplayVerifier, VerificationResult
|
||||
from .replay_learner import ReplayLearner
|
||||
@@ -7263,7 +7263,10 @@ async def agents_logs(request: AgentLogsRequest):
|
||||
# Bloque les postes révoqués/désinstallés + met à jour last_seen_at.
|
||||
_guard_agent_registry_access(machine_id, endpoint="agents/logs")
|
||||
|
||||
received = agent_logs_store.append(machine_id, request.logs)
|
||||
# Assainissement PII côté serveur avant persistance (couche 1 regex, sans NER).
|
||||
# Un mapping partagé sur le batch garantit la cohérence des tokens ([NOM_1]…).
|
||||
safe_logs = sanitize_log_entries(request.logs)
|
||||
received = agent_logs_store.append(machine_id, safe_logs)
|
||||
return {"status": "ok", "received": received, "machine_id": machine_id}
|
||||
|
||||
|
||||
|
||||
@@ -203,6 +203,40 @@ def sanitize_event(event: Dict, *, mapping: Optional[Dict] = None) -> Dict:
|
||||
return ev
|
||||
|
||||
|
||||
def sanitize_log_entries(
|
||||
entries: List[Dict], *, mapping: Optional[Dict] = None
|
||||
) -> List[Dict]:
|
||||
"""Assainit un batch de log-entries reçues d'un client Léa avant persistance.
|
||||
|
||||
Pour chaque entrée, renvoie une **copie** où les champs texte porteurs de PII
|
||||
sont passés par `anonymize_text` :
|
||||
- `message` (str) : assaini par `anonymize_text`.
|
||||
- `logger` (str) : assaini de la même façon (peut porter un chemin patient).
|
||||
- `ts` et `level` : préservés à l'identique, jamais touchés.
|
||||
|
||||
Un `mapping` partagé est utilisé pour **toutes** les entrées du batch afin de
|
||||
garantir la cohérence des tokens (même PII → même token). Si `mapping` est
|
||||
None, un mapping local est créé et partagé entre toutes les entrées du batch.
|
||||
|
||||
Tolère les valeurs absentes, None ou non-str sans lever d'exception.
|
||||
N'utilise que `anonymize_text` — aucune regex supplémentaire.
|
||||
"""
|
||||
if not entries:
|
||||
return []
|
||||
if mapping is None:
|
||||
mapping = {}
|
||||
|
||||
result: List[Dict] = []
|
||||
for entry in entries:
|
||||
item = copy.copy(entry) # copie superficielle suffit (valeurs scalaires)
|
||||
for field in ("message", "logger"):
|
||||
v = item.get(field)
|
||||
if isinstance(v, str):
|
||||
item[field] = anonymize_text(v, mapping=mapping)[0]
|
||||
result.append(item)
|
||||
return result
|
||||
|
||||
|
||||
# Clés d'un workflow core portant du texte potentiellement PII : cible OCR
|
||||
# (`by_text`), noms d'écrans/labels dérivés des titres. Le contenu saisi est
|
||||
# déjà neutralisé à la source (sanitize_event → [SAISIE]).
|
||||
|
||||
249
core/extraction/role_mapper.py
Normal file
249
core/extraction/role_mapper.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""role_mapper — reconstruction de champs ANCRÉS sur l'OCR.
|
||||
|
||||
Principe cardinal (gate validé le 30/06 sur DPI urgences réel) :
|
||||
le VLM ne fournit QUE des ids de tokens OCR (`value_ids`) ; la valeur est
|
||||
reconstruite ici depuis l'OCR. Aucun texte produit par le VLM ne peut entrer
|
||||
dans une valeur → **0 hallucination par construction**.
|
||||
|
||||
Ce module est volontairement PUR (pas d'appel réseau/VLM) : il prend les tokens
|
||||
OCR (issus de `core.llm.ocr_extractor.extract_grid_from_image`) et la réponse
|
||||
déjà désérialisée du VLM, et produit des champs ancrés. L'appel VLM lui-même
|
||||
est orchestré ailleurs (et mockable), pour rester testable hors-ligne.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, List, Optional, Sequence, Tuple
|
||||
|
||||
BBox = Tuple[int, int, int, int] # (x_min, y_min, x_max, y_max)
|
||||
|
||||
|
||||
@dataclass
|
||||
class OcrToken:
|
||||
"""Un token OCR indexé par un id stable."""
|
||||
id: int
|
||||
text: str
|
||||
confidence: float = 1.0
|
||||
bbox: Optional[BBox] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class MappedField:
|
||||
"""Un champ {rôle → valeur} dont la valeur est 100% issue de l'OCR."""
|
||||
label: str
|
||||
value: str
|
||||
value_ids: List[int]
|
||||
confidence: float
|
||||
bbox: Optional[BBox]
|
||||
anchored: bool
|
||||
invalid_ids: List[int]
|
||||
|
||||
|
||||
def _norm_bbox(bbox) -> Optional[BBox]:
|
||||
"""Normalise une bbox en (x_min, y_min, x_max, y_max).
|
||||
|
||||
Accepte soit 4 points EasyOCR `[[x,y], ...]`, soit un quadruplet déjà plat.
|
||||
"""
|
||||
if bbox is None:
|
||||
return None
|
||||
if len(bbox) == 4 and all(isinstance(v, (int, float)) for v in bbox):
|
||||
return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))
|
||||
xs = [p[0] for p in bbox]
|
||||
ys = [p[1] for p in bbox]
|
||||
return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))
|
||||
|
||||
|
||||
def tokens_from_grid(grid: Sequence[Sequence[dict]]) -> List[OcrToken]:
|
||||
"""Convertit une grille `extract_grid_from_image` en tokens indexés (id séquentiel).
|
||||
|
||||
L'ordre des ids suit l'ordre de lecture de la grille (lignes top→bottom,
|
||||
colonnes left→right), ce qui donne au VLM un référentiel stable.
|
||||
"""
|
||||
tokens: List[OcrToken] = []
|
||||
tid = 0
|
||||
for row in grid:
|
||||
for cell in row:
|
||||
tokens.append(OcrToken(
|
||||
id=tid,
|
||||
text=cell["text"],
|
||||
confidence=float(cell.get("confidence", 1.0)),
|
||||
bbox=_norm_bbox(cell.get("bbox")),
|
||||
))
|
||||
tid += 1
|
||||
return tokens
|
||||
|
||||
|
||||
def _enclosing_bbox(bboxes: Sequence[Optional[BBox]]) -> Optional[BBox]:
|
||||
present = [b for b in bboxes if b is not None]
|
||||
if not present:
|
||||
return None
|
||||
return (
|
||||
min(b[0] for b in present),
|
||||
min(b[1] for b in present),
|
||||
max(b[2] for b in present),
|
||||
max(b[3] for b in present),
|
||||
)
|
||||
|
||||
|
||||
def reconstruct_fields(
|
||||
tokens: Sequence[OcrToken],
|
||||
vlm_fields: Sequence[dict],
|
||||
) -> List[MappedField]:
|
||||
"""Reconstruit les champs à partir des tokens OCR et des `value_ids` du VLM.
|
||||
|
||||
Pour chaque champ VLM `{label, value_ids:[...]}` :
|
||||
- déduplique les ids en préservant l'ordre de lecture donné par le VLM ;
|
||||
- filtre les ids hors OCR (listés dans `invalid_ids`) ;
|
||||
- reconstruit la valeur par concaténation des `text` des tokens valides ;
|
||||
- confidence = min des tokens ancrés (le plus prudent), bbox = englobante.
|
||||
|
||||
Tout champ `value`/texte fourni par le VLM est IGNORÉ : seule la liste
|
||||
d'ids fait foi (anti-hallucination).
|
||||
"""
|
||||
by_id = {t.id: t for t in tokens}
|
||||
out: List[MappedField] = []
|
||||
for vf in vlm_fields:
|
||||
label = vf.get("label", "")
|
||||
seen: List[int] = []
|
||||
for i in (vf.get("value_ids") or []):
|
||||
if i not in seen:
|
||||
seen.append(i)
|
||||
valid = [i for i in seen if i in by_id]
|
||||
invalid = [i for i in seen if i not in by_id]
|
||||
toks = [by_id[i] for i in valid]
|
||||
out.append(MappedField(
|
||||
label=label,
|
||||
value=" ".join(t.text for t in toks),
|
||||
value_ids=valid,
|
||||
confidence=min((t.confidence for t in toks), default=0.0),
|
||||
bbox=_enclosing_bbox([t.bbox for t in toks]),
|
||||
anchored=bool(valid),
|
||||
invalid_ids=invalid,
|
||||
))
|
||||
return out
|
||||
|
||||
|
||||
# --- Orchestration VLM (client injectable pour rester testable hors-ligne) ---
|
||||
|
||||
# Un client VLM est un callable (image_path, prompt) -> texte de réponse.
|
||||
VlmClient = Callable[[str, str], str]
|
||||
|
||||
|
||||
def build_role_prompt(
|
||||
tokens: Sequence[OcrToken],
|
||||
roles: Optional[Sequence[str]] = None,
|
||||
) -> str:
|
||||
"""Construit le prompt d'attribution de rôles (ancrage strict par ids).
|
||||
|
||||
Mode *guidé* si `roles` est fourni (rôles attendus de l'écran), sinon *libre*
|
||||
(le VLM nomme lui-même les champs). Dans les deux cas le VLM ne renvoie que
|
||||
des `value_ids` — jamais de texte recopié.
|
||||
"""
|
||||
ocr_list = [{"id": t.id, "text": t.text} for t in tokens]
|
||||
if roles:
|
||||
roles_line = (
|
||||
"Rôles attendus sur cet écran (associe chacun s'il est présent) : "
|
||||
+ ", ".join(roles) + ".\n"
|
||||
)
|
||||
else:
|
||||
roles_line = (
|
||||
"Identifie librement les champs présents — le 'label' est le rôle du champ.\n"
|
||||
)
|
||||
return (
|
||||
"Tu reçois une capture d'écran d'un dossier patient et la liste des tokens "
|
||||
"détectés par OCR (chaque token : id, text).\n"
|
||||
+ roles_line +
|
||||
"Pour chaque champ, désigne les tokens OCR qui composent sa VALEUR.\n"
|
||||
"RÈGLES STRICTES :\n"
|
||||
"- Tu ne recopies AUCUN texte. Tu renvoies seulement 'value_ids' : la liste "
|
||||
"des id de tokens OCR (dans l'ordre de lecture) qui forment la valeur.\n"
|
||||
"- 'label' = le rôle du champ. N'invente aucun champ.\n"
|
||||
"- Réponds UNIQUEMENT en JSON PLAT :\n"
|
||||
'{"ecran":"<type en 3 mots>","champs":[{"label":"...","value_ids":[<int>,...]}]}\n\n'
|
||||
"Tokens OCR :\n" + json.dumps(ocr_list, ensure_ascii=False)
|
||||
)
|
||||
|
||||
|
||||
def parse_vlm_json(text: str) -> dict:
|
||||
"""Extrait le 1er objet JSON d'une réponse VLM (tolère les fences ```json).
|
||||
|
||||
Robuste : renvoie `{}` si la réponse n'est pas du JSON exploitable (pas de
|
||||
crash en batch).
|
||||
"""
|
||||
if not text:
|
||||
return {}
|
||||
s = text.strip()
|
||||
if "```" in s:
|
||||
parts = s.split("```")
|
||||
if len(parts) >= 2:
|
||||
s = parts[1]
|
||||
if s.lstrip().lower().startswith("json"):
|
||||
s = s.lstrip()[4:]
|
||||
a, b = s.find("{"), s.rfind("}")
|
||||
if a < 0 or b <= a:
|
||||
return {}
|
||||
try:
|
||||
return json.loads(s[a:b + 1])
|
||||
except (ValueError, TypeError):
|
||||
return {}
|
||||
|
||||
|
||||
def _norm_label(label: str) -> str:
|
||||
"""Normalise un label pour comparaison : minuscules + strip espaces."""
|
||||
return label.strip().lower()
|
||||
|
||||
|
||||
def assess_quality(
|
||||
fields: Sequence[MappedField],
|
||||
required_roles: Optional[Sequence[str]] = None,
|
||||
min_confidence: float = 0.6,
|
||||
) -> str:
|
||||
"""Évalue la qualité d'extraction d'un dossier à partir des champs reconstruits.
|
||||
|
||||
Renvoie l'un des 4 statuts (par priorité décroissante) :
|
||||
- "failed" : aucun champ, OU aucun champ ancré.
|
||||
- "needs_review" : au moins un rôle requis absent ou non ancré.
|
||||
- "partial" : rôles requis ok mais confidence insuffisante OU champs non ancrés.
|
||||
- "complete" : tout ancré, toutes confidences >= min_confidence, aucun non ancré.
|
||||
|
||||
Le matching required_role ↔ field.label est insensible à la casse et aux espaces.
|
||||
"""
|
||||
# --- failed : aucun champ du tout, ou aucun ancré ---
|
||||
anchored = [f for f in fields if f.anchored]
|
||||
if not fields or not anchored:
|
||||
return "failed"
|
||||
|
||||
# --- needs_review : rôle requis absent ou non ancré ---
|
||||
if required_roles:
|
||||
anchored_labels = {_norm_label(f.label) for f in anchored}
|
||||
for role in required_roles:
|
||||
if _norm_label(role) not in anchored_labels:
|
||||
return "needs_review"
|
||||
|
||||
# --- partial : confidence basse sur un champ ancré OU champs non ancrés ---
|
||||
has_low_confidence = any(f.confidence < min_confidence for f in anchored)
|
||||
has_unanchored = any(not f.anchored for f in fields)
|
||||
if has_low_confidence or has_unanchored:
|
||||
return "partial"
|
||||
|
||||
# --- complete ---
|
||||
return "complete"
|
||||
|
||||
|
||||
def map_roles(
|
||||
image_path: str,
|
||||
tokens: Sequence[OcrToken],
|
||||
vlm_client: VlmClient,
|
||||
roles: Optional[Sequence[str]] = None,
|
||||
) -> List[MappedField]:
|
||||
"""Orchestre l'attribution de rôles : prompt → VLM → parse → reconstruction ancrée.
|
||||
|
||||
`vlm_client` est injecté (testable hors-ligne). Le résultat est toujours
|
||||
ancré sur l'OCR via `reconstruct_fields`.
|
||||
"""
|
||||
prompt = build_role_prompt(tokens, roles)
|
||||
raw = vlm_client(image_path, prompt)
|
||||
data = parse_vlm_json(raw)
|
||||
vlm_fields = data.get("champs", []) if isinstance(data, dict) else []
|
||||
return reconstruct_fields(tokens, vlm_fields)
|
||||
220
tests/unit/test_agent_v1_log_shipper.py
Normal file
220
tests/unit/test_agent_v1_log_shipper.py
Normal file
@@ -0,0 +1,220 @@
|
||||
"""TDD — push-log-DGX : log shipper client Léa (remontée auto des logs).
|
||||
|
||||
Le serveur expose déjà `POST /api/v1/agents/logs` (body
|
||||
`{machine_id, logs:[{ts, level, logger, message}]}`, borne
|
||||
`RPA_AGENT_LOGS_MAX_BATCH`). Côté client, on veut :
|
||||
|
||||
- `LogShipperHandler(logging.Handler)` : sur `emit`, formate un LogRecord
|
||||
au schéma exact `{ts, level, logger, message}`, applique un assainissement
|
||||
PII au message, et empile dans un buffer.
|
||||
- `LogShipper` : flush périodique du buffer par BATCH (≤ max_batch) via un
|
||||
`sender` callable INJECTABLE `(machine_id, logs) -> bool`. Résilience :
|
||||
si `sender` renvoie False ou lève, les logs RESTENT (rejoués au flush
|
||||
suivant — ZÉRO perte ; conformité AI Act Art. 12).
|
||||
|
||||
Le module est chargé par chemin (importlib) pour ne dépendre d'aucun import
|
||||
lourd du package client (cf. DETTE-011/013, comme test_agent_v1_logging.py).
|
||||
"""
|
||||
import importlib.util
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
_MOD_PATH = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "agent_v0" / "agent_v1" / "network" / "log_shipper.py"
|
||||
)
|
||||
|
||||
|
||||
def _load_module():
|
||||
spec = importlib.util.spec_from_file_location("lea_log_shipper", _MOD_PATH)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mod():
|
||||
return _load_module()
|
||||
|
||||
|
||||
def _make_record(name="lea.test", level=logging.INFO, msg="hello %s", args=("world",)):
|
||||
"""Construit un vrai LogRecord (pas un mock) pour tester le formatage."""
|
||||
return logging.LogRecord(
|
||||
name=name, level=level, pathname=__file__, lineno=1,
|
||||
msg=msg, args=args, exc_info=None,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. emit formate un LogRecord au schéma exact {ts, level, logger, message}
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_emit_formate_au_schema_exact(mod):
|
||||
shipper = mod.LogShipper(machine_id="poste-1", sender=lambda m, l: True)
|
||||
handler = shipper.handler
|
||||
|
||||
handler.emit(_make_record(name="lea.captor", level=logging.WARNING,
|
||||
msg="bonjour %s", args=("monde",)))
|
||||
|
||||
buffered = shipper.peek_buffer()
|
||||
assert len(buffered) == 1
|
||||
entry = buffered[0]
|
||||
# Schéma EXACT : pas de clé en plus, pas de clé en moins.
|
||||
assert set(entry.keys()) == {"ts", "level", "logger", "message"}
|
||||
assert entry["level"] == "WARNING"
|
||||
assert entry["logger"] == "lea.captor"
|
||||
assert entry["message"] == "bonjour monde" # args interpolés
|
||||
assert isinstance(entry["ts"], (int, float))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. log_safe / assainissement PII appliqué au message avant envoi
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_pii_assaini_avant_envoi(mod):
|
||||
# Sanitizer injecté déterministe : PII -> token (mime anonymize_text).
|
||||
def fake_sanitizer(text):
|
||||
return text.replace("ROSSIGNOL", "[NOM_1]")
|
||||
|
||||
shipper = mod.LogShipper(
|
||||
machine_id="poste-1", sender=lambda m, l: True,
|
||||
message_sanitizer=fake_sanitizer,
|
||||
)
|
||||
shipper.handler.emit(_make_record(msg="clic sur patient ROSSIGNOL", args=None))
|
||||
|
||||
entry = shipper.peek_buffer()[0]
|
||||
assert "ROSSIGNOL" not in entry["message"]
|
||||
assert "[NOM_1]" in entry["message"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. flush envoie un batch <= max et appelle sender(machine_id, logs)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_flush_envoie_batch_borne_et_appelle_sender(mod):
|
||||
calls = []
|
||||
|
||||
def sender(machine_id, logs):
|
||||
calls.append((machine_id, logs))
|
||||
return True
|
||||
|
||||
shipper = mod.LogShipper(machine_id="poste-42", sender=sender, max_batch=10)
|
||||
for i in range(5):
|
||||
shipper.handler.emit(_make_record(msg=f"event {i}", args=None))
|
||||
|
||||
sent = shipper.flush()
|
||||
|
||||
assert sent == 5
|
||||
assert len(calls) == 1
|
||||
machine_id, logs = calls[0]
|
||||
assert machine_id == "poste-42"
|
||||
assert len(logs) == 5
|
||||
assert logs[0]["message"] == "event 0"
|
||||
# Buffer vidé après succès
|
||||
assert shipper.peek_buffer() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. sender échoue (False / exception) -> logs CONSERVÉS, rejoués au flush suivant
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_sender_echec_false_conserve_les_logs(mod):
|
||||
state = {"fail": True, "received": None}
|
||||
|
||||
def flaky_sender(machine_id, logs):
|
||||
if state["fail"]:
|
||||
return False # échec récupérable
|
||||
state["received"] = list(logs)
|
||||
return True
|
||||
|
||||
shipper = mod.LogShipper(machine_id="p", sender=flaky_sender)
|
||||
for i in range(3):
|
||||
shipper.handler.emit(_make_record(msg=f"m{i}", args=None))
|
||||
|
||||
sent = shipper.flush() # échec
|
||||
assert sent == 0
|
||||
assert len(shipper.peek_buffer()) == 3 # ZÉRO perte
|
||||
|
||||
state["fail"] = False
|
||||
sent = shipper.flush() # rejeu
|
||||
assert sent == 3
|
||||
assert [e["message"] for e in state["received"]] == ["m0", "m1", "m2"]
|
||||
assert shipper.peek_buffer() == []
|
||||
|
||||
|
||||
def test_sender_exception_conserve_les_logs(mod):
|
||||
def exploding_sender(machine_id, logs):
|
||||
raise ConnectionError("serveur down")
|
||||
|
||||
shipper = mod.LogShipper(machine_id="p", sender=exploding_sender)
|
||||
shipper.handler.emit(_make_record(msg="important", args=None))
|
||||
|
||||
sent = shipper.flush() # ne doit PAS propager
|
||||
assert sent == 0
|
||||
assert len(shipper.peek_buffer()) == 1 # log conservé
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. buffer vide -> sender NON appelé
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_buffer_vide_sender_non_appele(mod):
|
||||
calls = []
|
||||
shipper = mod.LogShipper(
|
||||
machine_id="p", sender=lambda m, l: calls.append((m, l)) or True
|
||||
)
|
||||
|
||||
sent = shipper.flush()
|
||||
|
||||
assert sent == 0
|
||||
assert calls == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. > max_batch entrées -> découpage en plusieurs batches
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_decoupage_en_plusieurs_batches(mod):
|
||||
batches = []
|
||||
|
||||
def sender(machine_id, logs):
|
||||
batches.append(len(logs))
|
||||
return True
|
||||
|
||||
shipper = mod.LogShipper(machine_id="p", sender=sender, max_batch=3)
|
||||
for i in range(7):
|
||||
shipper.handler.emit(_make_record(msg=f"x{i}", args=None))
|
||||
|
||||
sent = shipper.flush()
|
||||
|
||||
assert sent == 7
|
||||
# 7 entrées, max_batch=3 -> 3 + 3 + 1
|
||||
assert batches == [3, 3, 1]
|
||||
# Chaque batch <= max_batch
|
||||
assert all(n <= 3 for n in batches)
|
||||
assert shipper.peek_buffer() == []
|
||||
|
||||
|
||||
def test_decoupage_echec_partiel_conserve_le_reste(mod):
|
||||
"""Si un batch intermédiaire échoue, on arrête et on garde le reste (0 perte)."""
|
||||
batches = []
|
||||
|
||||
def sender(machine_id, logs):
|
||||
batches.append([e["message"] for e in logs])
|
||||
# Le 2e batch échoue
|
||||
return len(batches) != 2
|
||||
|
||||
shipper = mod.LogShipper(machine_id="p", sender=sender, max_batch=2)
|
||||
for i in range(6):
|
||||
shipper.handler.emit(_make_record(msg=f"x{i}", args=None))
|
||||
|
||||
sent = shipper.flush()
|
||||
|
||||
# 1er batch (x0,x1) part ; 2e (x2,x3) échoue -> on arrête.
|
||||
assert sent == 2
|
||||
assert batches[0] == ["x0", "x1"]
|
||||
# x2..x5 restent dans le buffer dans l'ordre.
|
||||
restant = [e["message"] for e in shipper.peek_buffer()]
|
||||
assert restant == ["x2", "x3", "x4", "x5"]
|
||||
296
tests/unit/test_role_mapper.py
Normal file
296
tests/unit/test_role_mapper.py
Normal file
@@ -0,0 +1,296 @@
|
||||
"""Tests du role_mapper : reconstruction de champs ANCRÉS sur l'OCR.
|
||||
|
||||
Principe cardinal (cf. gate vert 30/06) : le VLM ne fournit QUE des ids de tokens OCR
|
||||
(value_ids) ; la valeur est reconstruite côté Python depuis l'OCR. Aucun texte produit
|
||||
par le VLM ne doit pouvoir entrer dans une valeur -> 0 hallucination par construction.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from core.extraction.role_mapper import (
|
||||
MappedField,
|
||||
OcrToken,
|
||||
assess_quality,
|
||||
build_role_prompt,
|
||||
map_roles,
|
||||
reconstruct_fields,
|
||||
tokens_from_grid,
|
||||
)
|
||||
|
||||
|
||||
def _tok(tid, text, conf=0.9, bbox=(0, 0, 10, 10)):
|
||||
return OcrToken(id=tid, text=text, confidence=conf, bbox=bbox)
|
||||
|
||||
|
||||
def test_reconstruit_value_concatene_tokens_dans_lordre():
|
||||
tokens = [_tok(0, "DUPONT"), _tok(1, "Jean")]
|
||||
fields = reconstruct_fields(tokens, [{"label": "Nom complet", "value_ids": [0, 1]}])
|
||||
assert len(fields) == 1
|
||||
assert fields[0].label == "Nom complet"
|
||||
assert fields[0].value == "DUPONT Jean"
|
||||
assert fields[0].anchored is True
|
||||
|
||||
|
||||
def test_ignore_les_ids_hors_plage_et_les_liste():
|
||||
tokens = [_tok(0, "DUPONT")]
|
||||
fields = reconstruct_fields(tokens, [{"label": "Nom", "value_ids": [0, 99]}])
|
||||
assert fields[0].value == "DUPONT"
|
||||
assert fields[0].invalid_ids == [99]
|
||||
assert fields[0].anchored is True
|
||||
|
||||
|
||||
def test_value_ids_vide_donne_champ_non_ancre():
|
||||
tokens = [_tok(0, "DUPONT")]
|
||||
fields = reconstruct_fields(tokens, [{"label": "Poids", "value_ids": []}])
|
||||
assert fields[0].value == ""
|
||||
assert fields[0].anchored is False
|
||||
|
||||
|
||||
def test_aucun_id_valide_donne_champ_non_ancre():
|
||||
tokens = [_tok(0, "DUPONT")]
|
||||
fields = reconstruct_fields(tokens, [{"label": "Poids", "value_ids": [7, 8]}])
|
||||
assert fields[0].anchored is False
|
||||
assert fields[0].value == ""
|
||||
assert fields[0].invalid_ids == [7, 8]
|
||||
|
||||
|
||||
def test_dedup_ids_en_preservant_lordre():
|
||||
tokens = [_tok(0, "DUPONT"), _tok(1, "Jean")]
|
||||
fields = reconstruct_fields(tokens, [{"label": "X", "value_ids": [1, 1, 0]}])
|
||||
assert fields[0].value == "Jean DUPONT"
|
||||
assert fields[0].value_ids == [1, 0]
|
||||
|
||||
|
||||
def test_confidence_est_le_min_des_tokens_ancres():
|
||||
tokens = [_tok(0, "A", conf=0.95), _tok(1, "B", conf=0.70)]
|
||||
fields = reconstruct_fields(tokens, [{"label": "X", "value_ids": [0, 1]}])
|
||||
assert fields[0].confidence == pytest.approx(0.70)
|
||||
|
||||
|
||||
def test_bbox_englobante_des_tokens_ancres():
|
||||
tokens = [_tok(0, "A", bbox=(0, 0, 10, 10)), _tok(1, "B", bbox=(20, 5, 40, 15))]
|
||||
fields = reconstruct_fields(tokens, [{"label": "X", "value_ids": [0, 1]}])
|
||||
assert fields[0].bbox == (0, 0, 40, 15)
|
||||
|
||||
|
||||
def test_invariant_aucun_texte_hors_ocr():
|
||||
# 'value' fournie par le VLM est ignorée : seul value_ids compte.
|
||||
tokens = [_tok(0, "DUPONT")]
|
||||
fields = reconstruct_fields(
|
||||
tokens, [{"label": "Nom", "value_ids": [0], "value": "HALLUCINATION"}]
|
||||
)
|
||||
assert fields[0].value == "DUPONT"
|
||||
|
||||
|
||||
def test_tokens_from_grid_indexe_et_normalise_bbox():
|
||||
# grille extract_grid_from_image : bbox = 4 points EasyOCR
|
||||
grid = [
|
||||
[
|
||||
{"text": "Nom", "bbox": [[0, 0], [10, 0], [10, 8], [0, 8]],
|
||||
"confidence": 0.9, "row": 0, "col": 0},
|
||||
{"text": "DUPONT", "bbox": [[20, 0], [60, 0], [60, 8], [20, 8]],
|
||||
"confidence": 0.95, "row": 0, "col": 1},
|
||||
],
|
||||
]
|
||||
tokens = tokens_from_grid(grid)
|
||||
assert [t.id for t in tokens] == [0, 1]
|
||||
assert tokens[0].text == "Nom"
|
||||
assert tokens[1].bbox == (20, 0, 60, 8)
|
||||
|
||||
|
||||
# --- map_roles : orchestrateur (client VLM injectable, donc testable hors-ligne) ---
|
||||
|
||||
def _fake_client(response, capture=None):
|
||||
"""Faux client VLM : enregistre éventuellement le prompt reçu, renvoie une réponse fixe."""
|
||||
def client(image_path, prompt):
|
||||
if capture is not None:
|
||||
capture["prompt"] = prompt
|
||||
capture["image_path"] = image_path
|
||||
return response
|
||||
return client
|
||||
|
||||
|
||||
def test_map_roles_reconstruit_via_client_injecte():
|
||||
tokens = [_tok(0, "DUPONT"), _tok(1, "Jean")]
|
||||
client = _fake_client('{"champs":[{"label":"Nom complet","value_ids":[0,1]}]}')
|
||||
fields = map_roles("img.png", tokens, client)
|
||||
assert len(fields) == 1
|
||||
assert fields[0].label == "Nom complet"
|
||||
assert fields[0].value == "DUPONT Jean"
|
||||
|
||||
|
||||
def test_map_roles_tolere_les_fences_json():
|
||||
tokens = [_tok(0, "DUPONT")]
|
||||
client = _fake_client('```json\n{"champs":[{"label":"Nom","value_ids":[0]}]}\n```')
|
||||
fields = map_roles("img.png", tokens, client)
|
||||
assert fields[0].value == "DUPONT"
|
||||
|
||||
|
||||
def test_map_roles_json_invalide_retourne_liste_vide():
|
||||
# robustesse batch : une réponse VLM non-JSON ne doit pas crasher.
|
||||
tokens = [_tok(0, "DUPONT")]
|
||||
client = _fake_client("désolé, je n'ai pas compris")
|
||||
fields = map_roles("img.png", tokens, client)
|
||||
assert fields == []
|
||||
|
||||
|
||||
def test_build_role_prompt_inclut_les_tokens_avec_ids():
|
||||
tokens = [_tok(0, "Poids"), _tok(1, "72")]
|
||||
prompt = build_role_prompt(tokens)
|
||||
assert "Poids" in prompt and "72" in prompt
|
||||
assert "value_ids" in prompt # on demande bien des ids, pas du texte recopié
|
||||
|
||||
|
||||
def test_build_role_prompt_guide_liste_les_roles_attendus():
|
||||
tokens = [_tok(0, "X")]
|
||||
prompt = build_role_prompt(tokens, roles=["Nom", "IPP", "Poids"])
|
||||
assert "Nom" in prompt and "IPP" in prompt and "Poids" in prompt
|
||||
|
||||
|
||||
def test_map_roles_passe_les_roles_au_prompt():
|
||||
tokens = [_tok(0, "X")]
|
||||
cap = {}
|
||||
client = _fake_client('{"champs":[]}', capture=cap)
|
||||
map_roles("img.png", tokens, client, roles=["Diagnostic", "GEMSA"])
|
||||
assert "Diagnostic" in cap["prompt"] and "GEMSA" in cap["prompt"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# assess_quality — évaluation de la qualité d'extraction d'un dossier
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _field(label, value="val", anchored=True, confidence=0.9, value_ids=None, invalid_ids=None):
|
||||
"""Helper : construit un MappedField directement (sans passer par OCR/VLM)."""
|
||||
return MappedField(
|
||||
label=label,
|
||||
value=value if anchored else "",
|
||||
value_ids=value_ids or ([0] if anchored else []),
|
||||
confidence=confidence,
|
||||
bbox=(0, 0, 10, 10) if anchored else None,
|
||||
anchored=anchored,
|
||||
invalid_ids=invalid_ids or [],
|
||||
)
|
||||
|
||||
|
||||
# --- failed ---
|
||||
|
||||
def test_assess_quality_failed_aucun_champ():
|
||||
"""Liste vide → failed."""
|
||||
assert assess_quality([]) == "failed"
|
||||
|
||||
|
||||
def test_assess_quality_failed_aucun_champ_ancre():
|
||||
"""Tous non ancrés → failed."""
|
||||
fields = [_field("Nom", anchored=False), _field("IPP", anchored=False)]
|
||||
assert assess_quality(fields) == "failed"
|
||||
|
||||
|
||||
def test_assess_quality_failed_un_champ_value_vide():
|
||||
"""Un seul champ, anchored=False, value vide → failed."""
|
||||
fields = [_field("Nom", anchored=False, value_ids=[])]
|
||||
assert assess_quality(fields) == "failed"
|
||||
|
||||
|
||||
# --- needs_review ---
|
||||
|
||||
def test_assess_quality_needs_review_role_requis_absent():
|
||||
"""Un rôle requis n'est pas dans fields → needs_review."""
|
||||
fields = [_field("Nom", anchored=True)]
|
||||
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "needs_review"
|
||||
|
||||
|
||||
def test_assess_quality_needs_review_role_requis_non_ancre():
|
||||
"""Rôle requis présent mais anchored=False → needs_review."""
|
||||
fields = [_field("Nom", anchored=True), _field("IPP", anchored=False)]
|
||||
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "needs_review"
|
||||
|
||||
|
||||
def test_assess_quality_needs_review_matching_insensible_casse():
|
||||
"""Matching label ↔ required_role insensible à la casse."""
|
||||
fields = [_field("nom complet", anchored=True), _field("ipp", anchored=True)]
|
||||
# required_roles en maj : doit quand même matcher
|
||||
assert assess_quality(fields, required_roles=["Nom Complet", "IPP"]) != "needs_review"
|
||||
|
||||
|
||||
def test_assess_quality_needs_review_matching_insensible_espaces():
|
||||
"""Matching insensible aux espaces en trop (strip)."""
|
||||
fields = [_field(" Nom ", anchored=True)]
|
||||
assert assess_quality(fields, required_roles=["Nom"]) != "needs_review"
|
||||
|
||||
|
||||
def test_assess_quality_needs_review_priorite_sur_partial():
|
||||
"""needs_review > partial : role manquant + confidence basse → needs_review."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.4), # basse
|
||||
# "IPP" absent → needs_review
|
||||
]
|
||||
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "needs_review"
|
||||
|
||||
|
||||
# --- partial ---
|
||||
|
||||
def test_assess_quality_partial_confidence_basse():
|
||||
"""Tous requis ancrés mais un champ ancré a confidence < min_confidence → partial."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.9),
|
||||
_field("IPP", anchored=True, confidence=0.4), # < 0.6
|
||||
]
|
||||
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "partial"
|
||||
|
||||
|
||||
def test_assess_quality_partial_champs_non_ancres_en_surplus():
|
||||
"""Tous requis ancrés, confidence ok, mais il y a des champs non ancrés en plus → partial."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.9),
|
||||
_field("Inconnu", anchored=False), # non ancré hors requis
|
||||
]
|
||||
assert assess_quality(fields, required_roles=["Nom"]) == "partial"
|
||||
|
||||
|
||||
def test_assess_quality_partial_sans_required_roles_confidence_basse():
|
||||
"""Sans required_roles, un champ ancré à confidence basse → partial."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.9),
|
||||
_field("IPP", anchored=True, confidence=0.3),
|
||||
]
|
||||
assert assess_quality(fields) == "partial"
|
||||
|
||||
|
||||
def test_assess_quality_partial_sans_required_roles_champ_non_ancre():
|
||||
"""Sans required_roles, au moins un champ non ancré → partial."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.9),
|
||||
_field("IPP", anchored=False),
|
||||
]
|
||||
assert assess_quality(fields) == "partial"
|
||||
|
||||
|
||||
# --- complete ---
|
||||
|
||||
def test_assess_quality_complete_tous_requis_ancres_confidence_ok():
|
||||
"""Tous requis ancrés, toutes confidences >= 0.6, aucun non ancré → complete."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.9),
|
||||
_field("IPP", anchored=True, confidence=0.7),
|
||||
]
|
||||
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "complete"
|
||||
|
||||
|
||||
def test_assess_quality_complete_sans_required_roles():
|
||||
"""Sans required_roles, au moins un champ ancré, tous >= min_confidence, aucun non ancré → complete."""
|
||||
fields = [
|
||||
_field("Nom", anchored=True, confidence=0.8),
|
||||
_field("IPP", anchored=True, confidence=0.95),
|
||||
]
|
||||
assert assess_quality(fields) == "complete"
|
||||
|
||||
|
||||
def test_assess_quality_complete_seuil_exactement_min_confidence():
|
||||
"""Confidence exactement égale à min_confidence (0.6) → complete (borne incluse)."""
|
||||
fields = [_field("Nom", anchored=True, confidence=0.6)]
|
||||
assert assess_quality(fields, required_roles=["Nom"]) == "complete"
|
||||
|
||||
|
||||
def test_assess_quality_complete_min_confidence_personnalise():
|
||||
"""Seuil personnalisé : confidence=0.7 >= min_confidence=0.7 → complete."""
|
||||
fields = [_field("Nom", anchored=True, confidence=0.7)]
|
||||
assert assess_quality(fields, min_confidence=0.7) == "complete"
|
||||
163
tests/unit/test_sanitize_log_entries.py
Normal file
163
tests/unit/test_sanitize_log_entries.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""Tests TDD de sanitize_log_entries — assainissement PII des logs Léa reçus côté serveur.
|
||||
|
||||
Branche feat/push-log-dgx. N'importe QUE pii_sanitizer (pas api_stream, DETTE-013).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
_ROOT = str(Path(__file__).resolve().parents[2])
|
||||
if _ROOT not in sys.path:
|
||||
sys.path.insert(0, _ROOT)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. message avec PII → brut absent, tokens présents
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_message_pii_tokenise():
|
||||
"""Un nom clinique + numéro long disparaissent ; des tokens [...] les remplacent.
|
||||
|
||||
Couche 1 (regex, sans NER) : détecte le format « Prénom NOM » (RE_PRENOM_NOM)
|
||||
et l'IPP structuré (RE_IPP). Le format inverse « NOM Prénom » relève de la
|
||||
couche 2 NER — hors scope ici.
|
||||
"""
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
entries = [
|
||||
{
|
||||
"ts": "2026-06-30T10:00:00Z",
|
||||
"level": "INFO",
|
||||
"logger": "lea.replay",
|
||||
"message": "Ouverture dossier Catherine MOREL IPP: 295841",
|
||||
}
|
||||
]
|
||||
result = sanitize_log_entries(entries)
|
||||
|
||||
assert len(result) == 1
|
||||
msg = result[0]["message"]
|
||||
assert "MOREL" not in msg, f"NOM toujours présent : {msg!r}"
|
||||
assert "Catherine" not in msg, f"Prénom toujours présent : {msg!r}"
|
||||
assert "295841" not in msg, f"IPP toujours présent : {msg!r}"
|
||||
assert "[" in msg, f"Aucun token dans : {msg!r}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. ts / level préservés à l'identique
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ts_level_preserves():
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
entries = [
|
||||
{"ts": "2026-06-30T10:00:00Z", "level": "WARNING",
|
||||
"logger": "lea.core", "message": "simple message sans pii"}
|
||||
]
|
||||
result = sanitize_log_entries(entries)
|
||||
|
||||
assert result[0]["ts"] == "2026-06-30T10:00:00Z"
|
||||
assert result[0]["level"] == "WARNING"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. liste vide → liste vide
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_liste_vide():
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
assert sanitize_log_entries([]) == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. entrée sans clé `message` → pas de crash, entrée conservée
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_entree_sans_message():
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
entries = [{"ts": "2026-06-30T10:00:01Z", "level": "DEBUG", "logger": "lea.init"}]
|
||||
result = sanitize_log_entries(entries)
|
||||
|
||||
assert len(result) == 1
|
||||
assert "message" not in result[0] # champ absent → reste absent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. cohérence : même PII dans 2 entrées → même token (mapping partagé)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_coherence_mapping_partage():
|
||||
"""La même PII dans deux messages du batch reçoit le même token."""
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
entries = [
|
||||
{"ts": "T1", "level": "INFO", "logger": "l", "message": "IPP: 295841 reçu"},
|
||||
{"ts": "T2", "level": "INFO", "logger": "l", "message": "Relance IPP: 295841"},
|
||||
]
|
||||
result = sanitize_log_entries(entries)
|
||||
|
||||
msg1 = result[0]["message"]
|
||||
msg2 = result[1]["message"]
|
||||
|
||||
# le brut est absent des deux
|
||||
assert "295841" not in msg1
|
||||
assert "295841" not in msg2
|
||||
|
||||
# le token est identique (mapping partagé)
|
||||
import re
|
||||
tokens1 = re.findall(r"\[IPP_\d+\]", msg1)
|
||||
tokens2 = re.findall(r"\[IPP_\d+\]", msg2)
|
||||
assert tokens1, f"Pas de token IPP dans msg1 : {msg1!r}"
|
||||
assert tokens2, f"Pas de token IPP dans msg2 : {msg2!r}"
|
||||
assert tokens1[0] == tokens2[0], (
|
||||
f"Tokens différents pour la même PII : {tokens1[0]} vs {tokens2[0]}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. `message` non-str → skip proprement, pas de crash
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_message_non_str():
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
entries = [
|
||||
{"ts": "T1", "level": "INFO", "logger": "l", "message": None},
|
||||
{"ts": "T2", "level": "INFO", "logger": "l", "message": 42},
|
||||
{"ts": "T3", "level": "INFO", "logger": "l", "message": ["liste"]},
|
||||
]
|
||||
result = sanitize_log_entries(entries)
|
||||
|
||||
assert len(result) == 3
|
||||
# les valeurs non-str sont préservées telles quelles
|
||||
assert result[0]["message"] is None
|
||||
assert result[1]["message"] == 42
|
||||
assert result[2]["message"] == ["liste"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 7. champ `logger` str est aussi assaini si porteur de PII
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_logger_pii_tokenise():
|
||||
"""Si le champ logger contient de la PII (ex. chemin patient), il est assaini."""
|
||||
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
|
||||
|
||||
entries = [
|
||||
{
|
||||
"ts": "T1",
|
||||
"level": "INFO",
|
||||
"logger": "lea.patient.MOREL_Catherine",
|
||||
"message": "step start",
|
||||
}
|
||||
]
|
||||
result = sanitize_log_entries(entries)
|
||||
logger_out = result[0]["logger"]
|
||||
# Le NOM doit être tokenisé (RE_PRENOM_NOM captera « Catherine MOREL » …
|
||||
# mais « MOREL_Catherine » n'est pas le format clinique standard — le test
|
||||
# vérifie surtout qu'il n'y a pas de crash et que le champ est traité.)
|
||||
# On ne fixe pas d'assertion sur la valeur : juste pas de crash.
|
||||
assert isinstance(logger_out, str)
|
||||
Reference in New Issue
Block a user