5 Commits

Author SHA1 Message Date
Dom
3ed9798f06 feat(agent_v1): log shipper — remontee auto des logs vers le serveur (gated OFF)
Some checks failed
tests / Lint (ruff + black) (push) Failing after 1m43s
tests / Tests unitaires (sans GPU) (push) Failing after 1m51s
tests / Tests sécurité (critique) (push) Has been skipped
LogShipperHandler + LogShipper : buffer borne, flush par batch <= max, resilience
0-perte (rejeu sur echec), sender injectable. Flag RPA_LOG_SHIP_ENABLED (defaut
off, activable par config.txt sans rebuild). Sanitizer client = identite (rempart
PII = serveur, cf commit precedent). Wiring gated dans main.py. 8 tests TDD.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 13:30:08 +02:00
Dom
b65710ae43 feat(server): assainissement PII des logs clients à la réception
sanitize_log_entries (réutilise anonymize_text, mapping partagé = tokens cohérents),
branché dans POST /api/v1/agents/logs avant le store : message + logger tokenisés,
ts/level préservés. 7 tests TDD. Rempart PII central du push-log (couvre les postes).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 13:30:08 +02:00
Dom
509a026cfc feat(extraction): assess_quality — statut qualité dossier (4 niveaux)
complete / partial / needs_review / failed (priorité décroissante), matching
rôle requis insensible casse+espaces, seuil min_confidence paramétrable (0.6).
16 tests ajoutés (31 au total, verts). Brique TDD via sous-agent, code révisé.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 11:42:14 +02:00
Dom
a62b720144 feat(extraction): map_roles — orchestrateur VLM ancrage strict (client injectable)
build_role_prompt (modes libre / guidé par rôles), parse_vlm_json (robuste :
tolère les fences, {} si invalide), map_roles (prompt -> VLM -> parse -> reconstruct).
Client VLM injecté => testable hors-ligne. 6 tests unit ajoutés (15 au total).
Non branché au runtime (brique validée isolément).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 11:34:43 +02:00
Dom
14b1bf844a feat(extraction): role_mapper — reconstruction de champs ancrée OCR (0 hallucination)
Le VLM ne fournit que des value_ids ; la value est reconstruite côté Python
depuis l'OCR (le texte VLM est ignoré) -> 0 hallucination par construction.
9 tests unitaires : ancrage, ids hors plage, dédup ordonnée, value_ids vide,
confidence min, bbox englobante, anti-injection. Module pur, non branché runtime.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:38:11 +02:00
9 changed files with 1316 additions and 3 deletions

View File

@@ -82,6 +82,17 @@ BLUR_SENSITIVE = os.environ.get("RPA_BLUR_SENSITIVE", "true").lower() in ("true"
# Configurable via variable d'environnement pour permettre l'ajustement
LOG_RETENTION_DAYS = int(os.environ.get("RPA_LOG_RETENTION_DAYS", "180"))
# Remontée automatique des logs vers le serveur (push-log-DGX).
# Diagnostic des postes clinique SANS AnyDesk : les logs (déjà écrits sur disque)
# sont poussés au serveur, rangés par machine_id, consultables au dashboard.
# Défaut PRUDENT = désactivé : on l'active poste par poste via config.txt /
# variable d'environnement, sans rebuild de l'installateur.
LOG_SHIP_ENABLED = os.environ.get("RPA_LOG_SHIP_ENABLED", "false").lower() in (
"true", "1", "yes",
)
# Intervalle de flush du buffer de logs (secondes).
LOG_SHIP_INTERVAL_S = float(os.environ.get("RPA_LOG_SHIP_INTERVAL_S", "30"))
# Monitoring
PERF_MONITOR_INTERVAL_S = 30
LOGS_DIR = BASE_DIR / "logs"

View File

@@ -17,7 +17,7 @@ import threading
from .config import (
SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID, LOG_RETENTION_DAYS, LOG_FILE,
SCREEN_RESOLUTION, DPI_SCALE, OS_THEME, API_TOKEN, MAX_SESSION_DURATION_S,
STREAMING_ENDPOINT,
STREAMING_ENDPOINT, LOG_SHIP_ENABLED, LOG_SHIP_INTERVAL_S,
)
from .core.captor import EventCaptorV1
from .core.executor import ActionExecutorV1
@@ -62,6 +62,26 @@ except Exception:
for _noisy in ("urllib3", "requests.packages.urllib3", "PIL", "mss"):
logging.getLogger(_noisy).setLevel(logging.WARNING)
# push-log-DGX : remontée automatique des logs vers le serveur (diagnostic des
# postes SANS AnyDesk). GARDÉ derrière RPA_LOG_SHIP_ENABLED (défaut désactivé) —
# activable poste par poste via config.txt, sans rebuild. Le handler est attaché
# au logger racine APRÈS setup_logging (les logs partent aussi dans le fichier).
_log_shipper = None
if LOG_SHIP_ENABLED:
try:
from .network.log_shipper import LogShipper
_log_shipper = LogShipper(
machine_id=MACHINE_ID,
max_batch=int(os.environ.get("RPA_AGENT_LOGS_MAX_BATCH", "1000")),
flush_interval_s=LOG_SHIP_INTERVAL_S,
)
logging.getLogger().addHandler(_log_shipper.handler)
_log_shipper.start()
except Exception as _e:
# Ne JAMAIS empêcher Léa de démarrer pour un problème de remontée de logs.
logging.getLogger(__name__).warning("Log shipper non démarré : %s", _e)
_log_shipper = None
logger = logging.getLogger(__name__)
# Intervalle de polling replay (secondes)

View File

@@ -0,0 +1,317 @@
# agent_v1/network/log_shipper.py
"""Remontée AUTOMATIQUE des logs du client Léa vers le serveur (push-log-DGX).
But : diagnostiquer les postes Windows clinique SANS AnyDesk. Les logs déjà
écrits sur disque par `logging_setup.py` (rotation quotidienne, rétention 180 j,
Règlement IA Art. 12) sont en plus poussés au serveur, rangés par `machine_id`,
consultables au dashboard.
Serveur (déjà prêt — NE PAS toucher) :
POST /api/v1/agents/logs
body = {machine_id: str, logs: [{ts, level, logger, message}]}
borne RPA_AGENT_LOGS_MAX_BATCH (défaut 1000) — 413 si dépassée.
Conception :
- `LogShipperHandler(logging.Handler)` : sur `emit(record)`, formate au
schéma EXACT `{ts, level, logger, message}`, applique un assainissement
PII au message (défense en profondeur — la discipline `log_safe` à la
source logue déjà des hashes/longueurs, pas du contenu brut), puis
empile dans un buffer borné.
- `LogShipper` : flush par BATCH (≤ max_batch) via un `sender` callable
INJECTABLE `(machine_id, logs) -> bool`. Défaut = POST réel Bearer
(pattern `streamer.py`).
- Résilience (ZÉRO perte) : si `sender` renvoie False ou lève, les logs
RESTENT dans le buffer et sont rejoués au flush suivant. Le fichier de
log local reste de toute façon la source durable (survit au crash) ; le
buffer RAM est un best-effort de remontée, volontairement NON persisté en
SQLite (le `PersistentBuffer` est session/event-scoped — y mêler des logs
polluerait la DB d'events). Borne mémoire = `max_buffer` (drop des plus
VIEUX au-delà — un log récent vaut mieux qu'un vieux pour le diagnostic).
Pattern d'import PII : on tente `anonymize_text` (server_v1.pii_sanitizer,
source de vérité des tokens typés) via le même import paresseux tolérant que
`ui/messages.py`. Sur un vrai poste (sans server_v1), on retombe sur l'identité :
acceptable car la PII de message est déjà neutralisée à la source par la
discipline `log_safe`. Le sanitizer reste INJECTABLE pour les tests/évolutions.
Branche feat/push-log-dgx.
"""
from __future__ import annotations
import logging
import threading
import time
from collections import deque
from typing import Callable, Deque, Dict, List, Optional
logger = logging.getLogger(__name__)
# Schéma d'une entrée de log poussée au serveur.
# ts : epoch (float) — l'heure de l'évènement
# level : nom du niveau ("INFO", "WARNING"...)
# logger : nom du logger (record.name)
# message : message formaté (args interpolés) ET assaini PII
# Défaut aligné sur la borne serveur RPA_AGENT_LOGS_MAX_BATCH (api_stream.py).
DEFAULT_MAX_BATCH = 1000
# Borne mémoire du buffer : au-delà, on droppe les plus VIEUX (diagnostic =
# on préfère les logs récents). Quelques milliers d'entrées = quelques Mo RAM.
DEFAULT_MAX_BUFFER = 5000
# ---------------------------------------------------------------------------
# Assainissement PII du message (défense en profondeur)
# ---------------------------------------------------------------------------
def _default_message_sanitizer(text: str) -> str:
"""Sanitizer par défaut côté client = identité.
Le **rempart PII des logs est le SERVEUR** : `sanitize_log_entries`
ré-assainit chaque message à la réception (`/api/v1/agents/logs`), via le
même `anonymize_text` que les events. Tenter un import de `server_v1` côté
poste à CHAQUE ligne de log est inutile (absent du bundle client) et coûteux
(exception attrapée par emit). La discipline `log_safe` neutralise déjà la
PII à la source. Reste INJECTABLE pour tests/évolutions.
"""
return text
# ---------------------------------------------------------------------------
# Handler — empile les LogRecords dans un buffer partagé
# ---------------------------------------------------------------------------
class LogShipperHandler(logging.Handler):
"""Handler logging qui sérialise chaque record et l'empile pour envoi.
Ne fait AUCUN réseau : il alimente seulement le buffer du `LogShipper`.
L'envoi est piloté par `LogShipper.flush()` (thread dédié périodique).
"""
def __init__(
self,
buffer: Deque[Dict],
lock: threading.Lock,
message_sanitizer: Callable[[str], str],
max_buffer: int = DEFAULT_MAX_BUFFER,
level=logging.NOTSET,
):
super().__init__(level=level)
self._buffer = buffer
self._lock = lock
self._sanitize = message_sanitizer
self._max_buffer = max_buffer
def _format_record(self, record: logging.LogRecord) -> Dict:
"""Construit l'entrée au schéma EXACT {ts, level, logger, message}.
`record.getMessage()` interpole les args (%s...). Le message est ensuite
passé au sanitizer PII. Tolérant : un message non formatable ne doit pas
faire perdre l'entrée.
"""
try:
message = record.getMessage()
except Exception:
message = str(record.msg)
try:
message = self._sanitize(message)
except Exception:
# Le sanitizer ne doit jamais casser le logging.
pass
return {
"ts": record.created,
"level": record.levelname,
"logger": record.name,
"message": message,
}
def emit(self, record: logging.LogRecord) -> None:
"""Sérialise et empile le record (best-effort, ne lève jamais)."""
try:
entry = self._format_record(record)
with self._lock:
# deque(maxlen) droppe automatiquement le plus VIEUX au-delà
# de la borne — pas de croissance mémoire non bornée.
self._buffer.append(entry)
except Exception:
# handleError respecte logging.raiseExceptions (silencieux en prod).
self.handleError(record)
# ---------------------------------------------------------------------------
# Shipper — flush périodique par batch via un sender injectable
# ---------------------------------------------------------------------------
class LogShipper:
"""Orchestre la remontée des logs : buffer + flush par batch.
Args:
machine_id : identifiant du poste (config.MACHINE_ID en prod).
sender : callable INJECTABLE `(machine_id, logs) -> bool`. True =
accusé de réception serveur. Défaut = POST réel Bearer.
max_batch : taille max d'un batch (≤ borne serveur). Défaut 1000.
max_buffer : borne mémoire du buffer (drop des plus vieux au-delà).
message_sanitizer : assainissement PII du message. Défaut = pii_sanitizer
si disponible, sinon identité.
"""
def __init__(
self,
machine_id: str,
sender: Optional[Callable[[str, List[Dict]], bool]] = None,
max_batch: int = DEFAULT_MAX_BATCH,
max_buffer: int = DEFAULT_MAX_BUFFER,
message_sanitizer: Optional[Callable[[str], str]] = None,
flush_interval_s: float = 30.0,
):
self.machine_id = machine_id
self.max_batch = max(1, int(max_batch))
self.flush_interval_s = flush_interval_s
self._sender = sender if sender is not None else self._default_sender
self._sanitize = message_sanitizer or _default_message_sanitizer
self._lock = threading.Lock()
self._buffer: Deque[Dict] = deque(maxlen=max_buffer)
self.handler = LogShipperHandler(
buffer=self._buffer,
lock=self._lock,
message_sanitizer=self._sanitize,
max_buffer=max_buffer,
)
self._running = False
self._thread: Optional[threading.Thread] = None
# ------------------------------------------------------------------
# Introspection (diagnostic / tests)
# ------------------------------------------------------------------
def peek_buffer(self) -> List[Dict]:
"""Copie des entrées en attente (lecture seule, pour diagnostic/tests)."""
with self._lock:
return list(self._buffer)
def pending(self) -> int:
with self._lock:
return len(self._buffer)
# ------------------------------------------------------------------
# Flush — envoie le buffer par batches ≤ max_batch
# ------------------------------------------------------------------
def flush(self) -> int:
"""Envoie le buffer par batches successifs. Retourne le nb de logs ACK.
Résilience ZÉRO perte : on retire un batch du buffer, on tente l'envoi.
- Succès → les entrées sont définitivement consommées.
- Échec (False ou exception) → on REMET les entrées en tête du buffer
et on ARRÊTE la passe (serveur probablement down) ; rejeu au flush
suivant. Les entrées non encore extraites restent en place.
"""
sent = 0
while True:
with self._lock:
if not self._buffer:
break
batch: List[Dict] = []
for _ in range(min(self.max_batch, len(self._buffer))):
batch.append(self._buffer.popleft())
try:
ok = self._sender(self.machine_id, batch)
except Exception as e:
ok = False
logger.debug("Log shipper sender a levé : %s", e)
if ok:
sent += len(batch)
continue
# Échec : on remet le batch en tête (ordre préservé) et on arrête.
with self._lock:
self._buffer.extendleft(reversed(batch))
break
return sent
# ------------------------------------------------------------------
# Sender réel — POST Bearer (pattern streamer.py)
# ------------------------------------------------------------------
@staticmethod
def _auth_headers() -> dict:
"""Headers Bearer (pattern streamer.py)."""
try:
from ..config import API_TOKEN
except Exception:
API_TOKEN = ""
if API_TOKEN:
return {"Authorization": f"Bearer {API_TOKEN}"}
return {}
def _default_sender(self, machine_id: str, logs: List[Dict]) -> bool:
"""POST réel vers /api/v1/agents/logs. True si HTTP 2xx.
Best-effort : tout échec réseau/serveur → False (logs conservés,
rejoués). Aucune exception ne remonte au-delà du sender.
"""
try:
import requests
from ..config import SERVER_URL
url = f"{SERVER_URL}/agents/logs"
resp = requests.post(
url,
json={"machine_id": machine_id, "logs": logs},
headers=self._auth_headers(),
timeout=5,
allow_redirects=False,
)
return bool(resp.ok)
except Exception as e:
logger.debug("Log shipper POST échoué : %s", e)
return False
# ------------------------------------------------------------------
# Boucle de flush périodique (thread daemon)
# ------------------------------------------------------------------
def start(self) -> None:
"""Démarre le thread de flush périodique (idempotent)."""
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._flush_loop, daemon=True, name="lea-log-shipper"
)
self._thread.start()
logger.info(
"Log shipper démarré (machine_id=%s, intervalle=%.0fs, batch≤%d)",
self.machine_id, self.flush_interval_s, self.max_batch,
)
def stop(self, final_flush: bool = True) -> None:
"""Arrête la boucle et tente un dernier flush (best-effort)."""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
if final_flush:
try:
self.flush()
except Exception:
pass
def _flush_loop(self) -> None:
while self._running:
# Découpe l'attente pour réagir vite à stop().
waited = 0.0
step = 0.5
while self._running and waited < self.flush_interval_s:
time.sleep(step)
waited += step
if not self._running:
break
try:
self.flush()
except Exception as e:
logger.debug("Log shipper flush loop : %s", e)

View File

@@ -27,7 +27,7 @@ from fastapi import BackgroundTasks, Depends, FastAPI, File, HTTPException, Requ
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .pii_sanitizer import sanitize_event
from .pii_sanitizer import sanitize_event, sanitize_log_entries
from .replay_failure_logger import log_replay_failure
from .replay_verifier import ReplayVerifier, VerificationResult
from .replay_learner import ReplayLearner
@@ -7263,7 +7263,10 @@ async def agents_logs(request: AgentLogsRequest):
# Bloque les postes révoqués/désinstallés + met à jour last_seen_at.
_guard_agent_registry_access(machine_id, endpoint="agents/logs")
received = agent_logs_store.append(machine_id, request.logs)
# Assainissement PII côté serveur avant persistance (couche 1 regex, sans NER).
# Un mapping partagé sur le batch garantit la cohérence des tokens ([NOM_1]…).
safe_logs = sanitize_log_entries(request.logs)
received = agent_logs_store.append(machine_id, safe_logs)
return {"status": "ok", "received": received, "machine_id": machine_id}

View File

@@ -203,6 +203,40 @@ def sanitize_event(event: Dict, *, mapping: Optional[Dict] = None) -> Dict:
return ev
def sanitize_log_entries(
entries: List[Dict], *, mapping: Optional[Dict] = None
) -> List[Dict]:
"""Assainit un batch de log-entries reçues d'un client Léa avant persistance.
Pour chaque entrée, renvoie une **copie** où les champs texte porteurs de PII
sont passés par `anonymize_text` :
- `message` (str) : assaini par `anonymize_text`.
- `logger` (str) : assaini de la même façon (peut porter un chemin patient).
- `ts` et `level` : préservés à l'identique, jamais touchés.
Un `mapping` partagé est utilisé pour **toutes** les entrées du batch afin de
garantir la cohérence des tokens (même PII → même token). Si `mapping` est
None, un mapping local est créé et partagé entre toutes les entrées du batch.
Tolère les valeurs absentes, None ou non-str sans lever d'exception.
N'utilise que `anonymize_text` — aucune regex supplémentaire.
"""
if not entries:
return []
if mapping is None:
mapping = {}
result: List[Dict] = []
for entry in entries:
item = copy.copy(entry) # copie superficielle suffit (valeurs scalaires)
for field in ("message", "logger"):
v = item.get(field)
if isinstance(v, str):
item[field] = anonymize_text(v, mapping=mapping)[0]
result.append(item)
return result
# Clés d'un workflow core portant du texte potentiellement PII : cible OCR
# (`by_text`), noms d'écrans/labels dérivés des titres. Le contenu saisi est
# déjà neutralisé à la source (sanitize_event → [SAISIE]).

View File

@@ -0,0 +1,249 @@
"""role_mapper — reconstruction de champs ANCRÉS sur l'OCR.
Principe cardinal (gate validé le 30/06 sur DPI urgences réel) :
le VLM ne fournit QUE des ids de tokens OCR (`value_ids`) ; la valeur est
reconstruite ici depuis l'OCR. Aucun texte produit par le VLM ne peut entrer
dans une valeur → **0 hallucination par construction**.
Ce module est volontairement PUR (pas d'appel réseau/VLM) : il prend les tokens
OCR (issus de `core.llm.ocr_extractor.extract_grid_from_image`) et la réponse
déjà désérialisée du VLM, et produit des champs ancrés. L'appel VLM lui-même
est orchestré ailleurs (et mockable), pour rester testable hors-ligne.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence, Tuple
BBox = Tuple[int, int, int, int] # (x_min, y_min, x_max, y_max)
@dataclass
class OcrToken:
"""Un token OCR indexé par un id stable."""
id: int
text: str
confidence: float = 1.0
bbox: Optional[BBox] = None
@dataclass
class MappedField:
"""Un champ {rôle → valeur} dont la valeur est 100% issue de l'OCR."""
label: str
value: str
value_ids: List[int]
confidence: float
bbox: Optional[BBox]
anchored: bool
invalid_ids: List[int]
def _norm_bbox(bbox) -> Optional[BBox]:
"""Normalise une bbox en (x_min, y_min, x_max, y_max).
Accepte soit 4 points EasyOCR `[[x,y], ...]`, soit un quadruplet déjà plat.
"""
if bbox is None:
return None
if len(bbox) == 4 and all(isinstance(v, (int, float)) for v in bbox):
return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))
xs = [p[0] for p in bbox]
ys = [p[1] for p in bbox]
return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))
def tokens_from_grid(grid: Sequence[Sequence[dict]]) -> List[OcrToken]:
"""Convertit une grille `extract_grid_from_image` en tokens indexés (id séquentiel).
L'ordre des ids suit l'ordre de lecture de la grille (lignes top→bottom,
colonnes left→right), ce qui donne au VLM un référentiel stable.
"""
tokens: List[OcrToken] = []
tid = 0
for row in grid:
for cell in row:
tokens.append(OcrToken(
id=tid,
text=cell["text"],
confidence=float(cell.get("confidence", 1.0)),
bbox=_norm_bbox(cell.get("bbox")),
))
tid += 1
return tokens
def _enclosing_bbox(bboxes: Sequence[Optional[BBox]]) -> Optional[BBox]:
present = [b for b in bboxes if b is not None]
if not present:
return None
return (
min(b[0] for b in present),
min(b[1] for b in present),
max(b[2] for b in present),
max(b[3] for b in present),
)
def reconstruct_fields(
tokens: Sequence[OcrToken],
vlm_fields: Sequence[dict],
) -> List[MappedField]:
"""Reconstruit les champs à partir des tokens OCR et des `value_ids` du VLM.
Pour chaque champ VLM `{label, value_ids:[...]}` :
- déduplique les ids en préservant l'ordre de lecture donné par le VLM ;
- filtre les ids hors OCR (listés dans `invalid_ids`) ;
- reconstruit la valeur par concaténation des `text` des tokens valides ;
- confidence = min des tokens ancrés (le plus prudent), bbox = englobante.
Tout champ `value`/texte fourni par le VLM est IGNORÉ : seule la liste
d'ids fait foi (anti-hallucination).
"""
by_id = {t.id: t for t in tokens}
out: List[MappedField] = []
for vf in vlm_fields:
label = vf.get("label", "")
seen: List[int] = []
for i in (vf.get("value_ids") or []):
if i not in seen:
seen.append(i)
valid = [i for i in seen if i in by_id]
invalid = [i for i in seen if i not in by_id]
toks = [by_id[i] for i in valid]
out.append(MappedField(
label=label,
value=" ".join(t.text for t in toks),
value_ids=valid,
confidence=min((t.confidence for t in toks), default=0.0),
bbox=_enclosing_bbox([t.bbox for t in toks]),
anchored=bool(valid),
invalid_ids=invalid,
))
return out
# --- Orchestration VLM (client injectable pour rester testable hors-ligne) ---
# Un client VLM est un callable (image_path, prompt) -> texte de réponse.
VlmClient = Callable[[str, str], str]
def build_role_prompt(
tokens: Sequence[OcrToken],
roles: Optional[Sequence[str]] = None,
) -> str:
"""Construit le prompt d'attribution de rôles (ancrage strict par ids).
Mode *guidé* si `roles` est fourni (rôles attendus de l'écran), sinon *libre*
(le VLM nomme lui-même les champs). Dans les deux cas le VLM ne renvoie que
des `value_ids` — jamais de texte recopié.
"""
ocr_list = [{"id": t.id, "text": t.text} for t in tokens]
if roles:
roles_line = (
"Rôles attendus sur cet écran (associe chacun s'il est présent) : "
+ ", ".join(roles) + ".\n"
)
else:
roles_line = (
"Identifie librement les champs présents — le 'label' est le rôle du champ.\n"
)
return (
"Tu reçois une capture d'écran d'un dossier patient et la liste des tokens "
"détectés par OCR (chaque token : id, text).\n"
+ roles_line +
"Pour chaque champ, désigne les tokens OCR qui composent sa VALEUR.\n"
"RÈGLES STRICTES :\n"
"- Tu ne recopies AUCUN texte. Tu renvoies seulement 'value_ids' : la liste "
"des id de tokens OCR (dans l'ordre de lecture) qui forment la valeur.\n"
"- 'label' = le rôle du champ. N'invente aucun champ.\n"
"- Réponds UNIQUEMENT en JSON PLAT :\n"
'{"ecran":"<type en 3 mots>","champs":[{"label":"...","value_ids":[<int>,...]}]}\n\n'
"Tokens OCR :\n" + json.dumps(ocr_list, ensure_ascii=False)
)
def parse_vlm_json(text: str) -> dict:
"""Extrait le 1er objet JSON d'une réponse VLM (tolère les fences ```json).
Robuste : renvoie `{}` si la réponse n'est pas du JSON exploitable (pas de
crash en batch).
"""
if not text:
return {}
s = text.strip()
if "```" in s:
parts = s.split("```")
if len(parts) >= 2:
s = parts[1]
if s.lstrip().lower().startswith("json"):
s = s.lstrip()[4:]
a, b = s.find("{"), s.rfind("}")
if a < 0 or b <= a:
return {}
try:
return json.loads(s[a:b + 1])
except (ValueError, TypeError):
return {}
def _norm_label(label: str) -> str:
"""Normalise un label pour comparaison : minuscules + strip espaces."""
return label.strip().lower()
def assess_quality(
fields: Sequence[MappedField],
required_roles: Optional[Sequence[str]] = None,
min_confidence: float = 0.6,
) -> str:
"""Évalue la qualité d'extraction d'un dossier à partir des champs reconstruits.
Renvoie l'un des 4 statuts (par priorité décroissante) :
- "failed" : aucun champ, OU aucun champ ancré.
- "needs_review" : au moins un rôle requis absent ou non ancré.
- "partial" : rôles requis ok mais confidence insuffisante OU champs non ancrés.
- "complete" : tout ancré, toutes confidences >= min_confidence, aucun non ancré.
Le matching required_role ↔ field.label est insensible à la casse et aux espaces.
"""
# --- failed : aucun champ du tout, ou aucun ancré ---
anchored = [f for f in fields if f.anchored]
if not fields or not anchored:
return "failed"
# --- needs_review : rôle requis absent ou non ancré ---
if required_roles:
anchored_labels = {_norm_label(f.label) for f in anchored}
for role in required_roles:
if _norm_label(role) not in anchored_labels:
return "needs_review"
# --- partial : confidence basse sur un champ ancré OU champs non ancrés ---
has_low_confidence = any(f.confidence < min_confidence for f in anchored)
has_unanchored = any(not f.anchored for f in fields)
if has_low_confidence or has_unanchored:
return "partial"
# --- complete ---
return "complete"
def map_roles(
image_path: str,
tokens: Sequence[OcrToken],
vlm_client: VlmClient,
roles: Optional[Sequence[str]] = None,
) -> List[MappedField]:
"""Orchestre l'attribution de rôles : prompt → VLM → parse → reconstruction ancrée.
`vlm_client` est injecté (testable hors-ligne). Le résultat est toujours
ancré sur l'OCR via `reconstruct_fields`.
"""
prompt = build_role_prompt(tokens, roles)
raw = vlm_client(image_path, prompt)
data = parse_vlm_json(raw)
vlm_fields = data.get("champs", []) if isinstance(data, dict) else []
return reconstruct_fields(tokens, vlm_fields)

View File

@@ -0,0 +1,220 @@
"""TDD — push-log-DGX : log shipper client Léa (remontée auto des logs).
Le serveur expose déjà `POST /api/v1/agents/logs` (body
`{machine_id, logs:[{ts, level, logger, message}]}`, borne
`RPA_AGENT_LOGS_MAX_BATCH`). Côté client, on veut :
- `LogShipperHandler(logging.Handler)` : sur `emit`, formate un LogRecord
au schéma exact `{ts, level, logger, message}`, applique un assainissement
PII au message, et empile dans un buffer.
- `LogShipper` : flush périodique du buffer par BATCH (≤ max_batch) via un
`sender` callable INJECTABLE `(machine_id, logs) -> bool`. Résilience :
si `sender` renvoie False ou lève, les logs RESTENT (rejoués au flush
suivant — ZÉRO perte ; conformité AI Act Art. 12).
Le module est chargé par chemin (importlib) pour ne dépendre d'aucun import
lourd du package client (cf. DETTE-011/013, comme test_agent_v1_logging.py).
"""
import importlib.util
import logging
from pathlib import Path
import pytest
_MOD_PATH = (
Path(__file__).resolve().parents[2]
/ "agent_v0" / "agent_v1" / "network" / "log_shipper.py"
)
def _load_module():
spec = importlib.util.spec_from_file_location("lea_log_shipper", _MOD_PATH)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
@pytest.fixture
def mod():
return _load_module()
def _make_record(name="lea.test", level=logging.INFO, msg="hello %s", args=("world",)):
"""Construit un vrai LogRecord (pas un mock) pour tester le formatage."""
return logging.LogRecord(
name=name, level=level, pathname=__file__, lineno=1,
msg=msg, args=args, exc_info=None,
)
# ---------------------------------------------------------------------------
# 1. emit formate un LogRecord au schéma exact {ts, level, logger, message}
# ---------------------------------------------------------------------------
def test_emit_formate_au_schema_exact(mod):
shipper = mod.LogShipper(machine_id="poste-1", sender=lambda m, l: True)
handler = shipper.handler
handler.emit(_make_record(name="lea.captor", level=logging.WARNING,
msg="bonjour %s", args=("monde",)))
buffered = shipper.peek_buffer()
assert len(buffered) == 1
entry = buffered[0]
# Schéma EXACT : pas de clé en plus, pas de clé en moins.
assert set(entry.keys()) == {"ts", "level", "logger", "message"}
assert entry["level"] == "WARNING"
assert entry["logger"] == "lea.captor"
assert entry["message"] == "bonjour monde" # args interpolés
assert isinstance(entry["ts"], (int, float))
# ---------------------------------------------------------------------------
# 2. log_safe / assainissement PII appliqué au message avant envoi
# ---------------------------------------------------------------------------
def test_pii_assaini_avant_envoi(mod):
# Sanitizer injecté déterministe : PII -> token (mime anonymize_text).
def fake_sanitizer(text):
return text.replace("ROSSIGNOL", "[NOM_1]")
shipper = mod.LogShipper(
machine_id="poste-1", sender=lambda m, l: True,
message_sanitizer=fake_sanitizer,
)
shipper.handler.emit(_make_record(msg="clic sur patient ROSSIGNOL", args=None))
entry = shipper.peek_buffer()[0]
assert "ROSSIGNOL" not in entry["message"]
assert "[NOM_1]" in entry["message"]
# ---------------------------------------------------------------------------
# 3. flush envoie un batch <= max et appelle sender(machine_id, logs)
# ---------------------------------------------------------------------------
def test_flush_envoie_batch_borne_et_appelle_sender(mod):
calls = []
def sender(machine_id, logs):
calls.append((machine_id, logs))
return True
shipper = mod.LogShipper(machine_id="poste-42", sender=sender, max_batch=10)
for i in range(5):
shipper.handler.emit(_make_record(msg=f"event {i}", args=None))
sent = shipper.flush()
assert sent == 5
assert len(calls) == 1
machine_id, logs = calls[0]
assert machine_id == "poste-42"
assert len(logs) == 5
assert logs[0]["message"] == "event 0"
# Buffer vidé après succès
assert shipper.peek_buffer() == []
# ---------------------------------------------------------------------------
# 4. sender échoue (False / exception) -> logs CONSERVÉS, rejoués au flush suivant
# ---------------------------------------------------------------------------
def test_sender_echec_false_conserve_les_logs(mod):
state = {"fail": True, "received": None}
def flaky_sender(machine_id, logs):
if state["fail"]:
return False # échec récupérable
state["received"] = list(logs)
return True
shipper = mod.LogShipper(machine_id="p", sender=flaky_sender)
for i in range(3):
shipper.handler.emit(_make_record(msg=f"m{i}", args=None))
sent = shipper.flush() # échec
assert sent == 0
assert len(shipper.peek_buffer()) == 3 # ZÉRO perte
state["fail"] = False
sent = shipper.flush() # rejeu
assert sent == 3
assert [e["message"] for e in state["received"]] == ["m0", "m1", "m2"]
assert shipper.peek_buffer() == []
def test_sender_exception_conserve_les_logs(mod):
def exploding_sender(machine_id, logs):
raise ConnectionError("serveur down")
shipper = mod.LogShipper(machine_id="p", sender=exploding_sender)
shipper.handler.emit(_make_record(msg="important", args=None))
sent = shipper.flush() # ne doit PAS propager
assert sent == 0
assert len(shipper.peek_buffer()) == 1 # log conservé
# ---------------------------------------------------------------------------
# 5. buffer vide -> sender NON appelé
# ---------------------------------------------------------------------------
def test_buffer_vide_sender_non_appele(mod):
calls = []
shipper = mod.LogShipper(
machine_id="p", sender=lambda m, l: calls.append((m, l)) or True
)
sent = shipper.flush()
assert sent == 0
assert calls == []
# ---------------------------------------------------------------------------
# 6. > max_batch entrées -> découpage en plusieurs batches
# ---------------------------------------------------------------------------
def test_decoupage_en_plusieurs_batches(mod):
batches = []
def sender(machine_id, logs):
batches.append(len(logs))
return True
shipper = mod.LogShipper(machine_id="p", sender=sender, max_batch=3)
for i in range(7):
shipper.handler.emit(_make_record(msg=f"x{i}", args=None))
sent = shipper.flush()
assert sent == 7
# 7 entrées, max_batch=3 -> 3 + 3 + 1
assert batches == [3, 3, 1]
# Chaque batch <= max_batch
assert all(n <= 3 for n in batches)
assert shipper.peek_buffer() == []
def test_decoupage_echec_partiel_conserve_le_reste(mod):
"""Si un batch intermédiaire échoue, on arrête et on garde le reste (0 perte)."""
batches = []
def sender(machine_id, logs):
batches.append([e["message"] for e in logs])
# Le 2e batch échoue
return len(batches) != 2
shipper = mod.LogShipper(machine_id="p", sender=sender, max_batch=2)
for i in range(6):
shipper.handler.emit(_make_record(msg=f"x{i}", args=None))
sent = shipper.flush()
# 1er batch (x0,x1) part ; 2e (x2,x3) échoue -> on arrête.
assert sent == 2
assert batches[0] == ["x0", "x1"]
# x2..x5 restent dans le buffer dans l'ordre.
restant = [e["message"] for e in shipper.peek_buffer()]
assert restant == ["x2", "x3", "x4", "x5"]

View File

@@ -0,0 +1,296 @@
"""Tests du role_mapper : reconstruction de champs ANCRÉS sur l'OCR.
Principe cardinal (cf. gate vert 30/06) : le VLM ne fournit QUE des ids de tokens OCR
(value_ids) ; la valeur est reconstruite côté Python depuis l'OCR. Aucun texte produit
par le VLM ne doit pouvoir entrer dans une valeur -> 0 hallucination par construction.
"""
import pytest
from core.extraction.role_mapper import (
MappedField,
OcrToken,
assess_quality,
build_role_prompt,
map_roles,
reconstruct_fields,
tokens_from_grid,
)
def _tok(tid, text, conf=0.9, bbox=(0, 0, 10, 10)):
return OcrToken(id=tid, text=text, confidence=conf, bbox=bbox)
def test_reconstruit_value_concatene_tokens_dans_lordre():
tokens = [_tok(0, "DUPONT"), _tok(1, "Jean")]
fields = reconstruct_fields(tokens, [{"label": "Nom complet", "value_ids": [0, 1]}])
assert len(fields) == 1
assert fields[0].label == "Nom complet"
assert fields[0].value == "DUPONT Jean"
assert fields[0].anchored is True
def test_ignore_les_ids_hors_plage_et_les_liste():
tokens = [_tok(0, "DUPONT")]
fields = reconstruct_fields(tokens, [{"label": "Nom", "value_ids": [0, 99]}])
assert fields[0].value == "DUPONT"
assert fields[0].invalid_ids == [99]
assert fields[0].anchored is True
def test_value_ids_vide_donne_champ_non_ancre():
tokens = [_tok(0, "DUPONT")]
fields = reconstruct_fields(tokens, [{"label": "Poids", "value_ids": []}])
assert fields[0].value == ""
assert fields[0].anchored is False
def test_aucun_id_valide_donne_champ_non_ancre():
tokens = [_tok(0, "DUPONT")]
fields = reconstruct_fields(tokens, [{"label": "Poids", "value_ids": [7, 8]}])
assert fields[0].anchored is False
assert fields[0].value == ""
assert fields[0].invalid_ids == [7, 8]
def test_dedup_ids_en_preservant_lordre():
tokens = [_tok(0, "DUPONT"), _tok(1, "Jean")]
fields = reconstruct_fields(tokens, [{"label": "X", "value_ids": [1, 1, 0]}])
assert fields[0].value == "Jean DUPONT"
assert fields[0].value_ids == [1, 0]
def test_confidence_est_le_min_des_tokens_ancres():
tokens = [_tok(0, "A", conf=0.95), _tok(1, "B", conf=0.70)]
fields = reconstruct_fields(tokens, [{"label": "X", "value_ids": [0, 1]}])
assert fields[0].confidence == pytest.approx(0.70)
def test_bbox_englobante_des_tokens_ancres():
tokens = [_tok(0, "A", bbox=(0, 0, 10, 10)), _tok(1, "B", bbox=(20, 5, 40, 15))]
fields = reconstruct_fields(tokens, [{"label": "X", "value_ids": [0, 1]}])
assert fields[0].bbox == (0, 0, 40, 15)
def test_invariant_aucun_texte_hors_ocr():
# 'value' fournie par le VLM est ignorée : seul value_ids compte.
tokens = [_tok(0, "DUPONT")]
fields = reconstruct_fields(
tokens, [{"label": "Nom", "value_ids": [0], "value": "HALLUCINATION"}]
)
assert fields[0].value == "DUPONT"
def test_tokens_from_grid_indexe_et_normalise_bbox():
# grille extract_grid_from_image : bbox = 4 points EasyOCR
grid = [
[
{"text": "Nom", "bbox": [[0, 0], [10, 0], [10, 8], [0, 8]],
"confidence": 0.9, "row": 0, "col": 0},
{"text": "DUPONT", "bbox": [[20, 0], [60, 0], [60, 8], [20, 8]],
"confidence": 0.95, "row": 0, "col": 1},
],
]
tokens = tokens_from_grid(grid)
assert [t.id for t in tokens] == [0, 1]
assert tokens[0].text == "Nom"
assert tokens[1].bbox == (20, 0, 60, 8)
# --- map_roles : orchestrateur (client VLM injectable, donc testable hors-ligne) ---
def _fake_client(response, capture=None):
"""Faux client VLM : enregistre éventuellement le prompt reçu, renvoie une réponse fixe."""
def client(image_path, prompt):
if capture is not None:
capture["prompt"] = prompt
capture["image_path"] = image_path
return response
return client
def test_map_roles_reconstruit_via_client_injecte():
tokens = [_tok(0, "DUPONT"), _tok(1, "Jean")]
client = _fake_client('{"champs":[{"label":"Nom complet","value_ids":[0,1]}]}')
fields = map_roles("img.png", tokens, client)
assert len(fields) == 1
assert fields[0].label == "Nom complet"
assert fields[0].value == "DUPONT Jean"
def test_map_roles_tolere_les_fences_json():
tokens = [_tok(0, "DUPONT")]
client = _fake_client('```json\n{"champs":[{"label":"Nom","value_ids":[0]}]}\n```')
fields = map_roles("img.png", tokens, client)
assert fields[0].value == "DUPONT"
def test_map_roles_json_invalide_retourne_liste_vide():
# robustesse batch : une réponse VLM non-JSON ne doit pas crasher.
tokens = [_tok(0, "DUPONT")]
client = _fake_client("désolé, je n'ai pas compris")
fields = map_roles("img.png", tokens, client)
assert fields == []
def test_build_role_prompt_inclut_les_tokens_avec_ids():
tokens = [_tok(0, "Poids"), _tok(1, "72")]
prompt = build_role_prompt(tokens)
assert "Poids" in prompt and "72" in prompt
assert "value_ids" in prompt # on demande bien des ids, pas du texte recopié
def test_build_role_prompt_guide_liste_les_roles_attendus():
tokens = [_tok(0, "X")]
prompt = build_role_prompt(tokens, roles=["Nom", "IPP", "Poids"])
assert "Nom" in prompt and "IPP" in prompt and "Poids" in prompt
def test_map_roles_passe_les_roles_au_prompt():
tokens = [_tok(0, "X")]
cap = {}
client = _fake_client('{"champs":[]}', capture=cap)
map_roles("img.png", tokens, client, roles=["Diagnostic", "GEMSA"])
assert "Diagnostic" in cap["prompt"] and "GEMSA" in cap["prompt"]
# ---------------------------------------------------------------------------
# assess_quality — évaluation de la qualité d'extraction d'un dossier
# ---------------------------------------------------------------------------
def _field(label, value="val", anchored=True, confidence=0.9, value_ids=None, invalid_ids=None):
"""Helper : construit un MappedField directement (sans passer par OCR/VLM)."""
return MappedField(
label=label,
value=value if anchored else "",
value_ids=value_ids or ([0] if anchored else []),
confidence=confidence,
bbox=(0, 0, 10, 10) if anchored else None,
anchored=anchored,
invalid_ids=invalid_ids or [],
)
# --- failed ---
def test_assess_quality_failed_aucun_champ():
"""Liste vide → failed."""
assert assess_quality([]) == "failed"
def test_assess_quality_failed_aucun_champ_ancre():
"""Tous non ancrés → failed."""
fields = [_field("Nom", anchored=False), _field("IPP", anchored=False)]
assert assess_quality(fields) == "failed"
def test_assess_quality_failed_un_champ_value_vide():
"""Un seul champ, anchored=False, value vide → failed."""
fields = [_field("Nom", anchored=False, value_ids=[])]
assert assess_quality(fields) == "failed"
# --- needs_review ---
def test_assess_quality_needs_review_role_requis_absent():
"""Un rôle requis n'est pas dans fields → needs_review."""
fields = [_field("Nom", anchored=True)]
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "needs_review"
def test_assess_quality_needs_review_role_requis_non_ancre():
"""Rôle requis présent mais anchored=False → needs_review."""
fields = [_field("Nom", anchored=True), _field("IPP", anchored=False)]
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "needs_review"
def test_assess_quality_needs_review_matching_insensible_casse():
"""Matching label ↔ required_role insensible à la casse."""
fields = [_field("nom complet", anchored=True), _field("ipp", anchored=True)]
# required_roles en maj : doit quand même matcher
assert assess_quality(fields, required_roles=["Nom Complet", "IPP"]) != "needs_review"
def test_assess_quality_needs_review_matching_insensible_espaces():
"""Matching insensible aux espaces en trop (strip)."""
fields = [_field(" Nom ", anchored=True)]
assert assess_quality(fields, required_roles=["Nom"]) != "needs_review"
def test_assess_quality_needs_review_priorite_sur_partial():
"""needs_review > partial : role manquant + confidence basse → needs_review."""
fields = [
_field("Nom", anchored=True, confidence=0.4), # basse
# "IPP" absent → needs_review
]
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "needs_review"
# --- partial ---
def test_assess_quality_partial_confidence_basse():
"""Tous requis ancrés mais un champ ancré a confidence < min_confidence → partial."""
fields = [
_field("Nom", anchored=True, confidence=0.9),
_field("IPP", anchored=True, confidence=0.4), # < 0.6
]
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "partial"
def test_assess_quality_partial_champs_non_ancres_en_surplus():
"""Tous requis ancrés, confidence ok, mais il y a des champs non ancrés en plus → partial."""
fields = [
_field("Nom", anchored=True, confidence=0.9),
_field("Inconnu", anchored=False), # non ancré hors requis
]
assert assess_quality(fields, required_roles=["Nom"]) == "partial"
def test_assess_quality_partial_sans_required_roles_confidence_basse():
"""Sans required_roles, un champ ancré à confidence basse → partial."""
fields = [
_field("Nom", anchored=True, confidence=0.9),
_field("IPP", anchored=True, confidence=0.3),
]
assert assess_quality(fields) == "partial"
def test_assess_quality_partial_sans_required_roles_champ_non_ancre():
"""Sans required_roles, au moins un champ non ancré → partial."""
fields = [
_field("Nom", anchored=True, confidence=0.9),
_field("IPP", anchored=False),
]
assert assess_quality(fields) == "partial"
# --- complete ---
def test_assess_quality_complete_tous_requis_ancres_confidence_ok():
"""Tous requis ancrés, toutes confidences >= 0.6, aucun non ancré → complete."""
fields = [
_field("Nom", anchored=True, confidence=0.9),
_field("IPP", anchored=True, confidence=0.7),
]
assert assess_quality(fields, required_roles=["Nom", "IPP"]) == "complete"
def test_assess_quality_complete_sans_required_roles():
"""Sans required_roles, au moins un champ ancré, tous >= min_confidence, aucun non ancré → complete."""
fields = [
_field("Nom", anchored=True, confidence=0.8),
_field("IPP", anchored=True, confidence=0.95),
]
assert assess_quality(fields) == "complete"
def test_assess_quality_complete_seuil_exactement_min_confidence():
"""Confidence exactement égale à min_confidence (0.6) → complete (borne incluse)."""
fields = [_field("Nom", anchored=True, confidence=0.6)]
assert assess_quality(fields, required_roles=["Nom"]) == "complete"
def test_assess_quality_complete_min_confidence_personnalise():
"""Seuil personnalisé : confidence=0.7 >= min_confidence=0.7 → complete."""
fields = [_field("Nom", anchored=True, confidence=0.7)]
assert assess_quality(fields, min_confidence=0.7) == "complete"

View File

@@ -0,0 +1,163 @@
"""Tests TDD de sanitize_log_entries — assainissement PII des logs Léa reçus côté serveur.
Branche feat/push-log-dgx. N'importe QUE pii_sanitizer (pas api_stream, DETTE-013).
"""
from __future__ import annotations
import sys
from pathlib import Path
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
# ---------------------------------------------------------------------------
# 1. message avec PII → brut absent, tokens présents
# ---------------------------------------------------------------------------
def test_message_pii_tokenise():
"""Un nom clinique + numéro long disparaissent ; des tokens [...] les remplacent.
Couche 1 (regex, sans NER) : détecte le format « Prénom NOM » (RE_PRENOM_NOM)
et l'IPP structuré (RE_IPP). Le format inverse « NOM Prénom » relève de la
couche 2 NER — hors scope ici.
"""
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
entries = [
{
"ts": "2026-06-30T10:00:00Z",
"level": "INFO",
"logger": "lea.replay",
"message": "Ouverture dossier Catherine MOREL IPP: 295841",
}
]
result = sanitize_log_entries(entries)
assert len(result) == 1
msg = result[0]["message"]
assert "MOREL" not in msg, f"NOM toujours présent : {msg!r}"
assert "Catherine" not in msg, f"Prénom toujours présent : {msg!r}"
assert "295841" not in msg, f"IPP toujours présent : {msg!r}"
assert "[" in msg, f"Aucun token dans : {msg!r}"
# ---------------------------------------------------------------------------
# 2. ts / level préservés à l'identique
# ---------------------------------------------------------------------------
def test_ts_level_preserves():
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
entries = [
{"ts": "2026-06-30T10:00:00Z", "level": "WARNING",
"logger": "lea.core", "message": "simple message sans pii"}
]
result = sanitize_log_entries(entries)
assert result[0]["ts"] == "2026-06-30T10:00:00Z"
assert result[0]["level"] == "WARNING"
# ---------------------------------------------------------------------------
# 3. liste vide → liste vide
# ---------------------------------------------------------------------------
def test_liste_vide():
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
assert sanitize_log_entries([]) == []
# ---------------------------------------------------------------------------
# 4. entrée sans clé `message` → pas de crash, entrée conservée
# ---------------------------------------------------------------------------
def test_entree_sans_message():
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
entries = [{"ts": "2026-06-30T10:00:01Z", "level": "DEBUG", "logger": "lea.init"}]
result = sanitize_log_entries(entries)
assert len(result) == 1
assert "message" not in result[0] # champ absent → reste absent
# ---------------------------------------------------------------------------
# 5. cohérence : même PII dans 2 entrées → même token (mapping partagé)
# ---------------------------------------------------------------------------
def test_coherence_mapping_partage():
"""La même PII dans deux messages du batch reçoit le même token."""
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
entries = [
{"ts": "T1", "level": "INFO", "logger": "l", "message": "IPP: 295841 reçu"},
{"ts": "T2", "level": "INFO", "logger": "l", "message": "Relance IPP: 295841"},
]
result = sanitize_log_entries(entries)
msg1 = result[0]["message"]
msg2 = result[1]["message"]
# le brut est absent des deux
assert "295841" not in msg1
assert "295841" not in msg2
# le token est identique (mapping partagé)
import re
tokens1 = re.findall(r"\[IPP_\d+\]", msg1)
tokens2 = re.findall(r"\[IPP_\d+\]", msg2)
assert tokens1, f"Pas de token IPP dans msg1 : {msg1!r}"
assert tokens2, f"Pas de token IPP dans msg2 : {msg2!r}"
assert tokens1[0] == tokens2[0], (
f"Tokens différents pour la même PII : {tokens1[0]} vs {tokens2[0]}"
)
# ---------------------------------------------------------------------------
# 6. `message` non-str → skip proprement, pas de crash
# ---------------------------------------------------------------------------
def test_message_non_str():
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
entries = [
{"ts": "T1", "level": "INFO", "logger": "l", "message": None},
{"ts": "T2", "level": "INFO", "logger": "l", "message": 42},
{"ts": "T3", "level": "INFO", "logger": "l", "message": ["liste"]},
]
result = sanitize_log_entries(entries)
assert len(result) == 3
# les valeurs non-str sont préservées telles quelles
assert result[0]["message"] is None
assert result[1]["message"] == 42
assert result[2]["message"] == ["liste"]
# ---------------------------------------------------------------------------
# 7. champ `logger` str est aussi assaini si porteur de PII
# ---------------------------------------------------------------------------
def test_logger_pii_tokenise():
"""Si le champ logger contient de la PII (ex. chemin patient), il est assaini."""
from agent_v0.server_v1.pii_sanitizer import sanitize_log_entries
entries = [
{
"ts": "T1",
"level": "INFO",
"logger": "lea.patient.MOREL_Catherine",
"message": "step start",
}
]
result = sanitize_log_entries(entries)
logger_out = result[0]["logger"]
# Le NOM doit être tokenisé (RE_PRENOM_NOM captera « Catherine MOREL » …
# mais « MOREL_Catherine » n'est pas le format clinique standard — le test
# vérifie surtout qu'il n'y a pas de crash et que le champ est traité.)
# On ne fixe pas d'assertion sur la valeur : juste pas de crash.
assert isinstance(logger_out, str)