""" Gestionnaire d'authentification automatique pendant le replay. Détecte les écrans d'authentification et injecte les credentials appropriés. Fonctionne avec le ScreenState du core pipeline et le CredentialVault chiffré. Stratégie de détection : 1. Analyse OCR : cherche des patterns textuels indicatifs d'un écran d'auth ("mot de passe", "identifiant", "code de vérification", etc.) 2. Analyse UI : cherche des éléments sémantiques typiques (champ password, bouton "Se connecter", etc.) 3. Identification de l'application : via window_title du ScreenState La confiance est calculée selon le nombre de signaux détectés : - 1 signal = 0.3 (faible) - 2 signaux = 0.6 (moyen) - 3+ signaux = 0.85+ (élevé) """ import logging import re import uuid from dataclasses import dataclass, field from typing import Any, Dict, List, Optional from .credential_vault import CredentialVault from .totp_generator import TOTPGenerator logger = logging.getLogger(__name__) # ========================================================================= # Patterns de détection d'écrans d'authentification # ========================================================================= # Patterns OCR (texte visible sur l'écran) — FR + EN pour support bilingue _AUTH_TEXT_PATTERNS = [ # Français r"mot\s+de\s+passe", r"identifiant", r"nom\s+d'utilisateur", r"connexion", r"se\s+connecter", r"authentification", r"code\s+de\s+v[ée]rification", r"code\s+otp", r"double\s+authentification", r"v[ée]rification\s+en\s+deux\s+[ée]tapes", # Anglais r"password", r"username", r"sign\s+in", r"log\s*in", r"verification\s+code", r"two.factor", r"2fa", r"one.time\s+password", r"enter\s+your\s+code", ] # Patterns pour identifier spécifiquement un écran TOTP/2FA _TOTP_TEXT_PATTERNS = [ r"code\s+de\s+v[ée]rification", r"code\s+otp", r"double\s+authentification", r"v[ée]rification\s+en\s+deux", r"two.factor", r"2fa", r"one.time\s+password", r"enter\s+your\s+code", r"code\s+[àa]\s+\d+\s+chiffres", r"authenticator", ] # Libellés de boutons de validation _SUBMIT_BUTTON_PATTERNS = [ r"se\s+connecter", r"connexion", r"valider", r"envoyer", r"confirmer", r"sign\s+in", r"log\s*in", r"submit", r"verify", r"ok", ] # Compilations pour performance _AUTH_REGEXES = [re.compile(p, re.IGNORECASE) for p in _AUTH_TEXT_PATTERNS] _TOTP_REGEXES = [re.compile(p, re.IGNORECASE) for p in _TOTP_TEXT_PATTERNS] _SUBMIT_REGEXES = [re.compile(p, re.IGNORECASE) for p in _SUBMIT_BUTTON_PATTERNS] @dataclass class AuthRequest: """Requête d'authentification détectée sur un écran. Attributes: auth_type: Type d'authentification détecté ("login", "totp", "login_and_totp"). app_name: Application identifiée (depuis window_title). detected_fields: Champs détectés sur l'écran (positions, types). confidence: Confiance de la détection (0.0 à 1.0). """ auth_type: str # "login", "totp", "login_and_totp" app_name: str # App identifiée (depuis window_title) detected_fields: Dict[str, Any] = field(default_factory=dict) confidence: float = 0.0 class AuthHandler: """Gestionnaire d'authentification automatique pour le replay. Analyse les ScreenStates pour détecter les écrans d'authentification et génère les actions de replay correspondantes. Usage : handler = AuthHandler(vault) auth_req = handler.detect_auth_screen(screen_state) if auth_req: actions = handler.get_auth_actions(auth_req) # Injecter les actions dans la queue de replay """ def __init__(self, vault: CredentialVault): """Initialise le gestionnaire d'authentification. Args: vault: Instance du coffre-fort de credentials. """ self._vault = vault def detect_auth_screen(self, screen_state: Any) -> Optional[AuthRequest]: """Analyse un ScreenState pour détecter un écran d'authentification. La détection combine plusieurs signaux : - Textes OCR correspondant à des patterns d'auth - Éléments UI de type password/text_input - Boutons de validation ("Se connecter", "Valider") Args: screen_state: ScreenState du core pipeline (ou dict compatible). Returns: AuthRequest si un écran d'auth est détecté avec confiance > 0.3, None sinon. """ # Extraire les textes détectés et les éléments UI texts = self._extract_texts(screen_state) ui_elements = self._extract_ui_elements(screen_state) app_name = self._extract_app_name(screen_state) # Compteur de signaux de détection signals: Dict[str, Any] = {} # Signal 1 : Patterns textuels d'authentification auth_text_matches = [] for text in texts: for regex in _AUTH_REGEXES: if regex.search(text): auth_text_matches.append(regex.pattern) if auth_text_matches: signals["auth_text"] = auth_text_matches # Signal 2 : Patterns textuels TOTP/2FA totp_text_matches = [] for text in texts: for regex in _TOTP_REGEXES: if regex.search(text): totp_text_matches.append(regex.pattern) if totp_text_matches: signals["totp_text"] = totp_text_matches # Signal 3 : Champs UI de type password password_fields = [] username_fields = [] submit_buttons = [] otp_fields = [] for elem in ui_elements: elem_type = self._get_elem_attr(elem, "type", "") elem_role = self._get_elem_attr(elem, "role", "") elem_label = self._get_elem_attr(elem, "label", "").lower() elem_tags = self._get_elem_attr(elem, "tags", []) # Champ mot de passe if elem_role == "password" or "password" in elem_tags: password_fields.append(elem) elif elem_type == "text_input" and any( p in elem_label for p in ("mot de passe", "password", "mdp") ): password_fields.append(elem) # Champ identifiant/username if elem_type == "text_input" and any( p in elem_label for p in ("identifiant", "username", "utilisateur", "login", "email", "e-mail") ): username_fields.append(elem) # Champ OTP if elem_type == "text_input" and any( p in elem_label for p in ("code", "otp", "vérification", "verification") ): otp_fields.append(elem) # Bouton de validation if elem_type == "button": for regex in _SUBMIT_REGEXES: if regex.search(elem_label): submit_buttons.append(elem) break if password_fields: signals["password_field"] = len(password_fields) if username_fields: signals["username_field"] = len(username_fields) if submit_buttons: signals["submit_button"] = len(submit_buttons) if otp_fields: signals["otp_field"] = len(otp_fields) # Pas assez de signaux → pas d'écran d'auth if not signals: return None # Déterminer le type d'auth # Les signaux textuels "auth_text" peuvent contenir des patterns ambigus # (ex: "2fa" apparaît dans les deux listes). On ne compte comme signal # login que les patterns auth_text qui ne sont PAS aussi des patterns TOTP. auth_only_text = set(signals.get("auth_text", [])) - set(signals.get("totp_text", [])) has_login_signals = bool( password_fields or auth_only_text or username_fields ) has_totp_signals = bool( otp_fields or "totp_text" in signals ) if has_login_signals and has_totp_signals: auth_type = "login_and_totp" elif has_totp_signals: auth_type = "totp" else: auth_type = "login" # Calculer la confiance (nombre de signaux distincts) num_signals = len(signals) if num_signals >= 4: confidence = 0.95 elif num_signals >= 3: confidence = 0.85 elif num_signals >= 2: confidence = 0.6 else: confidence = 0.3 # Construire les champs détectés detected_fields: Dict[str, Any] = {} if username_fields: detected_fields["username_field"] = self._elem_to_dict(username_fields[0]) if password_fields: detected_fields["password_field"] = self._elem_to_dict(password_fields[0]) if otp_fields: detected_fields["otp_field"] = self._elem_to_dict(otp_fields[0]) if submit_buttons: detected_fields["submit_button"] = self._elem_to_dict(submit_buttons[0]) auth_request = AuthRequest( auth_type=auth_type, app_name=app_name, detected_fields=detected_fields, confidence=confidence, ) logger.info( "Écran d'authentification détecté : type=%s app=%s confiance=%.2f signaux=%s", auth_type, app_name, confidence, list(signals.keys()), ) return auth_request def get_auth_actions(self, auth_request: AuthRequest) -> List[Dict[str, Any]]: """Génère les actions de replay pour s'authentifier. Produit une séquence d'actions que l'Agent V1 peut exécuter : - click sur le champ username, type le login - click sur le champ password, type le mot de passe - (optionnel) type le code TOTP - click sur le bouton de validation Args: auth_request: Requête d'authentification détectée. Returns: Liste d'actions de replay (format compatible avec la queue de replay). Liste vide si les credentials ne sont pas trouvés dans le vault. """ actions: List[Dict[str, Any]] = [] app_name = auth_request.app_name fields = auth_request.detected_fields # Générer un préfixe unique pour les action_ids prefix = f"auth_{uuid.uuid4().hex[:6]}" # ---- Login : username + password ---- if auth_request.auth_type in ("login", "login_and_totp"): login_creds = self._vault.get_credential(app_name, "login") if not login_creds: logger.warning( "Pas de credential 'login' pour l'app '%s' dans le vault", app_name, ) return [] # Action : cliquer sur le champ username et taper username_field = fields.get("username_field") if username_field: actions.append({ "action_id": f"{prefix}_click_username", "type": "click", "target": username_field.get("center", [0, 0]), "description": f"Clic champ identifiant ({app_name})", "_auth_action": True, }) actions.append({ "action_id": f"{prefix}_type_username", "type": "type_text", "text": login_creds.get("username", ""), "description": f"Saisie identifiant ({app_name})", "_auth_action": True, }) # Action : cliquer sur le champ password et taper password_field = fields.get("password_field") if password_field: actions.append({ "action_id": f"{prefix}_click_password", "type": "click", "target": password_field.get("center", [0, 0]), "description": f"Clic champ mot de passe ({app_name})", "_auth_action": True, }) actions.append({ "action_id": f"{prefix}_type_password", "type": "type_text", "text": login_creds.get("password", ""), "description": f"Saisie mot de passe ({app_name})", "_auth_action": True, }) # ---- TOTP : générer et taper le code ---- if auth_request.auth_type in ("totp", "login_and_totp"): totp_creds = self._vault.get_credential(app_name, "totp_seed") if not totp_creds: logger.warning( "Pas de credential 'totp_seed' pour l'app '%s' dans le vault", app_name, ) # On continue quand même si le login a été fait if not actions: return [] else: totp = TOTPGenerator( secret=totp_creds["secret"], digits=totp_creds.get("digits", 6), interval=totp_creds.get("interval", 30), algorithm=totp_creds.get("algorithm", "SHA1"), ) # Attendre si le code expire dans moins de 5 secondes remaining = totp.time_remaining() if remaining < 5: actions.append({ "action_id": f"{prefix}_wait_totp", "type": "wait", "duration_ms": (remaining + 1) * 1000, "reason": "attente_nouveau_code_totp", "description": f"Attente nouveau code TOTP ({remaining}s restantes)", "_auth_action": True, }) code = totp.generate() otp_field = fields.get("otp_field") if otp_field: actions.append({ "action_id": f"{prefix}_click_otp", "type": "click", "target": otp_field.get("center", [0, 0]), "description": f"Clic champ OTP ({app_name})", "_auth_action": True, }) actions.append({ "action_id": f"{prefix}_type_totp", "type": "type_text", "text": code, "description": f"Saisie code TOTP ({app_name})", "_auth_action": True, }) # ---- Bouton de validation ---- submit_button = fields.get("submit_button") if submit_button and actions: actions.append({ "action_id": f"{prefix}_click_submit", "type": "click", "target": submit_button.get("center", [0, 0]), "description": f"Clic validation ({app_name})", "_auth_action": True, }) # Pause après validation pour laisser l'app charger if actions: actions.append({ "action_id": f"{prefix}_wait_after_auth", "type": "wait", "duration_ms": 2000, "reason": "attente_chargement_post_auth", "description": f"Attente post-authentification ({app_name})", "_auth_action": True, }) logger.info( "Actions d'authentification générées : %d actions pour %s (type=%s)", len(actions), app_name, auth_request.auth_type, ) return actions # ========================================================================= # Méthodes d'extraction internes # ========================================================================= def _extract_texts(self, screen_state: Any) -> List[str]: """Extrait tous les textes détectés depuis un ScreenState. Supporte les objets ScreenState du core et les dicts bruts. """ texts: List[str] = [] # ScreenState core (dataclass) if hasattr(screen_state, "perception") and hasattr( screen_state.perception, "detected_text" ): texts.extend(screen_state.perception.detected_text) # Dict brut (sessions streaming) elif isinstance(screen_state, dict): perception = screen_state.get("perception", {}) if isinstance(perception, dict): texts.extend(perception.get("detected_text", [])) # Texte OCR brut if "ocr_text" in screen_state: texts.append(screen_state["ocr_text"]) # Textes des éléments UI for elem in screen_state.get("ui_elements", []): label = elem.get("label", "") if label: texts.append(label) # Textes des éléments UI (objets) if hasattr(screen_state, "ui_elements"): for elem in screen_state.ui_elements: label = self._get_elem_attr(elem, "label", "") if label: texts.append(label) return texts def _extract_ui_elements(self, screen_state: Any) -> List[Any]: """Extrait les éléments UI depuis un ScreenState.""" if hasattr(screen_state, "ui_elements"): return list(screen_state.ui_elements) if isinstance(screen_state, dict): return screen_state.get("ui_elements", []) return [] def _extract_app_name(self, screen_state: Any) -> str: """Extrait le nom de l'application depuis un ScreenState.""" # ScreenState core if hasattr(screen_state, "window") and hasattr(screen_state.window, "app_name"): return screen_state.window.app_name # Dict brut if isinstance(screen_state, dict): window = screen_state.get("window", {}) if isinstance(window, dict): return window.get("app_name", "unknown") return "unknown" @staticmethod def _get_elem_attr(elem: Any, attr: str, default: Any = None) -> Any: """Récupère un attribut d'un élément UI (objet ou dict).""" if isinstance(elem, dict): return elem.get(attr, default) return getattr(elem, attr, default) @staticmethod def _elem_to_dict(elem: Any) -> Dict[str, Any]: """Convertit un élément UI en dict minimal pour les detected_fields.""" if isinstance(elem, dict): return { "type": elem.get("type", ""), "label": elem.get("label", ""), "center": elem.get("center", [0, 0]), "element_id": elem.get("element_id", ""), } return { "type": getattr(elem, "type", ""), "label": getattr(elem, "label", ""), "center": list(getattr(elem, "center", (0, 0))), "element_id": getattr(elem, "element_id", ""), }