Files
rpa_vision_v3/core/auth/auth_handler.py
Dom d5deac3029 feat: replay visuel VLM-first, worker séparé, package Léa, AZERTY, sécurité HTTPS
Pipeline replay visuel :
- VLM-first : l'agent appelle Ollama directement pour trouver les éléments
- Template matching en fallback (seuil strict 0.90)
- Stop immédiat si élément non trouvé (pas de clic blind)
- Replay depuis session brute (/replay-session) sans attendre le VLM
- Vérification post-action (screenshot hash avant/après)
- Gestion des popups (Enter/Escape/Tab+Enter)

Worker VLM séparé :
- run_worker.py : process distinct du serveur HTTP
- Communication par fichiers (_worker_queue.txt + _replay_active.lock)
- Le serveur HTTP ne fait plus jamais de VLM → toujours réactif
- Service systemd rpa-worker.service

Capture clavier :
- raw_keys (vk + press/release) pour replay exact indépendant du layout
- Fix AZERTY : ToUnicodeEx + AltGr detection
- Enter capturé comme \n, Tab comme \t
- Filtrage modificateurs seuls (Ctrl/Alt/Shift parasites)
- Fusion text_input consécutifs, dédup key_combo

Sécurité & Internet :
- HTTPS Let's Encrypt (lea.labs + vwb.labs.laurinebazin.design)
- Token API fixe dans .env.local
- HTTP Basic Auth sur VWB
- Security headers (HSTS, CSP, nosniff)
- CORS domaines publics, plus de wildcard

Infrastructure :
- DPI awareness (SetProcessDpiAwareness) Python + Rust
- Métadonnées système (dpi_scale, window_bounds, monitors, os_theme)
- Template matching multi-scale [0.5, 2.0]
- Résolution dynamique (plus de hardcode 1920x1080)
- VLM prefill fix (47x speedup, 3.5s au lieu de 180s)

Modules :
- core/auth/ : credential vault (Fernet AES), TOTP (RFC 6238), auth handler
- core/federation/ : LearningPack export/import anonymisé, FAISS global
- deploy/ : package Léa (config.txt, Lea.bat, install.bat, LISEZMOI.txt)

UX :
- Filtrage OS (VWB + Chat montrent que les workflows de l'OS courant)
- Bibliothèque persistante (cache local + SQLite)
- Clustering hybride (titre fenêtre + DBSCAN)
- EdgeConstraints + PostConditions peuplés
- GraphBuilder compound actions (toutes les frappes)

Agent Rust :
- Token Bearer auth (network.rs)
- sysinfo.rs (DPI, résolution, window bounds via Win32 API)
- config.txt lu automatiquement
- Support Chrome/Brave/Firefox (pas que Edge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:19:18 +01:00

524 lines
19 KiB
Python

"""
Gestionnaire d'authentification automatique pendant le replay.
Détecte les écrans d'authentification et injecte les credentials appropriés.
Fonctionne avec le ScreenState du core pipeline et le CredentialVault chiffré.
Stratégie de détection :
1. Analyse OCR : cherche des patterns textuels indicatifs d'un écran d'auth
("mot de passe", "identifiant", "code de vérification", etc.)
2. Analyse UI : cherche des éléments sémantiques typiques (champ password,
bouton "Se connecter", etc.)
3. Identification de l'application : via window_title du ScreenState
La confiance est calculée selon le nombre de signaux détectés :
- 1 signal = 0.3 (faible)
- 2 signaux = 0.6 (moyen)
- 3+ signaux = 0.85+ (élevé)
"""
import logging
import re
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from .credential_vault import CredentialVault
from .totp_generator import TOTPGenerator
logger = logging.getLogger(__name__)
# =========================================================================
# Patterns de détection d'écrans d'authentification
# =========================================================================
# Patterns OCR (texte visible sur l'écran) — FR + EN pour support bilingue
_AUTH_TEXT_PATTERNS = [
# Français
r"mot\s+de\s+passe",
r"identifiant",
r"nom\s+d'utilisateur",
r"connexion",
r"se\s+connecter",
r"authentification",
r"code\s+de\s+v[ée]rification",
r"code\s+otp",
r"double\s+authentification",
r"v[ée]rification\s+en\s+deux\s+[ée]tapes",
# Anglais
r"password",
r"username",
r"sign\s+in",
r"log\s*in",
r"verification\s+code",
r"two.factor",
r"2fa",
r"one.time\s+password",
r"enter\s+your\s+code",
]
# Patterns pour identifier spécifiquement un écran TOTP/2FA
_TOTP_TEXT_PATTERNS = [
r"code\s+de\s+v[ée]rification",
r"code\s+otp",
r"double\s+authentification",
r"v[ée]rification\s+en\s+deux",
r"two.factor",
r"2fa",
r"one.time\s+password",
r"enter\s+your\s+code",
r"code\s+[àa]\s+\d+\s+chiffres",
r"authenticator",
]
# Libellés de boutons de validation
_SUBMIT_BUTTON_PATTERNS = [
r"se\s+connecter",
r"connexion",
r"valider",
r"envoyer",
r"confirmer",
r"sign\s+in",
r"log\s*in",
r"submit",
r"verify",
r"ok",
]
# Compilations pour performance
_AUTH_REGEXES = [re.compile(p, re.IGNORECASE) for p in _AUTH_TEXT_PATTERNS]
_TOTP_REGEXES = [re.compile(p, re.IGNORECASE) for p in _TOTP_TEXT_PATTERNS]
_SUBMIT_REGEXES = [re.compile(p, re.IGNORECASE) for p in _SUBMIT_BUTTON_PATTERNS]
@dataclass
class AuthRequest:
"""Requête d'authentification détectée sur un écran.
Attributes:
auth_type: Type d'authentification détecté ("login", "totp", "login_and_totp").
app_name: Application identifiée (depuis window_title).
detected_fields: Champs détectés sur l'écran (positions, types).
confidence: Confiance de la détection (0.0 à 1.0).
"""
auth_type: str # "login", "totp", "login_and_totp"
app_name: str # App identifiée (depuis window_title)
detected_fields: Dict[str, Any] = field(default_factory=dict)
confidence: float = 0.0
class AuthHandler:
"""Gestionnaire d'authentification automatique pour le replay.
Analyse les ScreenStates pour détecter les écrans d'authentification
et génère les actions de replay correspondantes.
Usage :
handler = AuthHandler(vault)
auth_req = handler.detect_auth_screen(screen_state)
if auth_req:
actions = handler.get_auth_actions(auth_req)
# Injecter les actions dans la queue de replay
"""
def __init__(self, vault: CredentialVault):
"""Initialise le gestionnaire d'authentification.
Args:
vault: Instance du coffre-fort de credentials.
"""
self._vault = vault
def detect_auth_screen(self, screen_state: Any) -> Optional[AuthRequest]:
"""Analyse un ScreenState pour détecter un écran d'authentification.
La détection combine plusieurs signaux :
- Textes OCR correspondant à des patterns d'auth
- Éléments UI de type password/text_input
- Boutons de validation ("Se connecter", "Valider")
Args:
screen_state: ScreenState du core pipeline (ou dict compatible).
Returns:
AuthRequest si un écran d'auth est détecté avec confiance > 0.3,
None sinon.
"""
# Extraire les textes détectés et les éléments UI
texts = self._extract_texts(screen_state)
ui_elements = self._extract_ui_elements(screen_state)
app_name = self._extract_app_name(screen_state)
# Compteur de signaux de détection
signals: Dict[str, Any] = {}
# Signal 1 : Patterns textuels d'authentification
auth_text_matches = []
for text in texts:
for regex in _AUTH_REGEXES:
if regex.search(text):
auth_text_matches.append(regex.pattern)
if auth_text_matches:
signals["auth_text"] = auth_text_matches
# Signal 2 : Patterns textuels TOTP/2FA
totp_text_matches = []
for text in texts:
for regex in _TOTP_REGEXES:
if regex.search(text):
totp_text_matches.append(regex.pattern)
if totp_text_matches:
signals["totp_text"] = totp_text_matches
# Signal 3 : Champs UI de type password
password_fields = []
username_fields = []
submit_buttons = []
otp_fields = []
for elem in ui_elements:
elem_type = self._get_elem_attr(elem, "type", "")
elem_role = self._get_elem_attr(elem, "role", "")
elem_label = self._get_elem_attr(elem, "label", "").lower()
elem_tags = self._get_elem_attr(elem, "tags", [])
# Champ mot de passe
if elem_role == "password" or "password" in elem_tags:
password_fields.append(elem)
elif elem_type == "text_input" and any(
p in elem_label for p in ("mot de passe", "password", "mdp")
):
password_fields.append(elem)
# Champ identifiant/username
if elem_type == "text_input" and any(
p in elem_label
for p in ("identifiant", "username", "utilisateur", "login", "email", "e-mail")
):
username_fields.append(elem)
# Champ OTP
if elem_type == "text_input" and any(
p in elem_label for p in ("code", "otp", "vérification", "verification")
):
otp_fields.append(elem)
# Bouton de validation
if elem_type == "button":
for regex in _SUBMIT_REGEXES:
if regex.search(elem_label):
submit_buttons.append(elem)
break
if password_fields:
signals["password_field"] = len(password_fields)
if username_fields:
signals["username_field"] = len(username_fields)
if submit_buttons:
signals["submit_button"] = len(submit_buttons)
if otp_fields:
signals["otp_field"] = len(otp_fields)
# Pas assez de signaux → pas d'écran d'auth
if not signals:
return None
# Déterminer le type d'auth
# Les signaux textuels "auth_text" peuvent contenir des patterns ambigus
# (ex: "2fa" apparaît dans les deux listes). On ne compte comme signal
# login que les patterns auth_text qui ne sont PAS aussi des patterns TOTP.
auth_only_text = set(signals.get("auth_text", [])) - set(signals.get("totp_text", []))
has_login_signals = bool(
password_fields
or auth_only_text
or username_fields
)
has_totp_signals = bool(
otp_fields
or "totp_text" in signals
)
if has_login_signals and has_totp_signals:
auth_type = "login_and_totp"
elif has_totp_signals:
auth_type = "totp"
else:
auth_type = "login"
# Calculer la confiance (nombre de signaux distincts)
num_signals = len(signals)
if num_signals >= 4:
confidence = 0.95
elif num_signals >= 3:
confidence = 0.85
elif num_signals >= 2:
confidence = 0.6
else:
confidence = 0.3
# Construire les champs détectés
detected_fields: Dict[str, Any] = {}
if username_fields:
detected_fields["username_field"] = self._elem_to_dict(username_fields[0])
if password_fields:
detected_fields["password_field"] = self._elem_to_dict(password_fields[0])
if otp_fields:
detected_fields["otp_field"] = self._elem_to_dict(otp_fields[0])
if submit_buttons:
detected_fields["submit_button"] = self._elem_to_dict(submit_buttons[0])
auth_request = AuthRequest(
auth_type=auth_type,
app_name=app_name,
detected_fields=detected_fields,
confidence=confidence,
)
logger.info(
"Écran d'authentification détecté : type=%s app=%s confiance=%.2f signaux=%s",
auth_type,
app_name,
confidence,
list(signals.keys()),
)
return auth_request
def get_auth_actions(self, auth_request: AuthRequest) -> List[Dict[str, Any]]:
"""Génère les actions de replay pour s'authentifier.
Produit une séquence d'actions que l'Agent V1 peut exécuter :
- click sur le champ username, type le login
- click sur le champ password, type le mot de passe
- (optionnel) type le code TOTP
- click sur le bouton de validation
Args:
auth_request: Requête d'authentification détectée.
Returns:
Liste d'actions de replay (format compatible avec la queue de replay).
Liste vide si les credentials ne sont pas trouvés dans le vault.
"""
actions: List[Dict[str, Any]] = []
app_name = auth_request.app_name
fields = auth_request.detected_fields
# Générer un préfixe unique pour les action_ids
prefix = f"auth_{uuid.uuid4().hex[:6]}"
# ---- Login : username + password ----
if auth_request.auth_type in ("login", "login_and_totp"):
login_creds = self._vault.get_credential(app_name, "login")
if not login_creds:
logger.warning(
"Pas de credential 'login' pour l'app '%s' dans le vault",
app_name,
)
return []
# Action : cliquer sur le champ username et taper
username_field = fields.get("username_field")
if username_field:
actions.append({
"action_id": f"{prefix}_click_username",
"type": "click",
"target": username_field.get("center", [0, 0]),
"description": f"Clic champ identifiant ({app_name})",
"_auth_action": True,
})
actions.append({
"action_id": f"{prefix}_type_username",
"type": "type_text",
"text": login_creds.get("username", ""),
"description": f"Saisie identifiant ({app_name})",
"_auth_action": True,
})
# Action : cliquer sur le champ password et taper
password_field = fields.get("password_field")
if password_field:
actions.append({
"action_id": f"{prefix}_click_password",
"type": "click",
"target": password_field.get("center", [0, 0]),
"description": f"Clic champ mot de passe ({app_name})",
"_auth_action": True,
})
actions.append({
"action_id": f"{prefix}_type_password",
"type": "type_text",
"text": login_creds.get("password", ""),
"description": f"Saisie mot de passe ({app_name})",
"_auth_action": True,
})
# ---- TOTP : générer et taper le code ----
if auth_request.auth_type in ("totp", "login_and_totp"):
totp_creds = self._vault.get_credential(app_name, "totp_seed")
if not totp_creds:
logger.warning(
"Pas de credential 'totp_seed' pour l'app '%s' dans le vault",
app_name,
)
# On continue quand même si le login a été fait
if not actions:
return []
else:
totp = TOTPGenerator(
secret=totp_creds["secret"],
digits=totp_creds.get("digits", 6),
interval=totp_creds.get("interval", 30),
algorithm=totp_creds.get("algorithm", "SHA1"),
)
# Attendre si le code expire dans moins de 5 secondes
remaining = totp.time_remaining()
if remaining < 5:
actions.append({
"action_id": f"{prefix}_wait_totp",
"type": "wait",
"duration_ms": (remaining + 1) * 1000,
"reason": "attente_nouveau_code_totp",
"description": f"Attente nouveau code TOTP ({remaining}s restantes)",
"_auth_action": True,
})
code = totp.generate()
otp_field = fields.get("otp_field")
if otp_field:
actions.append({
"action_id": f"{prefix}_click_otp",
"type": "click",
"target": otp_field.get("center", [0, 0]),
"description": f"Clic champ OTP ({app_name})",
"_auth_action": True,
})
actions.append({
"action_id": f"{prefix}_type_totp",
"type": "type_text",
"text": code,
"description": f"Saisie code TOTP ({app_name})",
"_auth_action": True,
})
# ---- Bouton de validation ----
submit_button = fields.get("submit_button")
if submit_button and actions:
actions.append({
"action_id": f"{prefix}_click_submit",
"type": "click",
"target": submit_button.get("center", [0, 0]),
"description": f"Clic validation ({app_name})",
"_auth_action": True,
})
# Pause après validation pour laisser l'app charger
if actions:
actions.append({
"action_id": f"{prefix}_wait_after_auth",
"type": "wait",
"duration_ms": 2000,
"reason": "attente_chargement_post_auth",
"description": f"Attente post-authentification ({app_name})",
"_auth_action": True,
})
logger.info(
"Actions d'authentification générées : %d actions pour %s (type=%s)",
len(actions),
app_name,
auth_request.auth_type,
)
return actions
# =========================================================================
# Méthodes d'extraction internes
# =========================================================================
def _extract_texts(self, screen_state: Any) -> List[str]:
"""Extrait tous les textes détectés depuis un ScreenState.
Supporte les objets ScreenState du core et les dicts bruts.
"""
texts: List[str] = []
# ScreenState core (dataclass)
if hasattr(screen_state, "perception") and hasattr(
screen_state.perception, "detected_text"
):
texts.extend(screen_state.perception.detected_text)
# Dict brut (sessions streaming)
elif isinstance(screen_state, dict):
perception = screen_state.get("perception", {})
if isinstance(perception, dict):
texts.extend(perception.get("detected_text", []))
# Texte OCR brut
if "ocr_text" in screen_state:
texts.append(screen_state["ocr_text"])
# Textes des éléments UI
for elem in screen_state.get("ui_elements", []):
label = elem.get("label", "")
if label:
texts.append(label)
# Textes des éléments UI (objets)
if hasattr(screen_state, "ui_elements"):
for elem in screen_state.ui_elements:
label = self._get_elem_attr(elem, "label", "")
if label:
texts.append(label)
return texts
def _extract_ui_elements(self, screen_state: Any) -> List[Any]:
"""Extrait les éléments UI depuis un ScreenState."""
if hasattr(screen_state, "ui_elements"):
return list(screen_state.ui_elements)
if isinstance(screen_state, dict):
return screen_state.get("ui_elements", [])
return []
def _extract_app_name(self, screen_state: Any) -> str:
"""Extrait le nom de l'application depuis un ScreenState."""
# ScreenState core
if hasattr(screen_state, "window") and hasattr(screen_state.window, "app_name"):
return screen_state.window.app_name
# Dict brut
if isinstance(screen_state, dict):
window = screen_state.get("window", {})
if isinstance(window, dict):
return window.get("app_name", "unknown")
return "unknown"
@staticmethod
def _get_elem_attr(elem: Any, attr: str, default: Any = None) -> Any:
"""Récupère un attribut d'un élément UI (objet ou dict)."""
if isinstance(elem, dict):
return elem.get(attr, default)
return getattr(elem, attr, default)
@staticmethod
def _elem_to_dict(elem: Any) -> Dict[str, Any]:
"""Convertit un élément UI en dict minimal pour les detected_fields."""
if isinstance(elem, dict):
return {
"type": elem.get("type", ""),
"label": elem.get("label", ""),
"center": elem.get("center", [0, 0]),
"element_id": elem.get("element_id", ""),
}
return {
"type": getattr(elem, "type", ""),
"label": getattr(elem, "label", ""),
"center": list(getattr(elem, "center", (0, 0))),
"element_id": getattr(elem, "element_id", ""),
}