diff --git a/agent_v0/agent_v1/core/executor.py b/agent_v0/agent_v1/core/executor.py index a17b482fa..11b76062f 100644 --- a/agent_v0/agent_v1/core/executor.py +++ b/agent_v0/agent_v1/core/executor.py @@ -20,6 +20,7 @@ import os import threading import time import logging +from typing import Any, Dict, Optional # Forcer l'import de config AVANT pynput/mss pour garantir que le # DPI awareness est configure (SetProcessDpiAwareness(2) sur Windows). @@ -88,6 +89,11 @@ class ActionExecutorV1: self._api_token = os.environ.get("RPA_API_TOKEN", "") # Gestionnaire de notifications toast (pour les messages utilisateur) self._notification_manager = None + # Drapeau sécurité : positionné quand on détecte un dialogue système + # (UAC, CredUI, SmartScreen…). Lu par le caller pour signaler une + # pause supervisée au serveur (`paused_need_help`). + # Cf. core/system_dialog_guard.py + self._system_dialog_pause: Optional[Dict[str, Any]] = None # Log de la resolution physique pour le diagnostic DPI self._log_screen_info() @@ -537,6 +543,11 @@ class ActionExecutorV1: "visual_resolved": False, } + # Réinitialiser le drapeau dialogue système à chaque action + # (sinon une détection lors d'une action précédente ferait bail-out + # immédiat sur toutes les suivantes). + self._system_dialog_pause = None + # ── Bloc conditionnel : skip si le dialogue n'est pas apparu ── # Les actions marquées conditional_on_window ne s'exécutent que # si la fenêtre attendue est effectivement présente. Sinon → skip. @@ -594,6 +605,23 @@ class ActionExecutorV1: f"{int(action.get('y_pct', 0) * height)})" ) + # ── SÉCURITÉ : check proactif AVANT toute action ── + # Si un UAC / CredUI / SmartScreen est déjà à l'écran (apparu + # spontanément entre deux actions), on pause IMMÉDIATEMENT + # sans rien tenter. Clic / type / key_combo : tous bloqués. + # Cf. core/system_dialog_guard.py + if action_type in ("click", "type", "key_combo", "double_click", "right_click"): + if self._check_and_pause_on_system_dialog(context=f"pre_action_{action_type}"): + pause_info = self._system_dialog_pause or {} + result["success"] = False + result["error"] = ( + f"system_dialog:{pause_info.get('category', 'unknown')}" + ) + result["system_dialog"] = pause_info + result["needs_human"] = True + result["screenshot"] = self._capture_screenshot_b64() + return result + # Resolution visuelle des coordonnees si demande x_pct = action.get("x_pct", 0.0) y_pct = action.get("y_pct", 0.0) @@ -737,6 +765,27 @@ class ActionExecutorV1: popup_coords = observation.get("popup_coords") print(f" [OBSERVER] Popup détectée : '{popup_label}' — fermeture") logger.info(f"Observer : popup '{popup_label}' détectée avant résolution") + + # ── SÉCURITÉ : refuser de cliquer sur un dialogue système ── + # Avant de suivre les coordonnées du serveur (VLM-based, + # donc faillible) ou de rappeler le VLM local, on + # vérifie que la popup n'est PAS un UAC/CredUI/SmartScreen. + if self._check_and_pause_on_system_dialog( + context="observer_popup" + ): + # Dialogue système → on remonte la pause au replay. + # On renvoie le résultat immédiatement pour que le + # serveur passe en paused_need_help. + pause_info = self._system_dialog_pause or {} + result["success"] = False + result["error"] = ( + f"system_dialog:{pause_info.get('category', 'unknown')}" + ) + result["system_dialog"] = pause_info + result["needs_human"] = True + result["screenshot"] = self._capture_screenshot_b64() + return result + if popup_coords: real_x = int(popup_coords["x_pct"] * width) real_y = int(popup_coords["y_pct"] * height) @@ -745,7 +794,20 @@ class ActionExecutorV1: print(f" [OBSERVER] Popup fermée — reprise du flow normal") else: # Pas de coordonnées → fallback sur handle_popup_vlm classique + # (qui re-vérifie aussi system_dialog en interne) self._handle_popup_vlm() + # Si _handle_popup_vlm a détecté un dialogue système, + # on remonte la pause au replay. + if self._system_dialog_pause: + pause_info = self._system_dialog_pause + result["success"] = False + result["error"] = ( + f"system_dialog:{pause_info.get('category', 'unknown')}" + ) + result["system_dialog"] = pause_info + result["needs_human"] = True + result["screenshot"] = self._capture_screenshot_b64() + return result elif obs_state == "unexpected": # État inattendu (pas la bonne page/écran) @@ -840,6 +902,24 @@ class ActionExecutorV1: f"({policy_decision.reason})" ) + # ── SÉCURITÉ : si Policy a détecté un dialogue système + # pendant son _try_close_popup, on remonte la pause au + # serveur SANS tenter aucune action supplémentaire. + if self._system_dialog_pause: + pause_info = self._system_dialog_pause + logger.critical( + f"[POLICY] Dialogue système détecté par popup handler " + f"({pause_info.get('category')}) — pause supervisée" + ) + result["success"] = False + result["error"] = ( + f"system_dialog:{pause_info.get('category', 'unknown')}" + ) + result["system_dialog"] = pause_info + result["needs_human"] = True + result["screenshot"] = self._capture_screenshot_b64() + return result + if policy_decision.decision == Decision.RETRY: resolved2 = self._resolve_target_visual( server_url, target_spec, x_pct, y_pct, width, height @@ -1771,6 +1851,9 @@ Example: x_pct=0.50, y_pct=0.30""" "target_spec": result.get("target_spec"), # Correction humaine (mode apprentissage supervisé) "correction": result.get("correction"), + # Sécurité : dialogue système critique détecté (UAC, CredUI, SmartScreen) + "system_dialog": result.get("system_dialog"), + "needs_human": result.get("needs_human"), } try: resp2 = requests.post( @@ -1796,6 +1879,129 @@ Example: x_pct=0.50, y_pct=0.30""" return True + # ========================================================================= + # Garde-fou sécurité : dialogues système Windows (UAC, CredUI, SmartScreen) + # ========================================================================= + + def _check_and_pause_on_system_dialog(self, context: str = "") -> bool: + """Détecter un dialogue système critique et positionner la pause. + + Si un dialogue UAC, CredUI, SmartScreen (etc.) est actif, on : + - N'appelle JAMAIS le VLM sur l'image (évite de lui faire suggérer "Oui") + - Ne clique JAMAIS automatiquement + - Positionne `self._system_dialog_pause` pour que le caller signale + une pause supervisée au serveur + - Notifie l'utilisateur via systray + - Log l'événement pour audit + + Args: + context: Chaîne d'origine pour les logs (ex: "handle_popup_vlm", + "observer_popup_click"). + + Returns: + True si un dialogue système a été détecté (le caller doit + stopper toute action automatique). False sinon. + """ + try: + from .system_dialog_guard import detect_current_system_dialog + detection = detect_current_system_dialog() + except Exception as e: + # Fix P0-D : fail-closed (principe "faux positif tolérable, + # faux négatif catastrophique"). Si la détection échoue, on ne + # peut PAS affirmer que l'écran est sûr — on pause par précaution + # et on demande à l'humain. Un UAC non détecté à cause d'un bug + # de détection = vecteur d'attaque ransomware. + logger.critical( + f"[SYS-DIALOG] Erreur détection dialogue système " + f"(context={context}) : {e} — PAUSE SUPERVISÉE par précaution " + f"(fail-closed : impossible de garantir l'absence de dialogue " + f"système critique)" + ) + print( + f" [SÉCURITÉ] Vérification du garde-fou système a échoué " + f"— pause supervisée par précaution ({type(e).__name__})" + ) + # Positionner le flag de pause avec une catégorie dédiée pour que + # le caller (execute_replay_action) remonte "paused_need_help". + self._system_dialog_pause = { + "category": "unknown_check_failed", + "matched_signal": "exception", + "matched_value": type(e).__name__, + "reason": f"system_dialog_guard détection exception: {e}", + "context": context, + } + # Notification utilisateur best-effort. + try: + notifier = self.notifier + msg = ( + "Vérification du garde-fou système a échoué — " + "pause supervisée par précaution. Léa ne clique pas." + ) + if hasattr(notifier, "notify"): + notifier.notify( + title="Léa — sécurité", + message=msg, + timeout=10, + ) + elif hasattr(notifier, "error"): + notifier.error(msg) + except Exception as notify_err: + logger.debug(f"[SYS-DIALOG] Notification échouée : {notify_err}") + return True + + if not detection.is_system_dialog: + return False + + # Audit log : TOUJOURS tracer, même si la pause est redondante. + logger.critical( + f"[SYS-DIALOG] REFUS D'INTERACTION — {detection.category} " + f"détecté via {detection.matched_signal}='{detection.matched_value}' " + f"(context={context}). Pause supervisée demandée." + ) + print( + f" [SÉCURITÉ] Dialogue système détecté : {detection.category} " + f"— Léa NE CLIQUE PAS, intervention humaine requise" + ) + + # Positionner le flag pour le caller (execute_replay_action) + self._system_dialog_pause = { + "category": detection.category, + "matched_signal": detection.matched_signal, + "matched_value": detection.matched_value, + "reason": detection.reason, + "context": context, + } + + # Notification systray (best-effort, ne jamais planter dessus) + try: + cat_fr = { + "uac_consent": "élévation de privilèges (UAC)", + "windows_credential_prompt": "demande de mot de passe Windows", + "smartscreen": "alerte SmartScreen", + "windows_defender": "alerte Windows Defender", + "driver_install": "installation de pilote", + "security_toast": "notification de sécurité", + "unknown_system_dialog": "dialogue système inconnu", + }.get(detection.category, detection.category) + msg = ( + f"Dialogue système détecté ({cat_fr}) — " + f"intervention humaine requise. Léa ne clique pas." + ) + # On essaie d'abord un formateur explicite ; sinon fallback error + notifier = self.notifier + if hasattr(notifier, "notify"): + notifier.notify( + title="Léa — sécurité", + message=msg, + timeout=10, + ) + elif hasattr(notifier, "error"): + notifier.error(msg) + except Exception as e: + logger.debug(f"[SYS-DIALOG] Notification échouée : {e}") + + return True + # ========================================================================= # Gestion intelligente des popups imprévues (VLM) # ========================================================================= @@ -1817,9 +2023,22 @@ Example: x_pct=0.50, y_pct=0.30""" Une seule tentative par action (pas de boucle infinie). + **SÉCURITÉ** : avant toute interaction, on détecte les dialogues + système Windows critiques (UAC, CredUI, SmartScreen). Si un tel + dialogue est actif → pause supervisée immédiate, pas de VLM, pas + de clic automatique. Cf. system_dialog_guard.py. + Returns: True si une popup a été gérée (fermée), False sinon. + False aussi en cas de dialogue système → le caller doit traiter + `self._system_dialog_pause` pour signaler la pause au serveur. """ + # ── SÉCURITÉ : refus absolu de cliquer sur un dialogue système ── + # Un UAC / CredUI / SmartScreen ne doit JAMAIS recevoir de clic + # automatique. On détecte AVANT le VLM (coût minimal ~20ms UIA). + if self._check_and_pause_on_system_dialog(context="handle_popup_vlm"): + return False + # Capturer le screenshot actuel (résolution native pour template matching) screenshot_b64 = self._capture_screenshot_b64(max_width=0, quality=75) if not screenshot_b64: diff --git a/agent_v0/agent_v1/core/policy.py b/agent_v0/agent_v1/core/policy.py index f549e3b92..ca8df97da 100644 --- a/agent_v0/agent_v1/core/policy.py +++ b/agent_v0/agent_v1/core/policy.py @@ -85,6 +85,10 @@ class PolicyEngine: 2. Si retry déjà fait → demander à l'acteur gemma4 3. Selon gemma4 : SKIP, ABORT, ou SUPERVISE + **SÉCURITÉ** : si, pendant l'étape 1, le handler popup détecte un + dialogue système Windows (UAC, CredUI, SmartScreen…), on bascule + immédiatement en SUPERVISE. Cf. system_dialog_guard.py. + Args: action: L'action qui a échoué target_spec: La cible non trouvée @@ -96,6 +100,22 @@ class PolicyEngine: # ── Étape 1 : Tentative de fermeture popup (premier essai) ── if retry_count == 0: popup_handled = self._try_close_popup() + + # Si le popup handler a détecté un dialogue système, on + # bascule immédiatement en SUPERVISE — pas de retry, pas de + # gemma4 : on rend la main à l'humain. + if getattr(self._executor, "_system_dialog_pause", None): + sd = self._executor._system_dialog_pause + return PolicyDecision( + decision=Decision.SUPERVISE, + reason=( + f"Dialogue système détecté ({sd.get('category', '?')}) — " + f"refus d'interaction automatique" + ), + action_taken="system_dialog_blocked", + elapsed_ms=(time.time() - t_start) * 1000, + ) + if popup_handled: return PolicyDecision( decision=Decision.RETRY, diff --git a/agent_v0/agent_v1/core/system_dialog_guard.py b/agent_v0/agent_v1/core/system_dialog_guard.py new file mode 100644 index 000000000..6a4a932a0 --- /dev/null +++ b/agent_v0/agent_v1/core/system_dialog_guard.py @@ -0,0 +1,448 @@ +# agent_v1/core/system_dialog_guard.py +""" +Garde-fou sécurité : détection des dialogues système Windows critiques. + +============================================================================== +POURQUOI ? +============================================================================== + +Pendant un replay, si un dialogue UAC, CredUI (mot de passe Windows), +SmartScreen ou une notification de sécurité Windows apparaît, Léa pourrait +demander au VLM "quel bouton cliquer" et recevoir "Oui" en réponse. + +→ **Léa cliquerait OUI sur une élévation UAC** → vecteur d'attaque ransomware. + +Ce module fournit la détection de ces dialogues pour que l'exécuteur +**ne clique JAMAIS dessus automatiquement**. La décision est renvoyée à +l'humain (pause supervisée). + +============================================================================== +PRINCIPE +============================================================================== + +- **Faux positif tolérable** : on préfère pauser pour rien plutôt que cliquer + sur un UAC. +- **Faux négatif catastrophique** : mieux vaut être trop prudent. +- **Multi-signal** : titre, ClassName UIA, nom de processus, parent_path. + Un seul signal suffit à bloquer. +- **Compatible Citrix** : les dialogues UAC d'un client Citrix apparaissent + aussi dans la VM distante — la détection par classe UIA fonctionne. + +============================================================================== +PATTERNS DE DÉTECTION (ordre de criticité décroissant) +============================================================================== + +1. UAC Consent (élévation de privilèges) + - ClassName : `$$$Secure UAP Dummy Window Class$$$` + - Process : `consent.exe` + - Titre : "Contrôle de compte d'utilisateur", "User Account Control" + +2. CredUI (prompt mot de passe Windows) + - ClassName : `Credential Dialog Xaml Host` + - Process : `credentialuibroker.exe`, `credui.exe` + - Titre : "Sécurité Windows", "Windows Security" + +3. SmartScreen (protection contre applications inconnues) + - Process : `smartscreen.exe` + - Titre : "Windows a protégé votre ordinateur", "Windows protected your PC" + +4. Windows Defender / Security Center + - Process : `securityhealthhost.exe`, `msmpeng.exe` + - Titre : "Sécurité Windows", "Windows Defender" + +5. Signatures pilotes / driver install + - Titre : "Installer ce pilote", "Driver signature" +""" + +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Catégories de dialogues système (pour logging + messages) +# ============================================================================= + + +class SystemDialogCategory: + """Catégories de dialogues système à bloquer absolument.""" + UAC = "uac_consent" # Élévation de privilèges + CREDUI = "windows_credential_prompt" # Prompt de mot de passe + SMARTSCREEN = "smartscreen" # Protection SmartScreen + DEFENDER = "windows_defender" # Alerte Windows Defender + DRIVER = "driver_install" # Installation pilote signé + SECURITY_TOAST = "security_toast" # Toast de sécurité Windows + UNKNOWN_DIALOG = "unknown_system_dialog" # Dialogue #32770 sans app connue + + +@dataclass +class SystemDialogDetection: + """Résultat d'une analyse de dialogue système.""" + is_system_dialog: bool + category: str = "" # Valeur de SystemDialogCategory + matched_signal: str = "" # Ex: "class_name=Consent.exe" + matched_value: str = "" # La valeur qui a matché + reason: str = "" # Explication lisible + + def to_dict(self) -> Dict[str, Any]: + return { + "is_system_dialog": self.is_system_dialog, + "category": self.category, + "matched_signal": self.matched_signal, + "matched_value": self.matched_value, + "reason": self.reason, + } + + +# ============================================================================= +# Signatures de détection +# ============================================================================= + + +# ClassName UIA (casse préservée — Windows exposées telle quelle par UIA). +# Utilisées telles quelles puis en minuscules pour matcher avec souplesse. +_CLASS_NAMES_SYSTEM = { + # UAC Consent + "$$$Secure UAP Dummy Window Class$$$": SystemDialogCategory.UAC, + "Credential Dialog Xaml Host": SystemDialogCategory.CREDUI, + # Windows Credential UI ancien nom + "CredentialDialogXamlHost": SystemDialogCategory.CREDUI, +} + +# Nom de processus (comparaison insensible à la casse, .exe normalisé) +_PROCESS_NAMES_SYSTEM = { + "consent.exe": SystemDialogCategory.UAC, + "credentialuibroker.exe": SystemDialogCategory.CREDUI, + "credui.exe": SystemDialogCategory.CREDUI, + "credwiz.exe": SystemDialogCategory.CREDUI, + "smartscreen.exe": SystemDialogCategory.SMARTSCREEN, + "securityhealthhost.exe": SystemDialogCategory.DEFENDER, + "securityhealthui.exe": SystemDialogCategory.DEFENDER, + "securityhealthsystray.exe": SystemDialogCategory.DEFENDER, + "msmpeng.exe": SystemDialogCategory.DEFENDER, + "windowsdefender.exe": SystemDialogCategory.DEFENDER, + "msiexec.exe": SystemDialogCategory.DRIVER, # prompts pilotes signés + "drvinst.exe": SystemDialogCategory.DRIVER, +} + +# Motifs titre (insensibles à la casse, regex avec word boundaries) +# On ne matche pas les titres génériques trop larges pour limiter les faux +# positifs sur OSIRIS/OBSIUS/MEDSPHERE. +_TITLE_PATTERNS_SYSTEM: Tuple[Tuple[re.Pattern, str], ...] = ( + # UAC + (re.compile(r"contr[oô]le\s+de\s+compte\s+d'?utilisateur", re.IGNORECASE), + SystemDialogCategory.UAC), + (re.compile(r"\buser\s+account\s+control\b", re.IGNORECASE), + SystemDialogCategory.UAC), + (re.compile(r"voulez-vous\s+autoriser\s+cette\s+application", re.IGNORECASE), + SystemDialogCategory.UAC), + (re.compile(r"do\s+you\s+want\s+to\s+allow\s+this\s+app", re.IGNORECASE), + SystemDialogCategory.UAC), + + # CredUI / Sécurité Windows + (re.compile(r"\bs[eé]curit[eé]\s+windows\b", re.IGNORECASE), + SystemDialogCategory.CREDUI), + (re.compile(r"\bwindows\s+security\b", re.IGNORECASE), + SystemDialogCategory.CREDUI), + (re.compile(r"entrer\s+les\s+informations\s+d'?identification", re.IGNORECASE), + SystemDialogCategory.CREDUI), + (re.compile(r"enter\s+(?:your\s+)?credentials?", re.IGNORECASE), + SystemDialogCategory.CREDUI), + (re.compile(r"connectez-vous\s+[aà]\s+votre\s+compte", re.IGNORECASE), + SystemDialogCategory.CREDUI), + (re.compile(r"\bsign\s+in\s+to\s+your\s+account\b", re.IGNORECASE), + SystemDialogCategory.CREDUI), + + # SmartScreen + (re.compile(r"windows\s+a\s+prot[eé]g[eé]", re.IGNORECASE), + SystemDialogCategory.SMARTSCREEN), + (re.compile(r"windows\s+protected\s+your\s+pc", re.IGNORECASE), + SystemDialogCategory.SMARTSCREEN), + (re.compile(r"\bsmartscreen\b", re.IGNORECASE), + SystemDialogCategory.SMARTSCREEN), + (re.compile(r"\b[eé]diteur\s+inconnu\b", re.IGNORECASE), + SystemDialogCategory.SMARTSCREEN), + (re.compile(r"\bunknown\s+publisher\b", re.IGNORECASE), + SystemDialogCategory.SMARTSCREEN), + + # Windows Defender + (re.compile(r"windows\s+defender", re.IGNORECASE), + SystemDialogCategory.DEFENDER), + (re.compile(r"menace\s+d[eé]tect[eé]e", re.IGNORECASE), + SystemDialogCategory.DEFENDER), + (re.compile(r"threat\s+detected", re.IGNORECASE), + SystemDialogCategory.DEFENDER), + + # Driver + (re.compile(r"installer\s+ce\s+pilote", re.IGNORECASE), + SystemDialogCategory.DRIVER), + (re.compile(r"install\s+this\s+driver", re.IGNORECASE), + SystemDialogCategory.DRIVER), + (re.compile(r"signature\s+num[eé]rique\s+du\s+pilote", re.IGNORECASE), + SystemDialogCategory.DRIVER), +) + + +# ============================================================================= +# Fonctions de détection +# ============================================================================= + + +def _normalize_process(name: str) -> str: + """Normaliser un nom de processus pour comparaison.""" + if not name: + return "" + name = name.strip().lower() + # Enlever le chemin éventuel + if "\\" in name or "/" in name: + name = name.replace("\\", "/").split("/")[-1] + # Assurer suffixe .exe pour matcher le dictionnaire + if not name.endswith(".exe") and name: + # Les process_name peuvent venir sans .exe (psutil) — on ajoute + # pour avoir une clé uniforme + name_with_exe = name + ".exe" + if name_with_exe in _PROCESS_NAMES_SYSTEM: + return name_with_exe + return name + + +def _check_class_name(class_name: str) -> Optional[Tuple[str, str, str]]: + """Vérifier si un ClassName UIA matche un dialogue système. + + Returns: + (category, matched_class, reason) si match, None sinon. + """ + if not class_name: + return None + + # Match exact + if class_name in _CLASS_NAMES_SYSTEM: + cat = _CLASS_NAMES_SYSTEM[class_name] + return (cat, class_name, f"ClassName UIA '{class_name}' = dialogue système {cat}") + + # Match insensible à la casse + normalisation espaces + cn_norm = class_name.strip() + for known, cat in _CLASS_NAMES_SYSTEM.items(): + if cn_norm.lower() == known.lower(): + return (cat, class_name, f"ClassName UIA ~= '{known}' ({cat})") + + # Détection souple UAC (il existe quelques variantes de la classe secure) + if "secure uap" in class_name.lower() or "uap dummy" in class_name.lower(): + return (SystemDialogCategory.UAC, class_name, + f"ClassName '{class_name}' contient 'Secure UAP' → UAC") + + # Credential XAML Host + if "credential" in class_name.lower() and "xaml" in class_name.lower(): + return (SystemDialogCategory.CREDUI, class_name, + f"ClassName '{class_name}' contient Credential+Xaml → CredUI") + + return None + + +def _check_process_name(process_name: str) -> Optional[Tuple[str, str, str]]: + """Vérifier si un nom de processus est un dialogue système. + + Returns: + (category, matched_process, reason) si match, None sinon. + """ + if not process_name: + return None + + norm = _normalize_process(process_name) + if norm in _PROCESS_NAMES_SYSTEM: + cat = _PROCESS_NAMES_SYSTEM[norm] + return (cat, process_name, f"Processus '{norm}' = {cat}") + return None + + +def _check_title(title: str) -> Optional[Tuple[str, str, str]]: + """Vérifier si un titre de fenêtre matche un dialogue système. + + Returns: + (category, matched_pattern, reason) si match, None sinon. + """ + if not title: + return None + + for pattern, cat in _TITLE_PATTERNS_SYSTEM: + m = pattern.search(title) + if m: + return (cat, m.group(0), + f"Titre '{title[:60]}' matche '{pattern.pattern}' → {cat}") + return None + + +def is_system_dialog( + uia_snapshot: Optional[Dict[str, Any]] = None, + window_info: Optional[Dict[str, Any]] = None, +) -> SystemDialogDetection: + """Déterminer si la fenêtre active est un dialogue système critique. + + La détection combine plusieurs signaux — **un seul suffit à bloquer**. + On préfère un faux positif (pause inutile) à un faux négatif (clic UAC). + + Args: + uia_snapshot: Dict avec champs `class_name`, `process_name`, + `parent_path`, `name`. Peut être None si UIA indisponible. + window_info: Dict avec champs `title`, `app_name`. Peut être None. + + Returns: + SystemDialogDetection avec is_system_dialog=True si un dialogue + système est détecté. + + Exemples:: + + det = is_system_dialog(window_info={"title": "User Account Control"}) + assert det.is_system_dialog # UAC détecté + + det = is_system_dialog(uia_snapshot={"class_name": "$$$Secure UAP Dummy Window Class$$$"}) + assert det.is_system_dialog # UAC via ClassName + + det = is_system_dialog(window_info={"title": "OSIRIS - Patient Dupont"}) + assert not det.is_system_dialog # Application métier → OK + """ + # ── Signal 1 : ClassName UIA ── + if uia_snapshot: + cn = uia_snapshot.get("class_name", "") or "" + r = _check_class_name(cn) + if r: + cat, matched, reason = r + return SystemDialogDetection( + is_system_dialog=True, + category=cat, + matched_signal="class_name", + matched_value=matched, + reason=reason, + ) + + # Explorer aussi les parents (le champ cliqué peut être un bouton + # interne dont la ClassName est "Button", mais le root de la fenêtre + # est le Consent.exe). + for parent in uia_snapshot.get("parent_path", []) or []: + p_cn = parent.get("class_name", "") or "" + r = _check_class_name(p_cn) + if r: + cat, matched, reason = r + return SystemDialogDetection( + is_system_dialog=True, + category=cat, + matched_signal="parent_class_name", + matched_value=matched, + reason=f"Parent : {reason}", + ) + + # ── Signal 2 : Process name ── + if uia_snapshot: + pn = uia_snapshot.get("process_name", "") or "" + r = _check_process_name(pn) + if r: + cat, matched, reason = r + return SystemDialogDetection( + is_system_dialog=True, + category=cat, + matched_signal="process_name", + matched_value=matched, + reason=reason, + ) + + if window_info: + app = window_info.get("app_name", "") or "" + r = _check_process_name(app) + if r: + cat, matched, reason = r + return SystemDialogDetection( + is_system_dialog=True, + category=cat, + matched_signal="app_name", + matched_value=matched, + reason=reason, + ) + + # ── Signal 3 : Titre de fenêtre ── + if window_info: + title = window_info.get("title", "") or "" + r = _check_title(title) + if r: + cat, matched, reason = r + return SystemDialogDetection( + is_system_dialog=True, + category=cat, + matched_signal="window_title", + matched_value=matched, + reason=reason, + ) + + if uia_snapshot: + # Certains dialogues système remontent leur titre dans uia.name + uia_name = uia_snapshot.get("name", "") or "" + r = _check_title(uia_name) + if r: + cat, matched, reason = r + return SystemDialogDetection( + is_system_dialog=True, + category=cat, + matched_signal="uia_name", + matched_value=matched, + reason=reason, + ) + + return SystemDialogDetection(is_system_dialog=False) + + +def detect_current_system_dialog() -> SystemDialogDetection: + """Analyser l'écran actuel et détecter un dialogue système. + + Helper autonome qui interroge à la fois `get_active_window_info()` et + le helper UIA (si dispo) pour obtenir la détection la plus fiable. + + Returns: + SystemDialogDetection. Si un signal matche, is_system_dialog=True. + Si rien n'est disponible (Linux, UIA absent), is_system_dialog=False + mais le caller peut encore fallback sur une analyse par titre. + """ + window_info: Optional[Dict[str, Any]] = None + uia_snapshot: Optional[Dict[str, Any]] = None + + # Fenêtre active (cross-platform) + try: + from ..window_info_crossplatform import get_active_window_info + window_info = get_active_window_info() + except Exception as e: # pragma: no cover — best-effort + logger.debug(f"[SYS-DIALOG] window_info indisponible : {e}") + + # UIA local (Windows uniquement, via lea_uia.exe) + try: + from .uia_helper import get_shared_helper + helper = get_shared_helper() + if helper.available: + # On capture l'élément focalisé (root = fenêtre active) + element = helper.capture_focused(max_depth=2) + if element is not None: + uia_snapshot = element.to_dict() + except Exception as e: # pragma: no cover + logger.debug(f"[SYS-DIALOG] UIA indisponible : {e}") + + detection = is_system_dialog( + uia_snapshot=uia_snapshot, window_info=window_info, + ) + + if detection.is_system_dialog: + logger.warning( + f"[SYS-DIALOG] BLOCAGE — dialogue système détecté " + f"[{detection.category}] via {detection.matched_signal}='{detection.matched_value}' " + f"— {detection.reason}" + ) + return detection + + +__all__ = [ + "SystemDialogCategory", + "SystemDialogDetection", + "is_system_dialog", + "detect_current_system_dialog", +] diff --git a/tests/unit/test_policy_grounding_recovery_learning.py b/tests/unit/test_policy_grounding_recovery_learning.py index 2467d7b80..3cf423f06 100644 --- a/tests/unit/test_policy_grounding_recovery_learning.py +++ b/tests/unit/test_policy_grounding_recovery_learning.py @@ -122,6 +122,7 @@ class TestPolicyEngine: def _make_engine(self): from agent_v0.agent_v1.core.policy import PolicyEngine executor = MagicMock() + executor._system_dialog_pause = None return PolicyEngine(executor), executor def test_premier_essai_popup_fermee_retry(self): diff --git a/tests/unit/test_system_dialog_guard.py b/tests/unit/test_system_dialog_guard.py new file mode 100644 index 000000000..043bd6bce --- /dev/null +++ b/tests/unit/test_system_dialog_guard.py @@ -0,0 +1,391 @@ +# tests/unit/test_system_dialog_guard.py +""" +Tests du garde-fou sécurité : détection des dialogues système Windows critiques. + +Objectif : garantir que Léa REFUSE de cliquer automatiquement sur un +UAC / CredUI / SmartScreen (vecteur d'attaque ransomware). + +Philosophie : +- Faux positif tolérable (pause pour rien). +- Faux négatif catastrophique (clic UAC). +- Les tests privilégient la sécurité : tout dialogue suspect DOIT matcher. + +Cf. agent_v0/agent_v1/core/system_dialog_guard.py +""" + +from __future__ import annotations + +import pytest + +from agent_v0.agent_v1.core.system_dialog_guard import ( + SystemDialogCategory, + SystemDialogDetection, + is_system_dialog, +) + + +# ============================================================================= +# UAC (Contrôle de compte d'utilisateur) — le danger le plus grave +# ============================================================================= + + +class TestUACDetection: + """Un UAC qui n'est PAS détecté = vecteur d'attaque ransomware.""" + + def test_uac_via_class_name_exact(self): + """ClassName UIA du Consent.exe.""" + d = is_system_dialog( + uia_snapshot={"class_name": "$$$Secure UAP Dummy Window Class$$$"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + assert d.matched_signal == "class_name" + + def test_uac_via_class_name_case_variation(self): + """Robustesse à la casse (UIA peut varier).""" + d = is_system_dialog( + uia_snapshot={"class_name": "$$$SECURE UAP DUMMY WINDOW CLASS$$$"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_class_name_fuzzy_secure_uap(self): + """Détection souple si Microsoft ajoute un suffixe.""" + d = is_system_dialog( + uia_snapshot={"class_name": "Secure UAP Dummy"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_process_consent_exe(self): + """Le process consent.exe signe un UAC, peu importe la classe.""" + d = is_system_dialog(uia_snapshot={"process_name": "consent.exe"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_process_consent_exe_with_path(self): + """Chemin complet doit être normalisé.""" + d = is_system_dialog( + uia_snapshot={"process_name": r"C:\Windows\System32\consent.exe"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_app_name_in_window_info(self): + """psutil.name() (app_name côté agent) doit être reconnu.""" + d = is_system_dialog( + window_info={"title": "Administrateur", "app_name": "consent.exe"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_title_fr(self): + """Titre français officiel.""" + d = is_system_dialog( + window_info={"title": "Contrôle de compte d'utilisateur"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_title_en(self): + """Titre anglais.""" + d = is_system_dialog(window_info={"title": "User Account Control"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_title_question_fr(self): + """Phrase caractéristique du prompt UAC.""" + d = is_system_dialog( + window_info={ + "title": ( + "Voulez-vous autoriser cette application à apporter " + "des modifications à votre appareil ?" + ) + } + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_title_question_en(self): + d = is_system_dialog( + window_info={ + "title": "Do you want to allow this app to make changes to your device?" + } + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + + def test_uac_via_parent_path_button_focused(self): + """Cas critique : le focus est sur le bouton 'Oui' de l'UAC. + + Le ClassName du Button sera "Button", mais le parent est bien + le Consent.exe. La détection DOIT matcher sur parent_path. + """ + d = is_system_dialog( + uia_snapshot={ + "class_name": "Button", + "name": "Oui", + "control_type": "Button", + "parent_path": [ + {"class_name": "$$$Secure UAP Dummy Window Class$$$", "name": ""}, + ], + } + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.UAC + assert "parent" in d.matched_signal.lower() or d.matched_signal == "parent_class_name" + + +# ============================================================================= +# CredUI (prompt mot de passe Windows) +# ============================================================================= + + +class TestCredUIDetection: + """Les prompts de mot de passe ne doivent JAMAIS recevoir de frappe auto.""" + + def test_credui_via_class_name(self): + d = is_system_dialog( + uia_snapshot={"class_name": "Credential Dialog Xaml Host"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + def test_credui_variant_no_space(self): + """Variante sans espaces.""" + d = is_system_dialog( + uia_snapshot={"class_name": "CredentialDialogXamlHost"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + def test_credui_via_process_credentialuibroker(self): + d = is_system_dialog( + uia_snapshot={"process_name": "CredentialUIBroker.exe"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + def test_credui_via_process_credui(self): + d = is_system_dialog(uia_snapshot={"process_name": "credui.exe"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + def test_credui_via_title_fr(self): + d = is_system_dialog(window_info={"title": "Sécurité Windows"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + def test_credui_via_title_en(self): + d = is_system_dialog(window_info={"title": "Windows Security"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + def test_credui_via_title_enter_creds(self): + d = is_system_dialog( + window_info={"title": "Connectez-vous à votre compte"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.CREDUI + + +# ============================================================================= +# SmartScreen +# ============================================================================= + + +class TestSmartScreenDetection: + + def test_smartscreen_via_process(self): + d = is_system_dialog(uia_snapshot={"process_name": "smartscreen.exe"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.SMARTSCREEN + + def test_smartscreen_via_title_fr(self): + d = is_system_dialog( + window_info={"title": "Windows a protégé votre ordinateur"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.SMARTSCREEN + + def test_smartscreen_via_title_en(self): + d = is_system_dialog( + window_info={"title": "Windows protected your PC"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.SMARTSCREEN + + def test_smartscreen_unknown_publisher(self): + d = is_system_dialog( + window_info={"title": "Éditeur inconnu — Voulez-vous continuer ?"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.SMARTSCREEN + + +# ============================================================================= +# Autres dialogues système +# ============================================================================= + + +class TestOtherSystemDialogs: + + def test_defender_via_process(self): + d = is_system_dialog(uia_snapshot={"process_name": "MsMpEng.exe"}) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.DEFENDER + + def test_defender_threat_detected(self): + d = is_system_dialog( + window_info={"title": "Menace détectée — Windows Defender"} + ) + assert d.is_system_dialog + # Le regex "windows defender" gagne (listé en premier dans les patterns) + assert d.category == SystemDialogCategory.DEFENDER + + def test_driver_install(self): + d = is_system_dialog( + window_info={"title": "Installer ce pilote logiciel ?"} + ) + assert d.is_system_dialog + assert d.category == SystemDialogCategory.DRIVER + + +# ============================================================================= +# FAUX POSITIFS À ÉVITER — les apps métier doivent passer +# ============================================================================= + + +class TestBusinessAppsPassThrough: + """Vérifier que les apps métier (OSIRIS, OBSIUS, MEDSPHERE, etc.) + ne sont jamais confondues avec des dialogues système. + + Un faux positif ici = workflow métier cassé. + """ + + def test_osiris_main_window(self): + d = is_system_dialog( + window_info={"title": "OSIRIS - Patient Dupont Jean", "app_name": "osiris.exe"} + ) + assert not d.is_system_dialog + + def test_osiris_confirmation_dialog(self): + """Un dialogue #32770 OSIRIS métier (confirmation sauvegarde) DOIT passer.""" + d = is_system_dialog( + uia_snapshot={ + "class_name": "#32770", + "name": "Confirmation", + "process_name": "osiris.exe", + }, + window_info={"title": "Confirmation", "app_name": "osiris.exe"}, + ) + assert not d.is_system_dialog + + def test_obsius_main_window(self): + d = is_system_dialog( + window_info={"title": "OBSIUS - Consultation", "app_name": "obsius.exe"} + ) + assert not d.is_system_dialog + + def test_medsphere(self): + d = is_system_dialog( + window_info={"title": "MEDSPHERE v4.2", "app_name": "medsphere.exe"} + ) + assert not d.is_system_dialog + + def test_chrome_with_security_word_in_page(self): + """Chrome avec 'sécurité' dans le titre de page = OK si app_name=chrome.exe. + + NOTE : on accepte ici un faux positif théorique car 'Sécurité Windows' + est matché par le regex titre. Un navigateur affichant cette exacte + phrase en titre est possible. Compte tenu de l'asymétrie (clic UAC = + catastrophe), on ACCEPTE cette pause supervisée de fait. + """ + # Cas neutre : titre Chrome sans 'Sécurité Windows' + d = is_system_dialog( + window_info={ + "title": "Google Chrome - Recherche Google", + "app_name": "chrome.exe", + } + ) + assert not d.is_system_dialog + + def test_excel_file(self): + d = is_system_dialog( + window_info={"title": "Classeur1 - Excel", "app_name": "EXCEL.EXE"} + ) + assert not d.is_system_dialog + + def test_notepad(self): + d = is_system_dialog( + window_info={"title": "Sans titre - Bloc-notes", "app_name": "notepad.exe"} + ) + assert not d.is_system_dialog + + +# ============================================================================= +# Cas limites — robustesse +# ============================================================================= + + +class TestEdgeCases: + + def test_no_input_returns_false(self): + """Aucune info disponible = pas de blocage (fail-open).""" + d = is_system_dialog() + assert not d.is_system_dialog + + def test_empty_strings(self): + d = is_system_dialog( + uia_snapshot={"class_name": "", "process_name": "", "name": ""}, + window_info={"title": "", "app_name": ""}, + ) + assert not d.is_system_dialog + + def test_none_values(self): + """Les champs à None ne doivent pas planter.""" + d = is_system_dialog( + uia_snapshot={"class_name": None, "process_name": None}, + window_info={"title": None, "app_name": None}, + ) + assert not d.is_system_dialog + + def test_detection_to_dict_serializable(self): + d = is_system_dialog( + window_info={"title": "User Account Control"} + ) + data = d.to_dict() + assert data["is_system_dialog"] is True + assert data["category"] == SystemDialogCategory.UAC + assert data["reason"] + + def test_unicode_title_fr_accents(self): + """Les accents ne cassent pas la détection.""" + d = is_system_dialog( + window_info={"title": "Contrôle de compte d'utilisateur"} + ) + assert d.is_system_dialog + + def test_whitespace_title(self): + d = is_system_dialog( + window_info={"title": " Windows Security "} + ) + assert d.is_system_dialog + + +# ============================================================================= +# Intégration avec detect_current_system_dialog +# ============================================================================= + + +def test_detect_current_system_dialog_no_exception_linux(): + """Sur Linux, UIA indispo mais l'appel ne doit jamais planter.""" + from agent_v0.agent_v1.core.system_dialog_guard import detect_current_system_dialog + detection = detect_current_system_dialog() + # On ne valide pas le résultat (dépend de la fenêtre active) — + # juste qu'il n'y a pas d'exception. + assert isinstance(detection, SystemDialogDetection) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/unit/test_uac_guard_fail_closed_p0d.py b/tests/unit/test_uac_guard_fail_closed_p0d.py new file mode 100644 index 000000000..fcdf0aed3 --- /dev/null +++ b/tests/unit/test_uac_guard_fail_closed_p0d.py @@ -0,0 +1,148 @@ +""" +Tests du Fix P0-D : le garde-fou de dialogue système doit fail-closed. + +Avant : si la détection lève une exception, on laissait passer (fail-open), +ce qui contredisait le principe "faux positif tolérable, faux négatif +catastrophique" — un UAC non détecté à cause d'un bug = vecteur ransomware. + +Après : exception → pause supervisée + log critical + notification utilisateur ++ flag `_system_dialog_pause` positionné avec category="unknown_check_failed". +""" +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + +@pytest.fixture +def executor(): + """Crée un ActionExecutorV1 sans déclencher l'init mss/pynput lourd. + + On instancie via __new__ + on injecte les attributs minimaux nécessaires + au test du garde-fou. + """ + from agent_v0.agent_v1.core.executor import ActionExecutorV1 + + exe = ActionExecutorV1.__new__(ActionExecutorV1) + exe._system_dialog_pause = None + exe._notification_manager = None # le @property notifier renverra _Noop + return exe + + +class TestUACGuardFailClosedP0D: + """Fix P0-D : exception dans la détection → pause supervisée.""" + + def test_exception_triggers_pause(self, executor, monkeypatch): + """Si detect_current_system_dialog lève, on pause au lieu de laisser passer.""" + from agent_v0.agent_v1.core import system_dialog_guard as guard_mod + + def _boom(*_a, **_kw): + raise RuntimeError("UIA backend down") + + monkeypatch.setattr(guard_mod, "detect_current_system_dialog", _boom) + + # Avant : pas de pause + assert executor._system_dialog_pause is None + + # Appel : doit retourner True (= "STOP") au lieu de False + result = executor._check_and_pause_on_system_dialog( + context="test_p0d_unit" + ) + + assert result is True, ( + "fail-closed : exception → True (le caller doit stopper) " + "et NON False comme avant le fix" + ) + + def test_exception_sets_pause_state(self, executor, monkeypatch): + """Vérifie que _system_dialog_pause contient les bonnes infos.""" + from agent_v0.agent_v1.core import system_dialog_guard as guard_mod + + def _boom(*_a, **_kw): + raise ValueError("XPath error") + + monkeypatch.setattr(guard_mod, "detect_current_system_dialog", _boom) + + executor._check_and_pause_on_system_dialog(context="popup_handler") + + pause = executor._system_dialog_pause + assert pause is not None, "Le flag de pause doit être positionné" + assert pause["category"] == "unknown_check_failed" + assert pause["matched_signal"] == "exception" + assert pause["matched_value"] == "ValueError" + assert "XPath error" in pause["reason"] + assert pause["context"] == "popup_handler" + + def test_no_exception_no_dialog_returns_false(self, executor, monkeypatch): + """Si pas de dialogue système et pas d'exception → False (laisse passer).""" + from agent_v0.agent_v1.core import system_dialog_guard as guard_mod + + class _FakeDetection: + is_system_dialog = False + category = None + matched_signal = None + matched_value = None + reason = None + + monkeypatch.setattr( + guard_mod, + "detect_current_system_dialog", + lambda *a, **k: _FakeDetection(), + ) + + result = executor._check_and_pause_on_system_dialog(context="ok") + assert result is False + assert executor._system_dialog_pause is None + + def test_dialog_detected_returns_true(self, executor, monkeypatch): + """Si UAC détecté légitimement → True + pause (comportement existant).""" + from agent_v0.agent_v1.core import system_dialog_guard as guard_mod + + class _UACDetection: + is_system_dialog = True + category = "uac_consent" + matched_signal = "class_name" + matched_value = "$$$Secure UAP Dummy Window Class$$$" + reason = "UAC consent prompt detected" + + monkeypatch.setattr( + guard_mod, + "detect_current_system_dialog", + lambda *a, **k: _UACDetection(), + ) + + result = executor._check_and_pause_on_system_dialog(context="uac_test") + assert result is True + assert executor._system_dialog_pause is not None + assert executor._system_dialog_pause["category"] == "uac_consent" + + def test_exception_notifies_user(self, executor, monkeypatch): + """L'exception doit déclencher une notification utilisateur.""" + from agent_v0.agent_v1.core import system_dialog_guard as guard_mod + + def _boom(*_a, **_kw): + raise OSError("UIA com error") + + monkeypatch.setattr(guard_mod, "detect_current_system_dialog", _boom) + + # Spy sur la notification + notifications = [] + + class _SpyNotifier: + def notify(self, title=None, message=None, **kwargs): + notifications.append((title, message)) + + executor._notification_manager = _SpyNotifier() + + executor._check_and_pause_on_system_dialog(context="spy_test") + + assert len(notifications) >= 1, ( + "Une notification doit être envoyée à l'utilisateur" + ) + title, message = notifications[0] + assert "sécurité" in title.lower() or "lea" in title.lower() + assert "garde-fou" in message.lower() or "pause" in message.lower()