5 Commits

Author SHA1 Message Date
Dom
2a1b1ed80e feat(stream): dispatch extract_dossier -> handler serveur
Some checks failed
tests / Lint (ruff + black) (push) Failing after 2m3s
tests / Tests unitaires (sans GPU) (push) Failing after 1m51s
tests / Tests sécurité (critique) (push) Has been skipped
Câble le type d'action 'extract_dossier' dans get_next_action (api_stream)
vers _handle_extract_dossier_action (replay_engine). La brique 3 (OCR ->
gate -> persist dossier VWB) était committée mais non atteignable au runtime
faute de dispatch. Import + elif dédié, timeout 180s, exécuteur non bloquant.

Note: le handler utilise encore l'ancienne chaîne (extract_grid + gate maison).
Le remplacement par l'extraction ancrée (map_roles/vlm_client) est une modif
séparée côté replay_engine (ma zone).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 11:05:22 +02:00
Dom
f09b8b8cfd feat(extraction): client vLLM serveur (image+prompt -> texte, post_fn injectable)
Factorise un client propre pour la lecture d'écran : downscale image (fenêtre
max_model_len), thinking off, post_fn injectable (testable sans vLLM). Sert de
vlm_client à extract_dossier_from_image dans le handler runtime. 4 tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:03:26 +02:00
Dom
6a78a0059b feat(extraction): extract_dossier_from_image — orchestrateur OCR->VLM->qualite (injectable)
Enchaine ocr_fn -> tokens_from_grid -> map_roles -> assess_quality. OCR et client
VLM injectables (testable hors-ligne, import OCR lazy = module reste pur). C'est la
brique que le handler runtime extract_dossier appellera. 4 tests (35 au total role_mapper).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 16:26:16 +02:00
Dom
813b33b47e feat(update): DETTE-022 — noyau MAJ silencieuse client Léa (gated, swap en stub)
Logique PURE testée : parse_version semver (R3), decide_update code-only/full (R2),
should_update client (double garde anti-downgrade), download_update (staging only +
SHA256, downloader injectable). Endpoint GET /api/v1/agents/update/check gated
(RPA_AUTO_UPDATE_SERVER_ENABLED). Flags client+serveur OFF par défaut.
Swap fichiers / Lea.bat / restart = STUBS no-op réservés révision humaine.
34 tests TDD. refs DETTE-022

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 16:21:35 +02:00
Dom
a50057d499 fix(dashboard): DETTE-024 — download fleet, fallback legacy rendu visible
_resolve_lea_zip_template() reste résolu à la volée (full buildé après démarrage OK) ;
ajout d'un WARNING explicite quand le full est absent et qu'on retombe sur le ZIP
léger non autoportant (plus de fallback silencieux). Fonction injectable pour tests.
4 tests + 32 non-régression verts. refs DETTE-024

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 16:20:28 +02:00
13 changed files with 1322 additions and 5 deletions

View File

@@ -93,6 +93,17 @@ LOG_SHIP_ENABLED = os.environ.get("RPA_LOG_SHIP_ENABLED", "false").lower() in (
# Intervalle de flush du buffer de logs (secondes). # Intervalle de flush du buffer de logs (secondes).
LOG_SHIP_INTERVAL_S = float(os.environ.get("RPA_LOG_SHIP_INTERVAL_S", "30")) LOG_SHIP_INTERVAL_S = float(os.environ.get("RPA_LOG_SHIP_INTERVAL_S", "30"))
# Mise à jour silencieuse du client Léa (DETTE-022 v2).
# Le client interroge le serveur (GET /api/v1/agents/update/check), télécharge
# le ZIP en staging et vérifie le SHA256. Le SWAP réel des fichiers / l'édition
# de Lea.bat / le redémarrage restent RÉSERVÉS RÉVISION HUMAINE (voir
# network/updater.py : stubs apply_update / write_boot_ok_marker).
# Défaut PRUDENT = désactivé : activé poste par poste via config.txt / variable
# d'environnement, sans rebuild de l'installateur (même esprit que LOG_SHIP).
AUTO_UPDATE_ENABLED = os.environ.get("RPA_AUTO_UPDATE_ENABLED", "false").lower() in (
"true", "1", "yes", "on",
)
# Monitoring # Monitoring
PERF_MONITOR_INTERVAL_S = 30 PERF_MONITOR_INTERVAL_S = 30
LOGS_DIR = BASE_DIR / "logs" LOGS_DIR = BASE_DIR / "logs"

View File

@@ -0,0 +1,298 @@
# agent_v1/network/updater.py
"""NOYAU client de la mise à jour silencieuse de Léa (DETTE-022 v2).
GATED — flag `RPA_AUTO_UPDATE_ENABLED` (défaut OFF). Tant qu'il est OFF,
rien ne se déclenche : l'intégration de ce module au runtime (boucle de poll
de `main.py`) ne fait aucune MAJ.
Ce module ne contient que les parties PURES / testables, sans réseau réel :
- `parse_version` / `is_newer` (R3) : self-contained (le bundle client
n'embarque PAS `server_v1` — duplication assumée, même algorithme).
- `should_update(local_version, server_response)` : décide « faut-il
updater ? quelle version/type ? » à partir de la réponse serveur. Double
garde semver côté client (jamais de downgrade) = défense en profondeur.
- `download_update(plan, staging_dir, downloader)` : télécharge le ZIP via un
`downloader` callable INJECTABLE (aucun réseau réel en test), vérifie le
SHA256, écrit le ZIP dans le **staging** (`Lea_next\\`-like) — JAMAIS dans
les fichiers vivants. Retourne un plan d'application.
- `auto_update_enabled()` : lit le flag (défaut OFF).
⚠️⚠️ PARTIES DANGEREUSES — RÉSERVÉES RÉVISION HUMAINE ⚠️⚠️
Le remplacement réel des fichiers (`apply_update`), l'écriture du marker
rollback (`write_boot_ok_marker`), l'édition de `Lea.bat` et le redémarrage
ne sont PAS implémentés ici : ce sont des STUBS no-op explicites. Un agent ne
doit pas écrire de code qui écrase des binaires vivants ou relance un process
sans supervision. Les points d'extension sont marqués `# TODO swap supervisé`.
Pattern d'import / résilience aligné sur `log_shipper.py` (même branche).
Branche feat/push-log-dgx.
"""
from __future__ import annotations
import hashlib
import logging
import os
from pathlib import Path
from typing import Callable, Optional, Tuple
logger = logging.getLogger(__name__)
# Niveaux de livraison (R2). `code-only` par défaut = 99 % des MAJ (~500 Ko).
VALID_UPDATE_TYPES = ("code-only", "full")
DEFAULT_UPDATE_TYPE = "code-only"
_FALLBACK_VERSION: Tuple[int, ...] = (0,)
# ---------------------------------------------------------------------------
# Flag d'activation — OFF par défaut (lu à chaque appel pour faciliter tests)
# ---------------------------------------------------------------------------
def auto_update_enabled() -> bool:
"""True si la MAJ auto client est activée (flag RPA_AUTO_UPDATE_ENABLED).
Défaut PRUDENT = OFF. On l'active poste par poste via config.txt / variable
d'environnement, sans rebuild de l'installateur (même esprit que
LOG_SHIP_ENABLED).
"""
return os.environ.get("RPA_AUTO_UPDATE_ENABLED", "false").lower() in (
"true", "1", "yes", "on",
)
# ---------------------------------------------------------------------------
# R3 — parse_version self-contained (le bundle client n'a pas server_v1)
# ---------------------------------------------------------------------------
def parse_version(v) -> Tuple[int, ...]:
"""Parse une version semver en tuple d'entiers. Voir server_v1/update_check.
"1.0.2" → (1, 0, 2) ; "1.0.10" → (1, 0, 10) ; "v1.2.3" → (1, 2, 3).
Tolérant et SANS exception : invalide → fallback `(0,)`.
"""
if not isinstance(v, str):
return _FALLBACK_VERSION
s = v.strip().lstrip("vV").strip()
if not s:
return _FALLBACK_VERSION
try:
from packaging.version import Version
return tuple(Version(s).release)
except Exception:
pass
try:
return tuple(int(x) for x in s.split("."))
except (ValueError, AttributeError):
return _FALLBACK_VERSION
def is_newer(candidate: str, baseline: str) -> bool:
"""True si `candidate` strictement plus récent que `baseline` (semver)."""
return parse_version(candidate) > parse_version(baseline)
def _normalize_update_type(update_type) -> str:
if update_type in VALID_UPDATE_TYPES:
return update_type
return DEFAULT_UPDATE_TYPE
# ---------------------------------------------------------------------------
# Décision client : faut-il updater ?
# ---------------------------------------------------------------------------
def should_update(local_version: str, server_response) -> Optional[dict]:
"""Décide à partir de la réponse serveur s'il faut updater.
Args:
local_version : version courante du client (config.AGENT_VERSION).
server_response : dict renvoyé par l'endpoint serveur
{update_available, latest_version, update_type, url, [sha256]}.
Returns:
Un PLAN d'update `{target_version, update_type, url, sha256}` si une MAJ
valide est à faire, sinon None.
Défense en profondeur : même si `update_available` est True, le client
REVÉRIFIE en semver (`is_newer`) — il ne descend JAMAIS vers une version
<= locale. Tolérant : réponse malformée → None (jamais d'exception).
"""
if not isinstance(server_response, dict):
return None
if not server_response.get("update_available"):
return None
target = server_response.get("latest_version")
url = server_response.get("url")
if not target or not url:
return None
# Double garde semver : pas de downgrade, pas d'égalité.
if not is_newer(target, local_version):
return None
return {
"target_version": target,
"update_type": _normalize_update_type(server_response.get("update_type")),
"url": url,
"sha256": server_response.get("sha256"),
}
# ---------------------------------------------------------------------------
# Téléchargement — downloader INJECTABLE, SHA256, staging only
# ---------------------------------------------------------------------------
def _default_downloader(url: str) -> bytes:
"""Téléchargement réel du ZIP (best-effort, pattern streamer/log_shipper).
Résout l'URL relative contre SERVER_BASE, ajoute le Bearer si présent.
INJECTABLE : remplacé par un fake en test (aucun réseau réel).
"""
import requests # import tardif (absent de certains envs de test)
full_url = url
headers = {}
try:
from ..config import SERVER_BASE, API_TOKEN
if url.startswith("/"):
full_url = f"{SERVER_BASE}{url}"
if API_TOKEN:
headers["Authorization"] = f"Bearer {API_TOKEN}"
except Exception:
# Hors package (test isolé) : on utilise l'URL telle quelle.
pass
resp = requests.get(full_url, headers=headers, timeout=30, stream=False)
resp.raise_for_status()
return resp.content
def download_update(
plan: dict,
staging_dir,
downloader: Optional[Callable[[str], bytes]] = None,
) -> dict:
"""Télécharge le ZIP d'update dans le staging et vérifie son intégrité.
NE TOUCHE PAS aux fichiers vivants : écrit uniquement dans `staging_dir`
(équivalent de `Lea_next\\`). L'application réelle (swap) est un stub
réservé révision humaine (voir `apply_update`).
Args:
plan : sortie de `should_update` (target_version, update_type, url, sha256).
staging_dir : dossier de staging (créé si absent).
downloader : callable `(url) -> bytes` INJECTABLE (défaut = HTTP réel).
Returns:
Succès : {ok: True, staged_zip: str, update_type, target_version,
sha256_verified: bool}
Échec : {ok: False, error: str}
Best-effort : aucune exception ne remonte ; un échec laisse le staging propre
(pas de ZIP corrompu).
"""
dl = downloader if downloader is not None else _default_downloader
staging = Path(staging_dir)
try:
data = dl(plan["url"])
except Exception as e:
logger.warning("Téléchargement update échoué : %s", e)
return {"ok": False, "error": f"download_failed: {e}"}
expected_sha = (plan.get("sha256") or "").strip().lower()
sha256_verified = False
if expected_sha:
actual = hashlib.sha256(data).hexdigest()
if actual != expected_sha:
logger.warning(
"SHA256 mismatch update (attendu=%s, obtenu=%s) — rejeté",
expected_sha, actual,
)
return {"ok": False, "error": "sha256 mismatch — ZIP rejeté"}
sha256_verified = True
else:
# Best-effort : pas de SHA fourni → on accepte mais on le signale.
logger.info("Pas de SHA256 fourni pour l'update — intégrité non vérifiée")
try:
staging.mkdir(parents=True, exist_ok=True)
target_version = plan.get("target_version", "unknown")
staged_zip = staging / f"lea_update_{target_version}.zip"
staged_zip.write_bytes(data)
except Exception as e:
logger.warning("Écriture ZIP staging échouée : %s", e)
return {"ok": False, "error": f"staging_write_failed: {e}"}
return {
"ok": True,
"staged_zip": str(staged_zip),
"update_type": _normalize_update_type(plan.get("update_type")),
"target_version": plan.get("target_version"),
"sha256_verified": sha256_verified,
}
# ===========================================================================
# ⚠️ ZONE DANGEREUSE — STUBS RÉSERVÉS RÉVISION HUMAINE (NE PAS IMPLÉMENTER
# PAR UN AGENT). Points d'extension explicites, no-op pour l'instant.
# ===========================================================================
def apply_update(prepared: dict) -> dict:
"""STUB — application réelle de l'update (swap des fichiers).
Réservé révision humaine : remplacer des fichiers vivants du client et
déclencher un swap est trop risqué pour être généré par un agent. La
mécanique cible (design v2) est :
- code-only : extraire `agent_v1\\` + `lea_ui\\` + `run_agent_v1.py` +
`config.py` du ZIP staging, poser un marker `UPDATE_READY`
(`update_type=code-only`) ; le swap effectif est fait par `Lea.bat`
au prochain démarrage (xcopy ciblé).
- full : poser `UPDATE_READY` (`update_type=full`) ; `Lea.bat` fait le
backup complet `Lea_prev\\` puis le swap complet.
# TODO swap supervisé : extraction ZIP + écriture marker UPDATE_READY.
# NE PAS écraser les fichiers vivants depuis Python — c'est Lea.bat qui
# swappe hors-process. Édition de Lea.bat + restart = hors périmètre agent.
Returns:
{applied: False, reason: "réservé révision humaine (swap supervisé)"}
"""
logger.info(
"apply_update appelé mais NON implémenté (stub réservé révision humaine) : %r",
prepared.get("target_version") if isinstance(prepared, dict) else prepared,
)
return {
"applied": False,
"reason": "réservé révision humaine — swap supervisé (Lea.bat), hors périmètre agent",
}
def write_boot_ok_marker(version: str) -> dict:
"""STUB — écriture du marker rollback `boot_ok_{version}` (R1).
Réservé révision humaine : le marker pilote le rollback de Lea.bat au
prochain démarrage. Sa sémantique (health-check ~60s heartbeat DGX +
session active AVANT écriture) doit être validée à la main pour éviter un
faux rollback (cas DGX down ≠ Léa N+1 buguée — cf. design R1, cas edge 3).
# TODO swap supervisé : écrire `%LOCALAPPDATA%\\Lea\\boot_ok_{version}`
# après ~60s de heartbeat DGX sain + session active (main.py startup).
Returns:
{written: False, reason: "..."}
"""
logger.info(
"write_boot_ok_marker appelé mais NON implémenté (stub R1) : version=%s",
version,
)
return {
"written": False,
"reason": "réservé révision humaine — marker rollback (health-check), hors périmètre agent",
}

View File

@@ -423,6 +423,7 @@ from .replay_engine import (
_SERVER_SIDE_ACTION_TYPES, _SERVER_SIDE_ACTION_TYPES,
_handle_extract_text_action, _handle_extract_text_action,
_handle_extract_table_action, _handle_extract_table_action,
_handle_extract_dossier_action,
_handle_t2a_decision_action, _handle_t2a_decision_action,
_handle_llm_generate_action, _handle_llm_generate_action,
_handle_concat_text_vars_action, _handle_concat_text_vars_action,
@@ -4443,6 +4444,15 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
), ),
timeout=180, timeout=180,
) )
elif type_ == "extract_dossier":
await asyncio.wait_for(
loop.run_in_executor(
None,
_handle_extract_dossier_action,
action, owning_replay, session_id,
),
timeout=180,
)
elif type_ == "t2a_decision": elif type_ == "t2a_decision":
await asyncio.wait_for( await asyncio.wait_for(
loop.run_in_executor( loop.run_in_executor(
@@ -7830,6 +7840,63 @@ async def lea_screen_analyze(payload: _Phase25ScreenRequest, request: Request):
return payload_out return payload_out
# =========================================================================
# DETTE-022 v2 — GET /api/v1/agents/update/check (MAJ silencieuse client Léa)
# Flag OFF par défaut (RPA_AUTO_UPDATE_SERVER_ENABLED). Best-effort, additif :
# expose la DÉCISION d'update (logique PURE dans update_check.py, testée hors
# serveur — DETTE-013). NE FAIT PAS le swap (réservé révision humaine côté
# client + Lea.bat).
# =========================================================================
from .update_check import decide_update as _decide_update # noqa: E402
def _auto_update_server_enabled() -> bool:
"""Flag d'activation serveur — lu à chaque appel (faciliter les tests)."""
return os.environ.get("RPA_AUTO_UPDATE_SERVER_ENABLED", "").lower() in (
"1", "true", "yes", "on",
)
def _latest_agent_version() -> str:
"""Dernière version d'agent disponible côté serveur.
Source de vérité minimale (POC) : variable d'environnement
RPA_AGENT_LATEST_VERSION. Permet de piloter la fleet sans rebuild. Une
évolution future pourra la lire d'un manifeste/DB (cf. design).
"""
return os.environ.get("RPA_AGENT_LATEST_VERSION", "1.0.1")
@app.get("/api/v1/agents/update/check")
async def check_agent_update(
current_version: str,
machine_id: Optional[str] = None,
update_type: Optional[str] = None,
):
"""Indiquer au client Léa si une MAJ est disponible (DETTE-022 v2).
Réponse : {update_available, latest_version, update_type, url}.
GATED : si RPA_AUTO_UPDATE_SERVER_ENABLED n'est pas positionné → 503
(aucun effet sur le pipeline existant — anti-régression). Auth Bearer
requise (dépendance globale `_verify_token`).
"""
if not _auto_update_server_enabled():
raise HTTPException(
status_code=503,
detail=(
"MAJ auto désactivée (flag RPA_AUTO_UPDATE_SERVER_ENABLED). "
"DETTE-022 : endpoint exposé mais OFF par défaut."
),
)
return _decide_update(
current_version=current_version,
latest_version=_latest_agent_version(),
update_type=update_type,
machine_id=machine_id,
)
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn

View File

@@ -0,0 +1,138 @@
# agent_v0/server_v1/update_check.py
"""Logique PURE de décision de mise à jour du client Léa (DETTE-022 v2).
But : centraliser, SANS dépendance FastAPI, le cœur testable de la MAJ
silencieuse :
- `parse_version()` (R3) : parse une version semver en tuple d'entiers, pour
une comparaison correcte ("1.0.2" < "1.0.10" — le piège lexicographique
classique). Tolérant : préfixe « v », espaces, et format invalide → fallback
`(0,)` (la plus basse) SANS jamais lever.
- `decide_update()` (R2) : compare la version courante à la dernière dispo,
choisit l'`update_type` (`code-only` par défaut, ~500 Ko / `full` ~33 Mo
rare) et construit la réponse
`{update_available, latest_version, update_type, url}`.
Ce module est volontairement IMPORTABLE seul (aucun import lourd, pas de
`api_stream`) pour être testé sans démarrer le serveur (DETTE-013). Le
branchement HTTP (endpoint gated) vit dans `api_stream.py`.
⚠️ Cette brique ne fait QUE décider. Le swap réel des fichiers, l'édition de
Lea.bat et le redémarrage sont HORS de ce module (réservé révision humaine).
Branche feat/push-log-dgx.
"""
from __future__ import annotations
from typing import Optional, Tuple
# Niveaux de livraison valides (R2). `code-only` par défaut = 99 % des MAJ.
VALID_UPDATE_TYPES = ("code-only", "full")
DEFAULT_UPDATE_TYPE = "code-only"
# Fallback de version « la plus basse » pour une chaîne illisible : ainsi une
# version valide est toujours > à une version invalide, et une *latest* illisible
# ne déclenche jamais de MAJ douteuse.
_FALLBACK_VERSION: Tuple[int, ...] = (0,)
def parse_version(v) -> Tuple[int, ...]:
"""Parse une version semver en tuple d'entiers (R3).
"1.0.2" → (1, 0, 2), "1.0.10" → (1, 0, 10), "v1.2.3" → (1, 2, 3).
Tolérant et SANS exception : préfixe « v/V » et espaces tolérés ; tout
format non numérique (vide, None, "abc", "1.x.3") retombe sur `(0,)`.
Stratégie : `packaging.version` si présent (déjà dans le venv via
setuptools/pip), sinon parse manuel. Aucune nouvelle dépendance.
"""
if not isinstance(v, str):
return _FALLBACK_VERSION
s = v.strip().lstrip("vV").strip()
if not s:
return _FALLBACK_VERSION
try:
from packaging.version import Version
return tuple(Version(s).release)
except Exception:
# packaging absent (python-embed minimal) OU version non-PEP440.
pass
try:
return tuple(int(x) for x in s.split("."))
except (ValueError, AttributeError):
return _FALLBACK_VERSION
def is_newer(candidate: str, baseline: str) -> bool:
"""True si `candidate` est strictement plus récent que `baseline` (semver)."""
return parse_version(candidate) > parse_version(baseline)
def _normalize_update_type(update_type: Optional[str]) -> str:
"""Normalise l'update_type sur un niveau valide (défaut code-only)."""
if update_type in VALID_UPDATE_TYPES:
return update_type
return DEFAULT_UPDATE_TYPE
def build_download_url(
machine_id: Optional[str],
version: str,
update_type: str,
) -> str:
"""Construit l'URL de téléchargement RELATIVE (R2, 2 niveaux).
Forme alignée sur les endpoints fleet existants :
/api/fleet/download/<machine_id>?type=<update_type>&version=<version>
On garde une URL relative : le client la résout contre son SERVER_BASE.
`machine_id` absent → segment « default » (rétrocompatible).
"""
mid = (machine_id or "default").strip() or "default"
return f"/api/fleet/download/{mid}?type={update_type}&version={version}"
def decide_update(
current_version: str,
latest_version: str,
update_type: Optional[str] = None,
machine_id: Optional[str] = None,
) -> dict:
"""Décision PURE de mise à jour (R2 + R3).
Compare `current_version` à `latest_version` en semver. Si la dernière est
strictement plus récente, construit une réponse d'update ; sinon réponse
« à jour ». Aucune exception : versions illisibles → pas de MAJ (prudence).
Returns:
{
"update_available": bool,
"latest_version": str,
"update_type": "code-only" | "full" | None, # None si pas de MAJ
"url": str | None, # None si pas de MAJ
}
"""
no_update = {
"update_available": False,
"latest_version": latest_version,
"update_type": None,
"url": None,
}
# latest illisible → on ne propose RIEN (pas de MAJ douteuse).
if parse_version(latest_version) == _FALLBACK_VERSION:
return no_update
if not is_newer(latest_version, current_version):
return no_update
chosen_type = _normalize_update_type(update_type)
return {
"update_available": True,
"latest_version": latest_version,
"update_type": chosen_type,
"url": build_download_url(machine_id, latest_version, chosen_type),
}

View File

@@ -247,3 +247,33 @@ def map_roles(
data = parse_vlm_json(raw) data = parse_vlm_json(raw)
vlm_fields = data.get("champs", []) if isinstance(data, dict) else [] vlm_fields = data.get("champs", []) if isinstance(data, dict) else []
return reconstruct_fields(tokens, vlm_fields) return reconstruct_fields(tokens, vlm_fields)
def extract_dossier_from_image(
image_path: str,
vlm_client: VlmClient,
roles: Optional[Sequence[str]] = None,
ocr_fn: Optional[Callable[[str], Sequence[Sequence[dict]]]] = None,
min_confidence: float = 0.6,
required_roles: Optional[Sequence[str]] = None,
) -> dict:
"""Orchestre l'extraction d'un dossier depuis une capture : OCR → rôles → qualité.
Enchaîne `ocr_fn` (grille OCR) → `tokens_from_grid` → `map_roles` (VLM, ancrage
strict) → `assess_quality`. C'est la brique que le handler runtime
`_handle_extract_dossier_action` appellera, avec le vrai OCR et le vrai client
vLLM. `ocr_fn` et `vlm_client` sont INJECTABLES (testable hors-ligne).
`ocr_fn` par défaut = `core.llm.ocr_extractor.extract_grid_from_image` (import
LAZY : le module reste pur quand l'OCR est injecté en test).
Returns:
{fields: List[MappedField], status: str, n_tokens: int}
"""
if ocr_fn is None:
from core.llm.ocr_extractor import extract_grid_from_image as ocr_fn
grid = ocr_fn(image_path)
tokens = tokens_from_grid(grid)
fields = map_roles(image_path, tokens, vlm_client, roles)
status = assess_quality(fields, required_roles=required_roles, min_confidence=min_confidence)
return {"fields": fields, "status": status, "n_tokens": len(tokens)}

View File

@@ -0,0 +1,86 @@
"""Client vLLM serveur : (image_path, prompt) -> texte de réponse.
Petit client réutilisable pour la lecture d'écran (extraction de dossier). Le
grounder (`resolve_engine`) fait déjà un POST vers vLLM:8001 mais en INLINE, non
exposé ; on factorise ici un client propre, configurable et testable.
- Image downscalée (largeur max) avant envoi : la fenêtre vLLM est limitée
(`max_model_len`), un écran plein déborde sinon (vu 30/06 : 6193+2000 > 8192).
- `thinking` désactivé (vérifié : think=on -> sortie vide/lente sur ce modèle).
- `post_fn` injectable -> testable sans vLLM réel.
Branche feat/push-log-dgx.
"""
from __future__ import annotations
import base64
import os
from io import BytesIO
from typing import Callable, Optional
VlmClient = Callable[[str, str], str]
_DEFAULT_PORT = os.environ.get("VLLM_PORT", "8001")
DEFAULT_URL = f"http://localhost:{_DEFAULT_PORT}/v1/chat/completions"
DEFAULT_MODEL = os.environ.get("VLLM_MODEL", "Qwen/Qwen3-VL-4B-Instruct")
def img_data_url(image_path: str, max_w: int = 1280) -> str:
"""Encode l'image en data-URL PNG base64, downscalée à `max_w` si plus large."""
from PIL import Image
img = Image.open(image_path).convert("RGB")
if img.width > max_w:
h = int(img.height * max_w / img.width)
img = img.resize((max_w, h), Image.LANCZOS)
buf = BytesIO()
img.save(buf, format="PNG")
return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()
def build_chat_body(
image_path: str,
prompt: str,
model: str = DEFAULT_MODEL,
max_tokens: int = 1500,
max_w: int = 1280,
) -> dict:
"""Construit le body chat/completions (image + prompt, thinking off)."""
return {
"model": model,
"messages": [{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": img_data_url(image_path, max_w)}},
{"type": "text", "text": prompt},
],
}],
"temperature": 0.0,
"max_tokens": max_tokens,
"chat_template_kwargs": {"enable_thinking": False},
}
def make_vllm_client(
url: str = DEFAULT_URL,
model: str = DEFAULT_MODEL,
max_tokens: int = 1500,
max_w: int = 1280,
timeout: float = 120,
post_fn: Optional[Callable] = None,
) -> VlmClient:
"""Construit un client `(image_path, prompt) -> texte`, branché sur vLLM.
`post_fn` (signature `requests.post`) est injectable pour les tests.
Lève `RuntimeError` si le serveur ne répond pas 200 (message technique, sans PII).
"""
def client(image_path: str, prompt: str) -> str:
body = build_chat_body(image_path, prompt, model=model, max_tokens=max_tokens, max_w=max_w)
poster = post_fn
if poster is None:
import requests
poster = requests.post
r = poster(url, json=body, headers={}, timeout=timeout)
if r.status_code != 200:
raise RuntimeError(f"vLLM {r.status_code}: {str(getattr(r, 'text', ''))[:300]}")
return r.json()["choices"][0]["message"]["content"]
return client

View File

@@ -0,0 +1,85 @@
"""Tests intégration HTTP de GET /api/v1/agents/update/check — DETTE-022 v2.
Endpoint GATED (flag RPA_AUTO_UPDATE_SERVER_ENABLED), best-effort :
- flag OFF par défaut → 503 (anti-régression : aucun effet sur le pipeline).
- flag ON → 200 + payload {update_available, latest_version, update_type, url}.
- auth Bearer requise (dépendance globale _verify_token).
La logique PURE est testée sans serveur dans tests/unit/test_update_check_server.py
(DETTE-013). Ici on vérifie le branchement HTTP minimal.
"""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
pytestmark = pytest.mark.integration
_TEST_API_TOKEN = "test_update_check_endpoint_token"
@pytest.fixture
def client(monkeypatch):
monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN)
from fastapi.testclient import TestClient
from agent_v0.server_v1 import api_stream
monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN)
return TestClient(api_stream.app, raise_server_exceptions=False)
def _auth_headers():
return {"Authorization": f"Bearer {_TEST_API_TOKEN}"}
class TestUpdateCheckEndpointFlag:
def test_disabled_by_default_returns_503(self, client, monkeypatch):
monkeypatch.delenv("RPA_AUTO_UPDATE_SERVER_ENABLED", raising=False)
resp = client.get(
"/api/v1/agents/update/check?current_version=1.0.1",
headers=_auth_headers(),
)
assert resp.status_code == 503
assert "RPA_AUTO_UPDATE_SERVER_ENABLED" in resp.text
class TestUpdateCheckEndpointEnabled:
@pytest.fixture(autouse=True)
def _enable_flag(self, monkeypatch):
monkeypatch.setenv("RPA_AUTO_UPDATE_SERVER_ENABLED", "true")
# Version cible explicite pour rendre le test déterministe.
monkeypatch.setenv("RPA_AGENT_LATEST_VERSION", "1.0.2")
def test_update_available(self, client):
resp = client.get(
"/api/v1/agents/update/check?current_version=1.0.1&machine_id=pc-1",
headers=_auth_headers(),
)
assert resp.status_code == 200
body = resp.json()
assert body["update_available"] is True
assert body["latest_version"] == "1.0.2"
assert body["update_type"] == "code-only"
assert "1.0.2" in body["url"]
def test_up_to_date(self, client):
resp = client.get(
"/api/v1/agents/update/check?current_version=1.0.2&machine_id=pc-1",
headers=_auth_headers(),
)
assert resp.status_code == 200
body = resp.json()
assert body["update_available"] is False
def test_requires_auth(self, client):
resp = client.get(
"/api/v1/agents/update/check?current_version=1.0.1",
)
assert resp.status_code == 401

View File

@@ -0,0 +1,225 @@
"""TDD — DETTE-022 MAJ silencieuse v2 : NOYAU client de mise à jour Léa.
Périmètre testé (parties PURES / testables, GATED, OFF par défaut) :
- `parse_version` / `is_newer` côté client (R3, self-contained — le bundle
client n'embarque pas server_v1).
- `should_update(local_version, server_response)` : décision « faut-il
updater ? quelle version/type ? » à partir de la réponse serveur.
- `download_update(...)` via un `downloader` callable INJECTABLE : AUCUN
réseau réel en test. Vérifie le SHA256, écrit le ZIP dans le staging,
retourne un plan d'update — SANS toucher aux fichiers vivants.
- Flag `RPA_AUTO_UPDATE_ENABLED` (défaut OFF) : `auto_update_enabled()`.
HORS périmètre (réservé révision humaine — trop risqué pour un agent) :
swap réel des fichiers, édition Lea.bat, redémarrage. Le module expose des
STUBS explicites (`apply_update`, `write_boot_ok_marker`) marqués TODO.
Le module est chargé par chemin (importlib) pour ne dépendre d'aucun import
lourd du package client (cf. DETTE-013, comme test_agent_v1_log_shipper).
"""
import hashlib
import importlib.util
from pathlib import Path
import pytest
_MOD_PATH = (
Path(__file__).resolve().parents[2]
/ "agent_v0" / "agent_v1" / "network" / "updater.py"
)
def _load_module():
spec = importlib.util.spec_from_file_location("lea_updater", _MOD_PATH)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
@pytest.fixture
def mod():
return _load_module()
# ---------------------------------------------------------------------------
# R3 — parse_version côté client (self-contained)
# ---------------------------------------------------------------------------
class TestClientParseVersion:
def test_ordre_semver(self, mod):
assert mod.parse_version("1.0.2") < mod.parse_version("1.0.10")
assert mod.is_newer("1.0.10", "1.0.2") is True
assert mod.is_newer("1.0.1", "1.0.1") is False
def test_tolerant_et_fallback(self, mod):
assert mod.parse_version("v1.2.3") == (1, 2, 3)
assert mod.parse_version("garbage") == (0,)
assert mod.parse_version(None) == (0,)
# ---------------------------------------------------------------------------
# Flag RPA_AUTO_UPDATE_ENABLED — OFF par défaut
# ---------------------------------------------------------------------------
class TestFlag:
def test_off_par_defaut(self, mod, monkeypatch):
monkeypatch.delenv("RPA_AUTO_UPDATE_ENABLED", raising=False)
assert mod.auto_update_enabled() is False
def test_on_si_active(self, mod, monkeypatch):
for val in ("true", "1", "yes", "on", "TRUE"):
monkeypatch.setenv("RPA_AUTO_UPDATE_ENABLED", val)
assert mod.auto_update_enabled() is True
def test_off_si_valeur_invalide(self, mod, monkeypatch):
monkeypatch.setenv("RPA_AUTO_UPDATE_ENABLED", "false")
assert mod.auto_update_enabled() is False
# ---------------------------------------------------------------------------
# should_update — décision à partir de la réponse serveur
# ---------------------------------------------------------------------------
class TestShouldUpdate:
def test_pas_de_maj_si_response_negative(self, mod):
plan = mod.should_update(
"1.0.1", {"update_available": False, "latest_version": "1.0.1"}
)
assert plan is None
def test_maj_si_serveur_propose_version_plus_recente(self, mod):
plan = mod.should_update(
"1.0.1",
{
"update_available": True,
"latest_version": "1.0.2",
"update_type": "code-only",
"url": "http://srv/api/fleet/download/pc-1?type=code-only&version=1.0.2",
},
)
assert plan is not None
assert plan["target_version"] == "1.0.2"
assert plan["update_type"] == "code-only"
def test_double_garde_pas_de_downgrade(self, mod):
# Même si le serveur dit update_available, le client revérifie semver :
# il ne descend JAMAIS vers une version <= locale (défense en profondeur).
plan = mod.should_update(
"1.0.5",
{"update_available": True, "latest_version": "1.0.2",
"update_type": "code-only", "url": "http://x"},
)
assert plan is None
def test_type_inconnu_normalise_code_only(self, mod):
plan = mod.should_update(
"1.0.1",
{"update_available": True, "latest_version": "1.0.2",
"update_type": "weird", "url": "http://x"},
)
assert plan["update_type"] == "code-only"
def test_response_malformee_pas_de_crash(self, mod):
assert mod.should_update("1.0.1", {}) is None
assert mod.should_update("1.0.1", None) is None
assert mod.should_update("1.0.1", {"update_available": True}) is None
# ---------------------------------------------------------------------------
# download_update — downloader INJECTABLE, SHA256, aucun réseau réel
# ---------------------------------------------------------------------------
class TestDownloadUpdate:
def test_telecharge_et_verifie_sha256_ok(self, mod, tmp_path):
payload = b"PK\x03\x04 fake zip bytes"
sha = hashlib.sha256(payload).hexdigest()
calls = {}
def fake_downloader(url):
calls["url"] = url
return payload
plan = {
"target_version": "1.0.2",
"update_type": "code-only",
"url": "http://srv/dl?version=1.0.2",
"sha256": sha,
}
result = mod.download_update(
plan, staging_dir=tmp_path, downloader=fake_downloader
)
assert result["ok"] is True
assert calls["url"] == "http://srv/dl?version=1.0.2"
# Le ZIP est écrit dans le staging (Lea_next-like), PAS dans les fichiers vivants.
staged = Path(result["staged_zip"])
assert staged.exists()
assert staged.read_bytes() == payload
assert staged.parent == tmp_path
def test_sha256_mismatch_rejette_et_nettoie(self, mod, tmp_path):
payload = b"corrupted"
def fake_downloader(url):
return payload
plan = {
"target_version": "1.0.2",
"update_type": "code-only",
"url": "http://x",
"sha256": "0" * 64, # ne correspond pas
}
result = mod.download_update(
plan, staging_dir=tmp_path, downloader=fake_downloader
)
assert result["ok"] is False
assert "sha256" in result["error"].lower()
# Aucun ZIP corrompu laissé dans le staging.
assert list(tmp_path.glob("*.zip")) == []
def test_sha256_absent_accepte_avec_avertissement(self, mod, tmp_path):
# Pas de sha256 fourni : best-effort, on accepte mais on le signale.
payload = b"PK no-sha"
plan = {
"target_version": "1.0.2",
"update_type": "code-only",
"url": "http://x",
}
result = mod.download_update(
plan, staging_dir=tmp_path, downloader=lambda u: payload
)
assert result["ok"] is True
assert result.get("sha256_verified") is False
def test_downloader_leve_pas_de_crash(self, mod, tmp_path):
def boom(url):
raise RuntimeError("réseau down")
plan = {"target_version": "1.0.2", "update_type": "code-only",
"url": "http://x", "sha256": "x"}
result = mod.download_update(plan, staging_dir=tmp_path, downloader=boom)
assert result["ok"] is False
assert "error" in result
# ---------------------------------------------------------------------------
# Stubs réservés à la révision humaine — DOIVENT être no-op explicites
# ---------------------------------------------------------------------------
class TestDangerousPartsAreStubs:
def test_apply_update_est_un_stub_non_implemente(self, mod, tmp_path):
# Le swap réel est réservé révision humaine : le stub NE TOUCHE RIEN
# et signale qu'il n'est pas implémenté.
result = mod.apply_update(
{"target_version": "1.0.2", "update_type": "code-only",
"staged_zip": str(tmp_path / "x.zip")}
)
assert result["applied"] is False
assert "human" in result["reason"].lower() or "supervis" in result["reason"].lower()
def test_write_boot_ok_marker_est_un_stub(self, mod):
result = mod.write_boot_ok_marker("1.0.2")
assert result["written"] is False

View File

@@ -0,0 +1,68 @@
"""Tests de l'orchestrateur extract_dossier_from_image.
Enchaîne OCR → tokens_from_grid → map_roles → assess_quality. L'OCR (`ocr_fn`)
et le client VLM (`vlm_client`) sont INJECTABLES → testable sans réseau ni OCR
réel. C'est cette fonction que le handler runtime `_handle_extract_dossier_action`
appellera (avec le vrai OCR et le vrai client vLLM).
"""
from core.extraction.role_mapper import extract_dossier_from_image
def _cell(text, x0, conf=0.9, row=0, col=0):
return {"text": text, "bbox": [[x0, 0], [x0 + 10, 0], [x0 + 10, 8], [x0, 8]],
"confidence": conf, "row": row, "col": col}
def _fake_vlm(response):
def client(image_path, prompt):
return response
return client
def test_orchestre_ocr_vlm_qualite():
grid = [[_cell("DUPONT", 0, conf=0.95, col=0), _cell("Jean", 20, conf=0.9, col=1)]]
res = extract_dossier_from_image(
"img.png",
_fake_vlm('{"champs":[{"label":"Nom complet","value_ids":[0,1]}]}'),
ocr_fn=lambda path: grid,
)
assert len(res["fields"]) == 1
assert res["fields"][0].value == "DUPONT Jean"
assert res["fields"][0].anchored is True
assert res["status"] in ("complete", "partial", "needs_review", "failed")
assert res["n_tokens"] == 2
def test_ocr_vide_donne_failed():
res = extract_dossier_from_image(
"img.png",
_fake_vlm('{"champs":[]}'),
ocr_fn=lambda path: [],
)
assert res["status"] == "failed"
assert res["fields"] == []
def test_status_needs_review_si_role_requis_absent():
grid = [[_cell("X", 0)]]
res = extract_dossier_from_image(
"img.png",
_fake_vlm('{"champs":[{"label":"Autre","value_ids":[0]}]}'),
ocr_fn=lambda path: grid,
required_roles=["Nom"],
)
assert res["status"] == "needs_review"
def test_roles_transmis_au_vlm():
grid = [[_cell("X", 0)]]
captured = {}
def client(image_path, prompt):
captured["prompt"] = prompt
return '{"champs":[]}'
extract_dossier_from_image(
"img.png", client, ocr_fn=lambda path: grid, roles=["Diagnostic", "GEMSA"],
)
assert "Diagnostic" in captured["prompt"] and "GEMSA" in captured["prompt"]

View File

@@ -0,0 +1,75 @@
"""Tests unitaires pour _resolve_lea_zip_template (DETTE-024).
La fonction est injectable (full_path, legacy_path en paramètres)
→ testable sans instancier Flask ni lire le vrai deploy/.
Pattern anti-DETTE-013 : os.environ.setdefault avant l'import du module.
"""
import os
os.environ.setdefault("DASHBOARD_AUTH_DISABLED", "true")
import pytest # noqa: E402
from web_dashboard.app import _resolve_lea_zip_template # noqa: E402
class TestResolveLéaZipTemplate:
"""DETTE-024 — sélection du ZIP template pour le download fleet."""
def test_full_present_retourne_full(self, tmp_path):
"""Si le ZIP complet autoportant est présent, il est retourné."""
full = tmp_path / "Lea_full_v1.0.1.zip"
legacy = tmp_path / "Lea_v1.0.0.zip"
full.write_bytes(b"full-stub")
legacy.write_bytes(b"legacy-stub")
result = _resolve_lea_zip_template(full_path=full, legacy_path=legacy)
assert result == full, f"Attendu full ({full}), obtenu {result}"
def test_full_absent_retourne_legacy_avec_warning(self, tmp_path, caplog):
"""Si le ZIP complet est absent, le legacy est retourné + WARNING loggué.
Le WARNING est le signal observable en production (DETTE-024) :
sans lui, le fallback silencieux rendait le problème invisible.
"""
import logging
full = tmp_path / "Lea_full_v1.0.1.zip"
legacy = tmp_path / "Lea_v1.0.0.zip"
# full intentionnellement absent
legacy.write_bytes(b"legacy-stub")
with caplog.at_level(logging.WARNING):
result = _resolve_lea_zip_template(full_path=full, legacy_path=legacy)
assert result == legacy, f"Attendu legacy ({legacy}), obtenu {result}"
# Le WARNING DETTE-024 doit apparaître dans les logs
assert any(
"DETTE-024" in record.message for record in caplog.records
), (
"Un WARNING DETTE-024 doit être émis quand le ZIP complet est absent "
f"(logs: {[r.message for r in caplog.records]})"
)
def test_full_et_legacy_absents_retourne_none(self, tmp_path):
"""Si aucun ZIP n'existe, retourne None (la route renvoie 500)."""
full = tmp_path / "Lea_full_v1.0.1.zip"
legacy = tmp_path / "Lea_v1.0.0.zip"
# aucun des deux créés
result = _resolve_lea_zip_template(full_path=full, legacy_path=legacy)
assert result is None, f"Attendu None, obtenu {result}"
def test_full_prime_sur_legacy(self, tmp_path):
"""Le full est retourné même si le legacy existe aussi (priorité correcte)."""
full = tmp_path / "Lea_full_v1.0.1.zip"
legacy = tmp_path / "Lea_v1.0.0.zip"
full.write_bytes(b"full-stub")
legacy.write_bytes(b"legacy-stub")
result = _resolve_lea_zip_template(full_path=full, legacy_path=legacy)
assert result == full
assert result != legacy

View File

@@ -0,0 +1,135 @@
"""TDD — DETTE-022 MAJ silencieuse v2 : logique PURE serveur de décision d'update.
Périmètre testé ICI = parties PURES, testables sans démarrer le serveur
(DETTE-013 : on N'IMPORTE PAS `api_stream` — on charge le module
`update_check.py` par chemin, comme test_agent_v1_log_shipper).
Couvre :
- R3 `parse_version()` : tuple d'entiers, "1.0.2" < "1.0.10", égalité,
"v1.2.3"/espaces tolérés, format invalide → fallback sans crash.
- R2 logique de décision PURE `decide_update()` : compare version courante
vs dernière dispo, choisit `update_type` (code-only/full), construit la
réponse `{update_available, latest_version, update_type, url}`.
Le NOYAU dangereux (swap fichiers / Lea.bat / restart) est HORS périmètre.
"""
import importlib.util
from pathlib import Path
import pytest
_MOD_PATH = (
Path(__file__).resolve().parents[2]
/ "agent_v0" / "server_v1" / "update_check.py"
)
def _load_module():
spec = importlib.util.spec_from_file_location("rpa_update_check", _MOD_PATH)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
@pytest.fixture
def mod():
return _load_module()
# ---------------------------------------------------------------------------
# R3 — parse_version : tuple d'entiers (semver), pas comparaison lexicale
# ---------------------------------------------------------------------------
class TestParseVersion:
def test_parse_basique(self, mod):
assert mod.parse_version("1.0.2") == (1, 0, 2)
assert mod.parse_version("1.0.10") == (1, 0, 10)
def test_ordre_semver_pas_lexical(self, mod):
# Le bug classique : "1.0.2" < "1.0.10" est FAUX en lexicographique.
assert mod.parse_version("1.0.2") < mod.parse_version("1.0.10")
assert mod.parse_version("1.0.10") > mod.parse_version("1.0.2")
assert mod.parse_version("2.0.0") > mod.parse_version("1.99.99")
def test_egalite(self, mod):
assert mod.parse_version("1.0.1") == mod.parse_version("1.0.1")
def test_prefixe_v_et_espaces_toleres(self, mod):
assert mod.parse_version("v1.2.3") == mod.parse_version("1.2.3")
assert mod.parse_version(" 1.2.3 ") == (1, 2, 3)
assert mod.parse_version("V1.2.3") == (1, 2, 3)
def test_format_invalide_fallback_sans_crash(self, mod):
# Ne doit jamais lever — fallback (0,) (= la plus basse).
assert mod.parse_version("") == (0,)
assert mod.parse_version("abc") == (0,)
assert mod.parse_version(None) == (0,)
assert mod.parse_version("1.x.3") == (0,)
# Une version valide reste toujours > au fallback invalide.
assert mod.parse_version("0.0.1") > mod.parse_version("garbage")
def test_is_newer_helper(self, mod):
assert mod.is_newer("1.0.2", "1.0.1") is True
assert mod.is_newer("1.0.10", "1.0.2") is True
assert mod.is_newer("1.0.1", "1.0.1") is False
assert mod.is_newer("1.0.0", "1.0.1") is False
# ---------------------------------------------------------------------------
# R2 — decide_update : logique PURE de décision serveur
# ---------------------------------------------------------------------------
class TestDecideUpdate:
def test_pas_de_maj_si_a_jour(self, mod):
resp = mod.decide_update(current_version="1.0.1", latest_version="1.0.1")
assert resp["update_available"] is False
assert resp["latest_version"] == "1.0.1"
assert resp["update_type"] is None
assert resp["url"] is None
def test_pas_de_maj_si_client_plus_recent(self, mod):
# Client en avance (dev local) → jamais de downgrade.
resp = mod.decide_update(current_version="1.0.5", latest_version="1.0.2")
assert resp["update_available"] is False
def test_maj_disponible_code_only_par_defaut(self, mod):
resp = mod.decide_update(current_version="1.0.1", latest_version="1.0.2")
assert resp["update_available"] is True
assert resp["latest_version"] == "1.0.2"
# R2 : code-only = défaut (99% des cas, ~500 Ko).
assert resp["update_type"] == "code-only"
assert "1.0.2" in resp["url"]
assert "code-only" in resp["url"]
def test_maj_full_si_demande(self, mod):
resp = mod.decide_update(
current_version="1.0.1", latest_version="1.1.0", update_type="full"
)
assert resp["update_available"] is True
assert resp["update_type"] == "full"
assert "full" in resp["url"]
def test_update_type_invalide_retombe_sur_code_only(self, mod):
resp = mod.decide_update(
current_version="1.0.1", latest_version="1.0.2", update_type="banana"
)
assert resp["update_type"] == "code-only"
def test_ordre_semver_dans_decision(self, mod):
# 1.0.2 < 1.0.10 → MAJ dispo (pas de faux négatif lexical).
resp = mod.decide_update(current_version="1.0.2", latest_version="1.0.10")
assert resp["update_available"] is True
def test_url_inclut_machine_id_si_fourni(self, mod):
resp = mod.decide_update(
current_version="1.0.1", latest_version="1.0.2", machine_id="pc-7"
)
assert "pc-7" in resp["url"]
def test_versions_invalides_pas_de_crash_pas_de_maj(self, mod):
# latest illisible → on ne propose RIEN (prudence : pas de MAJ douteuse).
resp = mod.decide_update(current_version="1.0.1", latest_version="garbage")
assert resp["update_available"] is False
resp2 = mod.decide_update(current_version="", latest_version="")
assert resp2["update_available"] is False

View File

@@ -0,0 +1,65 @@
"""Tests du client vLLM serveur (image + prompt -> texte).
Le POST réseau est injectable (`post_fn`) → testable sans vLLM. Sert de
`vlm_client` à `extract_dossier_from_image` dans le handler runtime.
"""
import pytest
from core.extraction.vlm_client import build_chat_body, img_data_url, make_vllm_client
def _png(tmp_path, w=2000, h=1000):
from PIL import Image
p = tmp_path / "x.png"
Image.new("RGB", (w, h), (255, 255, 255)).save(p)
return str(p)
class _Resp:
def __init__(self, code, payload=None, text=""):
self.status_code = code
self._p = payload or {}
self.text = text
def json(self):
return self._p
def test_img_data_url_downscale(tmp_path):
url = img_data_url(_png(tmp_path), max_w=1280)
assert url.startswith("data:image/png;base64,")
def test_build_chat_body_structure(tmp_path):
body = build_chat_body(_png(tmp_path), "PROMPT", model="M", max_tokens=1500, max_w=1280)
assert body["model"] == "M"
assert body["max_tokens"] == 1500
# thinking désactivé (vérifié hier : think=on -> vide/lent)
assert body["chat_template_kwargs"]["enable_thinking"] is False
content = body["messages"][0]["content"]
assert any(c["type"] == "image_url" for c in content)
assert any(c["type"] == "text" and c["text"] == "PROMPT" for c in content)
def test_client_retourne_content(tmp_path):
captured = {}
def fake_post(url, json=None, headers=None, timeout=None):
captured["url"] = url
captured["body"] = json
return _Resp(200, {"choices": [{"message": {"content": "REPONSE"}}]})
client = make_vllm_client(model="M", post_fn=fake_post)
out = client(_png(tmp_path), "PROMPT")
assert out == "REPONSE"
assert "/v1/chat/completions" in captured["url"]
assert captured["body"]["messages"][0]["content"][1]["text"] == "PROMPT"
def test_client_erreur_status_leve(tmp_path):
def fake_post(url, json=None, headers=None, timeout=None):
return _Resp(500, text="boom")
client = make_vllm_client(post_fn=fake_post)
with pytest.raises(RuntimeError):
client(_png(tmp_path), "PROMPT")

View File

@@ -2231,16 +2231,37 @@ _LEA_ZIP_TEMPLATE_FULL = BASE_PATH / "deploy" / "build" / "Lea_full_v1.0.1.zip"
_LEA_ZIP_TEMPLATE_LEGACY = BASE_PATH / "deploy" / "Lea_v1.0.0.zip" _LEA_ZIP_TEMPLATE_LEGACY = BASE_PATH / "deploy" / "Lea_v1.0.0.zip"
def _resolve_lea_zip_template(): def _resolve_lea_zip_template(
full_path: Path = _LEA_ZIP_TEMPLATE_FULL,
legacy_path: Path = _LEA_ZIP_TEMPLATE_LEGACY,
) -> "Path | None":
"""Résout le ZIP à servir, à la volée (le complet peut être buildé """Résout le ZIP à servir, à la volée (le complet peut être buildé
après le démarrage du dashboard). Préfère le ZIP complet autoportant ; après le démarrage du dashboard). Préfère le ZIP complet autoportant ;
retombe sur l'ancien ZIP léger uniquement s'il existe. retombe sur l'ancien ZIP léger uniquement s'il existe.
Retourne None si aucun template n'est présent. Retourne None si aucun template n'est présent.
Les paramètres full_path/legacy_path sont injectables pour les tests
(évite de démarrer Flask — DETTE-013).
⚠️ DETTE-024 : si le ZIP complet est absent, un avertissement est loggué
explicitement pour ne pas masquer silencieusement l'absence du full.
""" """
if _LEA_ZIP_TEMPLATE_FULL.exists(): if full_path.exists():
return _LEA_ZIP_TEMPLATE_FULL return full_path
if _LEA_ZIP_TEMPLATE_LEGACY.exists(): # Full absent → fallback sur le legacy, mais log d'avertissement obligatoire.
return _LEA_ZIP_TEMPLATE_LEGACY if legacy_path.exists():
try:
api_logger.warning(
"DETTE-024 — ZIP Léa complet autoportant ABSENT (%s) ; "
"fallback sur ZIP léger NON autoportant (%s). "
"Le poste recevra un ZIP sans Python embarqué → non installable "
"sans Python système. Exécuter deploy/build_package_full.sh.",
full_path,
legacy_path,
)
except Exception:
pass # api_logger pas encore initialisé au module load (import tardif ok)
return legacy_path
return None return None
@@ -2389,8 +2410,14 @@ def download_agent_package(machine_id):
# Sécurité : l'auth Basic est déjà gérée par before_request # Sécurité : l'auth Basic est déjà gérée par before_request
# 1. Résoudre + vérifier que le ZIP template existe (à la volée) # 1. Résoudre + vérifier que le ZIP template existe (à la volée)
# _resolve_lea_zip_template() logue un WARNING si le full est absent (DETTE-024).
zip_template = _resolve_lea_zip_template() zip_template = _resolve_lea_zip_template()
if zip_template is None: if zip_template is None:
api_logger.error(
"download_agent_package(%s) — aucun ZIP template présent. "
"full=%s legacy=%s",
machine_id, _LEA_ZIP_TEMPLATE_FULL, _LEA_ZIP_TEMPLATE_LEGACY,
)
return jsonify({ return jsonify({
'error': 'ZIP template introuvable', 'error': 'ZIP template introuvable',
'detail': ( 'detail': (
@@ -2399,6 +2426,13 @@ def download_agent_package(machine_id):
'autoportant) ou deploy/build_package.sh (ZIP léger).' 'autoportant) ou deploy/build_package.sh (ZIP léger).'
), ),
}), 500 }), 500
is_full = (zip_template == _LEA_ZIP_TEMPLATE_FULL)
zip_kind = "full-autoportant" if is_full else "legacy-léger⚠"
api_logger.info(
"download_agent_package(%s) — ZIP sélectionné : %s (%s, %d Ko)",
machine_id, zip_template.name, zip_kind,
zip_template.stat().st_size // 1024,
)
# 2. Vérifier que le machine_id est enregistré # 2. Vérifier que le machine_id est enregistré
agent = _fetch_fleet_agent(machine_id) agent = _fetch_fleet_agent(machine_id)