# agent_v1/network/updater.py """NOYAU client de la mise à jour silencieuse de Léa (DETTE-022 v2). GATED — flag `RPA_AUTO_UPDATE_ENABLED` (défaut OFF). Tant qu'il est OFF, rien ne se déclenche : l'intégration de ce module au runtime (boucle de poll de `main.py`) ne fait aucune MAJ. Ce module ne contient que les parties PURES / testables, sans réseau réel : - `parse_version` / `is_newer` (R3) : self-contained (le bundle client n'embarque PAS `server_v1` — duplication assumée, même algorithme). - `should_update(local_version, server_response)` : décide « faut-il updater ? quelle version/type ? » à partir de la réponse serveur. Double garde semver côté client (jamais de downgrade) = défense en profondeur. - `download_update(plan, staging_dir, downloader)` : télécharge le ZIP via un `downloader` callable INJECTABLE (aucun réseau réel en test), vérifie le SHA256, écrit le ZIP dans le **staging** (`Lea_next\\`-like) — JAMAIS dans les fichiers vivants. Retourne un plan d'application. - `auto_update_enabled()` : lit le flag (défaut OFF). ⚠️ SWAP — répartition claire des responsabilités : `apply_update` / `write_boot_ok_marker` ci-dessous ne font que l'ARMEMENT côté Python (extraction vers `agent_v1_new/` + marqueurs) — ils n'écrasent JAMAIS un fichier vivant. Le remplacement ATOMIQUE (renames), le redémarrage et le rollback sont faits HORS-PROCESS par `Lea.bat` au démarrage (revu ligne à ligne). Pattern d'import / résilience aligné sur `log_shipper.py` (même branche). Branche feat/push-log-dgx. """ from __future__ import annotations import hashlib import json import logging import os import shutil from pathlib import Path from typing import Callable, Optional, Tuple logger = logging.getLogger(__name__) # Niveaux de livraison (R2). `code-only` par défaut = 99 % des MAJ (~500 Ko). VALID_UPDATE_TYPES = ("code-only", "full") DEFAULT_UPDATE_TYPE = "code-only" _FALLBACK_VERSION: Tuple[int, ...] = (0,) # --------------------------------------------------------------------------- # Flag d'activation — OFF par défaut (lu à chaque appel pour faciliter tests) # --------------------------------------------------------------------------- def auto_update_enabled() -> bool: """True si la MAJ auto client est activée (flag RPA_AUTO_UPDATE_ENABLED). Défaut PRUDENT = OFF. On l'active poste par poste via config.txt / variable d'environnement, sans rebuild de l'installateur (même esprit que LOG_SHIP_ENABLED). """ return os.environ.get("RPA_AUTO_UPDATE_ENABLED", "false").lower() in ( "true", "1", "yes", "on", ) # --------------------------------------------------------------------------- # R3 — parse_version self-contained (le bundle client n'a pas server_v1) # --------------------------------------------------------------------------- def parse_version(v) -> Tuple[int, ...]: """Parse une version semver en tuple d'entiers. Voir server_v1/update_check. "1.0.2" → (1, 0, 2) ; "1.0.10" → (1, 0, 10) ; "v1.2.3" → (1, 2, 3). Tolérant et SANS exception : invalide → fallback `(0,)`. """ if not isinstance(v, str): return _FALLBACK_VERSION s = v.strip().lstrip("vV").strip() if not s: return _FALLBACK_VERSION try: from packaging.version import Version return tuple(Version(s).release) except Exception: pass try: return tuple(int(x) for x in s.split(".")) except (ValueError, AttributeError): return _FALLBACK_VERSION def is_newer(candidate: str, baseline: str) -> bool: """True si `candidate` strictement plus récent que `baseline` (semver).""" return parse_version(candidate) > parse_version(baseline) def _normalize_update_type(update_type) -> str: if update_type in VALID_UPDATE_TYPES: return update_type return DEFAULT_UPDATE_TYPE # --------------------------------------------------------------------------- # Décision client : faut-il updater ? # --------------------------------------------------------------------------- def should_update(local_version: str, server_response) -> Optional[dict]: """Décide à partir de la réponse serveur s'il faut updater. Args: local_version : version courante du client (config.AGENT_VERSION). server_response : dict renvoyé par l'endpoint serveur {update_available, latest_version, update_type, url, [sha256]}. Returns: Un PLAN d'update `{target_version, update_type, url, sha256}` si une MAJ valide est à faire, sinon None. Défense en profondeur : même si `update_available` est True, le client REVÉRIFIE en semver (`is_newer`) — il ne descend JAMAIS vers une version <= locale. Tolérant : réponse malformée → None (jamais d'exception). """ if not isinstance(server_response, dict): return None if not server_response.get("update_available"): return None target = server_response.get("latest_version") url = server_response.get("url") if not target or not url: return None # Double garde semver : pas de downgrade, pas d'égalité. if not is_newer(target, local_version): return None return { "target_version": target, "update_type": _normalize_update_type(server_response.get("update_type")), "url": url, "sha256": server_response.get("sha256"), } # --------------------------------------------------------------------------- # Téléchargement — downloader INJECTABLE, SHA256, staging only # --------------------------------------------------------------------------- def _default_downloader(url: str) -> bytes: """Téléchargement réel du ZIP (best-effort, pattern streamer/log_shipper). Résout l'URL relative contre SERVER_BASE, ajoute le Bearer si présent. INJECTABLE : remplacé par un fake en test (aucun réseau réel). """ import requests # import tardif (absent de certains envs de test) full_url = url headers = {} try: from ..config import SERVER_BASE, API_TOKEN if url.startswith("/"): full_url = f"{SERVER_BASE}{url}" if API_TOKEN: headers["Authorization"] = f"Bearer {API_TOKEN}" except Exception: # Hors package (test isolé) : on utilise l'URL telle quelle. pass resp = requests.get(full_url, headers=headers, timeout=30, stream=False) resp.raise_for_status() return resp.content def download_update( plan: dict, staging_dir, downloader: Optional[Callable[[str], bytes]] = None, ) -> dict: """Télécharge le ZIP d'update dans le staging et vérifie son intégrité. NE TOUCHE PAS aux fichiers vivants : écrit uniquement dans `staging_dir` (équivalent de `Lea_next\\`). L'application réelle (swap) est un stub réservé révision humaine (voir `apply_update`). Args: plan : sortie de `should_update` (target_version, update_type, url, sha256). staging_dir : dossier de staging (créé si absent). downloader : callable `(url) -> bytes` INJECTABLE (défaut = HTTP réel). Returns: Succès : {ok: True, staged_zip: str, update_type, target_version, sha256_verified: bool} Échec : {ok: False, error: str} Best-effort : aucune exception ne remonte ; un échec laisse le staging propre (pas de ZIP corrompu). """ dl = downloader if downloader is not None else _default_downloader staging = Path(staging_dir) try: data = dl(plan["url"]) except Exception as e: logger.warning("Téléchargement update échoué : %s", e) return {"ok": False, "error": f"download_failed: {e}"} expected_sha = (plan.get("sha256") or "").strip().lower() sha256_verified = False if expected_sha: actual = hashlib.sha256(data).hexdigest() if actual != expected_sha: logger.warning( "SHA256 mismatch update (attendu=%s, obtenu=%s) — rejeté", expected_sha, actual, ) return {"ok": False, "error": "sha256 mismatch — ZIP rejeté"} sha256_verified = True else: # Best-effort : pas de SHA fourni → on accepte mais on le signale. logger.info("Pas de SHA256 fourni pour l'update — intégrité non vérifiée") try: staging.mkdir(parents=True, exist_ok=True) target_version = plan.get("target_version", "unknown") staged_zip = staging / f"lea_update_{target_version}.zip" staged_zip.write_bytes(data) except Exception as e: logger.warning("Écriture ZIP staging échouée : %s", e) return {"ok": False, "error": f"staging_write_failed: {e}"} return { "ok": True, "staged_zip": str(staged_zip), "update_type": _normalize_update_type(plan.get("update_type")), "target_version": plan.get("target_version"), "sha256_verified": sha256_verified, } # --------------------------------------------------------------------------- # Interrogation serveur — checker INJECTABLE (GET /agents/update/check) # --------------------------------------------------------------------------- def _default_update_checker(local_version: str, machine_id: str): """Interroge le serveur : y a-t-il une MAJ ? (best-effort, INJECTABLE). GET SERVER_URL/agents/update/check?current_version=..&machine_id=.. (endpoint gated côté serveur — 503 si RPA_AUTO_UPDATE_SERVER_ENABLED OFF, auquel cas on renvoie None : pas de MAJ). Bearer si présent. Pattern aligné sur `log_shipper._default_sender`. INJECTABLE : remplacé par un fake en test. Returns: Le dict réponse serveur (`should_update` sait le lire), ou None si indisponible / gated / erreur (jamais d'exception ne remonte). """ try: import requests # import tardif headers = {} try: from ..config import SERVER_URL, API_TOKEN base = SERVER_URL if API_TOKEN: headers["Authorization"] = f"Bearer {API_TOKEN}" except Exception: base = "" url = f"{base}/agents/update/check" resp = requests.get( url, params={"current_version": local_version, "machine_id": machine_id}, headers=headers, timeout=10, allow_redirects=False, ) # 503 = endpoint gated OFF côté serveur → pas de MAJ (silencieux). if resp.status_code == 503: return None if not resp.ok: logger.debug("update/check HTTP %s", resp.status_code) return None return resp.json() except Exception as e: logger.debug("update/check indisponible : %s", e) return None # --------------------------------------------------------------------------- # Orchestrateur GATED — check → décide → download (staging) → stub apply # --------------------------------------------------------------------------- def run_update_cycle( local_version: str, machine_id: str, staging_dir, checker: Optional[Callable[[str, str], object]] = None, downloader: Optional[Callable[[str], bytes]] = None, app_dir=None, ) -> dict: """Un cycle complet de MAJ silencieuse — GATED, best-effort, SANS swap. Enchaîne : 1. GATE `auto_update_enabled()` (RPA_AUTO_UPDATE_ENABLED, défaut OFF) — si OFF, ne fait STRICTEMENT rien (aucun appel réseau). 2. `checker(local_version, machine_id)` → réponse serveur (canary-aware). 3. `should_update(...)` → plan (double garde semver, jamais de downgrade). 4. `download_update(...)` → ZIP dans le STAGING + vérif SHA256. Ne touche JAMAIS les fichiers vivants. 5. `apply_update` ARME le swap (extraction `agent_v1_new/` + marqueur UPDATE_READY) mais NE swappe PAS : le remplacement atomique + le redémarrage sont faits par Lea.bat au prochain démarrage. `applied` reste False tant que Léa n'a pas redémarré sur la nouvelle version. Jamais d'exception ne remonte (ne doit JAMAIS casser Léa). Retourne un dict d'état pour le diagnostic / le log : status ∈ {disabled, check_failed, up_to_date, download_failed, staged} Args: checker : callable `(local_version, machine_id) -> dict|None` INJECTABLE (défaut = HTTP réel vers l'endpoint gated). downloader : callable `(url) -> bytes` INJECTABLE (défaut = HTTP réel). """ if not auto_update_enabled(): return {"status": "disabled", "applied": False} chk = checker if checker is not None else _default_update_checker try: server_response = chk(local_version, machine_id) except Exception as e: logger.warning("update check a levé : %s", e) return {"status": "check_failed", "applied": False, "error": str(e)} plan = should_update(local_version, server_response) if plan is None: return {"status": "up_to_date", "applied": False} staged = download_update(plan, staging_dir, downloader=downloader) if not staged.get("ok"): return { "status": "download_failed", "applied": False, "error": staged.get("error"), } # Armement du swap : extraction du ZIP vers agent_v1_new\ + marqueur # UPDATE_READY. Le swap ATOMIQUE (renames) et le redémarrage sont faits # HORS-PROCESS par Lea.bat au prochain démarrage — JAMAIS depuis ici # (on n'écrase pas les fichiers d'un Léa en cours d'exécution). armed = apply_update(staged, app_dir=app_dir) return { "status": "armed" if armed.get("armed") else "arm_failed", "applied": False, # le swap effectif est fait par Lea.bat, pas ici "armed": bool(armed.get("armed", False)), "target_version": staged.get("target_version"), "update_type": staged.get("update_type"), "staged_zip": staged.get("staged_zip"), "sha256_verified": staged.get("sha256_verified", False), "marker": armed.get("marker"), "error": armed.get("error"), } # =========================================================================== # SWAP — côté Python : ARMEMENT SEULEMENT (extraction + marqueurs). # Le remplacement ATOMIQUE des fichiers vivants + le redémarrage + le # rollback sont faits HORS-PROCESS par `Lea.bat` au démarrage (renames). # Python n'écrase JAMAIS les fichiers d'un Léa en cours d'exécution. # =========================================================================== def _resolve_app_dir(app_dir) -> Path: """Répertoire d'install (contient `agent_v1/`, `run_agent_v1.py`, `Lea.bat`). INJECTABLE (tests : tmp_path). Défaut = parent du package agent_v1. """ if app_dir is not None: return Path(app_dir) try: from ..config import BASE_DIR # BASE_DIR = dossier du package agent_v1 return Path(BASE_DIR).parent except Exception: return Path(__file__).resolve().parent.parent.parent def apply_update(prepared: dict, app_dir=None) -> dict: """ARME le swap : extrait le ZIP staging vers `agent_v1_new/` + marqueur. NE swappe PAS et NE redémarre PAS (c'est le rôle de `Lea.bat`). Écrit uniquement à côté des fichiers vivants (dossier neuf + marqueur), donc l'opération est sûre même sur un Léa en cours d'exécution. 1. Extrait `prepared["staged_zip"]` → `/agent_v1_new/` (nettoyé au préalable ; garde-fou zip-slip). 2. Écrit `/UPDATE_READY` (JSON : version, type, chemins) que `Lea.bat` lira au prochain démarrage pour faire le swap atomique. Best-effort : aucune exception ne remonte (ne doit jamais casser Léa). Returns: succès : {armed: True, applied: False, target_version, update_type, marker, extracted_to} échec : {armed: False, applied: False, error} """ if not isinstance(prepared, dict): return {"armed": False, "applied": False, "error": "prepared invalide"} staged_zip = prepared.get("staged_zip") target_version = prepared.get("target_version", "unknown") update_type = _normalize_update_type(prepared.get("update_type")) try: root = _resolve_app_dir(app_dir) zip_path = Path(staged_zip) if staged_zip else None if zip_path is None or not zip_path.is_file(): return {"armed": False, "applied": False, "error": "ZIP staging introuvable"} new_dir = root / "agent_v1_new" if new_dir.exists(): shutil.rmtree(new_dir, ignore_errors=True) # nettoie un staging partiel new_dir.mkdir(parents=True, exist_ok=True) import zipfile new_root = new_dir.resolve() with zipfile.ZipFile(zip_path) as zf: for name in zf.namelist(): # garde-fou zip-slip (chemins ../) dest = (new_dir / name).resolve() if not str(dest).startswith(str(new_root)): shutil.rmtree(new_dir, ignore_errors=True) return {"armed": False, "applied": False, "error": f"zip-slip refusé : {name}"} zf.extractall(new_dir) marker = root / "UPDATE_READY" marker.write_text(json.dumps({ "target_version": target_version, "update_type": update_type, "extracted_to": str(new_dir), "staged_zip": str(zip_path), }), encoding="utf-8") logger.info( "Update ARMÉ : %s (%s) → %s ; swap au prochain démarrage (Lea.bat)", target_version, update_type, new_dir, ) return {"armed": True, "applied": False, "target_version": target_version, "update_type": update_type, "marker": str(marker), "extracted_to": str(new_dir)} except Exception as e: # noqa: BLE001 logger.warning("apply_update (armement) a échoué : %s", e) return {"armed": False, "applied": False, "error": f"arm_failed: {e}"} def write_boot_ok_marker(version: str, app_dir=None) -> dict: """Confirme un boot sain : écrit `boot_ok_{version}` + désarme le rollback. Appelé par `main.py` après ~90 s de tourne STABLE (liveness LOCALE, indépendante du DGX — évite un faux rollback quand le réseau est coupé). Retirer `PENDING_BOOT*` dit à `Lea.bat` que la nouvelle version a démarré correctement (sinon, au prochain lancement, Lea.bat rollback vers la version précédente). Best-effort : aucune exception ne remonte. """ try: root = _resolve_app_dir(app_dir) marker = root / f"boot_ok_{version}" marker.write_text("ok", encoding="utf-8") cleared = [] for p in root.glob("PENDING_BOOT*"): try: p.unlink() cleared.append(p.name) except OSError: pass logger.info("boot_ok écrit (%s) ; PENDING_BOOT retiré : %s", version, cleared or "aucun") return {"written": True, "marker": str(marker), "cleared_pending": cleared} except Exception as e: # noqa: BLE001 logger.warning("write_boot_ok_marker a échoué : %s", e) return {"written": False, "error": str(e)}