Files
rpa_vision_v3/tests/unit/test_auth.py
Dom d5deac3029 feat: replay visuel VLM-first, worker séparé, package Léa, AZERTY, sécurité HTTPS
Pipeline replay visuel :
- VLM-first : l'agent appelle Ollama directement pour trouver les éléments
- Template matching en fallback (seuil strict 0.90)
- Stop immédiat si élément non trouvé (pas de clic blind)
- Replay depuis session brute (/replay-session) sans attendre le VLM
- Vérification post-action (screenshot hash avant/après)
- Gestion des popups (Enter/Escape/Tab+Enter)

Worker VLM séparé :
- run_worker.py : process distinct du serveur HTTP
- Communication par fichiers (_worker_queue.txt + _replay_active.lock)
- Le serveur HTTP ne fait plus jamais de VLM → toujours réactif
- Service systemd rpa-worker.service

Capture clavier :
- raw_keys (vk + press/release) pour replay exact indépendant du layout
- Fix AZERTY : ToUnicodeEx + AltGr detection
- Enter capturé comme \n, Tab comme \t
- Filtrage modificateurs seuls (Ctrl/Alt/Shift parasites)
- Fusion text_input consécutifs, dédup key_combo

Sécurité & Internet :
- HTTPS Let's Encrypt (lea.labs + vwb.labs.laurinebazin.design)
- Token API fixe dans .env.local
- HTTP Basic Auth sur VWB
- Security headers (HSTS, CSP, nosniff)
- CORS domaines publics, plus de wildcard

Infrastructure :
- DPI awareness (SetProcessDpiAwareness) Python + Rust
- Métadonnées système (dpi_scale, window_bounds, monitors, os_theme)
- Template matching multi-scale [0.5, 2.0]
- Résolution dynamique (plus de hardcode 1920x1080)
- VLM prefill fix (47x speedup, 3.5s au lieu de 180s)

Modules :
- core/auth/ : credential vault (Fernet AES), TOTP (RFC 6238), auth handler
- core/federation/ : LearningPack export/import anonymisé, FAISS global
- deploy/ : package Léa (config.txt, Lea.bat, install.bat, LISEZMOI.txt)

UX :
- Filtrage OS (VWB + Chat montrent que les workflows de l'OS courant)
- Bibliothèque persistante (cache local + SQLite)
- Clustering hybride (titre fenêtre + DBSCAN)
- EdgeConstraints + PostConditions peuplés
- GraphBuilder compound actions (toutes les frappes)

Agent Rust :
- Token Bearer auth (network.rs)
- sysinfo.rs (DPI, résolution, window bounds via Win32 API)
- config.txt lu automatiquement
- Support Chrome/Brave/Firefox (pas que Edge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:19:18 +01:00

577 lines
24 KiB
Python

"""
Tests du module d'authentification automatique (core/auth).
Couvre :
- TOTPGenerator : génération, vérification, vecteurs de test RFC 6238
- CredentialVault : CRUD, chiffrement, persistance
- AuthHandler : détection d'écrans d'auth, génération d'actions
"""
import json
import os
import tempfile
import time
import pytest
from core.auth.credential_vault import CredentialVault, _HAS_FERNET
from core.auth.totp_generator import TOTPGenerator
from core.auth.auth_handler import AuthHandler, AuthRequest
# =========================================================================
# Tests TOTP
# =========================================================================
class TestTOTPGenerator:
"""Tests du générateur TOTP RFC 6238."""
def test_generate_returns_6_digits(self):
"""Le code généré fait exactement 6 chiffres."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP")
code = totp.generate()
assert len(code) == 6
assert code.isdigit()
def test_generate_deterministic(self):
"""Le même timestamp donne le même code."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP")
ts = 1700000000.0
code1 = totp.generate(timestamp=ts)
code2 = totp.generate(timestamp=ts)
assert code1 == code2
def test_verify_current_code(self):
"""Le code généré est validé par verify()."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP")
ts = time.time()
code = totp.generate(timestamp=ts)
assert totp.verify(code, timestamp=ts)
def test_verify_rejects_wrong_code(self):
"""Un code incorrect est rejeté."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP")
# Utiliser un timestamp suffisamment grand pour éviter les problèmes
# avec window=-1 (counter négatif)
assert not totp.verify("000000", timestamp=1700000000.0)
def test_verify_with_window(self):
"""La fenêtre de tolérance accepte les codes adjacents."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP", interval=30)
ts = 1700000000.0
# Code de l'intervalle précédent
prev_code = totp.generate(timestamp=ts - 30)
assert totp.verify(prev_code, timestamp=ts, window=1)
# Code de l'intervalle suivant
next_code = totp.generate(timestamp=ts + 30)
assert totp.verify(next_code, timestamp=ts, window=1)
def test_verify_window_zero_strict(self):
"""Window=0 n'accepte que le code exact de l'intervalle courant."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP", interval=30)
ts = 1700000000.0
code = totp.generate(timestamp=ts)
assert totp.verify(code, timestamp=ts, window=0)
prev_code = totp.generate(timestamp=ts - 30)
assert not totp.verify(prev_code, timestamp=ts, window=0)
def test_time_remaining_in_range(self):
"""time_remaining() retourne entre 1 et interval."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP", interval=30)
remaining = totp.time_remaining()
assert 1 <= remaining <= 30
def test_8_digits(self):
"""Support des codes à 8 chiffres."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP", digits=8)
code = totp.generate()
assert len(code) == 8
assert code.isdigit()
def test_rfc6238_sha1_vector(self):
"""Vecteur de test RFC 6238 pour SHA1.
Secret de test : "12345678901234567890" (ASCII)
En base32 : "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
Timestamp : 59 → T = 59 // 30 = 1 → code attendu 287082
"""
# Le secret ASCII "12345678901234567890" encodé en base32
secret_b32 = "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
totp = TOTPGenerator(secret_b32, digits=8, interval=30, algorithm="SHA1")
code = totp.generate(timestamp=59)
assert code == "94287082"
def test_rfc6238_sha1_vector_t1111111109(self):
"""Vecteur de test RFC 6238 — T=1111111109."""
secret_b32 = "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
totp = TOTPGenerator(secret_b32, digits=8, interval=30, algorithm="SHA1")
code = totp.generate(timestamp=1111111109)
assert code == "07081804"
def test_rfc6238_sha256_vector(self):
"""Vecteur de test RFC 6238 pour SHA256.
Secret 32 bytes : "12345678901234567890123456789012"
En base32 : "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQGEZA"
Timestamp : 59 → code attendu 46119246
"""
secret_b32 = "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQGEZA"
totp = TOTPGenerator(secret_b32, digits=8, interval=30, algorithm="SHA256")
code = totp.generate(timestamp=59)
assert code == "46119246"
def test_invalid_secret_raises(self):
"""Un secret invalide lève ValueError."""
with pytest.raises(ValueError, match="base32 invalide"):
TOTPGenerator("!!! not base32 !!!")
def test_invalid_algorithm_raises(self):
"""Un algorithme inconnu lève ValueError."""
with pytest.raises(ValueError, match="non supporté"):
TOTPGenerator("JBSWY3DPEHPK3PXP", algorithm="MD5")
def test_secret_with_spaces(self):
"""Les espaces dans le secret sont tolérés."""
totp1 = TOTPGenerator("JBSWY3DPEHPK3PXP")
totp2 = TOTPGenerator("JBSW Y3DP EHPK 3PXP")
ts = 1700000000.0
assert totp1.generate(timestamp=ts) == totp2.generate(timestamp=ts)
def test_zero_padded_code(self):
"""Les codes courts sont zero-padded (ex: 003271 et non 3271)."""
totp = TOTPGenerator("JBSWY3DPEHPK3PXP")
# Tester beaucoup de timestamps pour trouver un code qui commence par 0
for ts in range(1700000000, 1700001000, 30):
code = totp.generate(timestamp=float(ts))
assert len(code) == 6, f"Code {code!r} n'a pas 6 chiffres pour ts={ts}"
# =========================================================================
# Tests CredentialVault
# =========================================================================
class TestCredentialVault:
"""Tests du coffre-fort chiffré."""
def test_create_add_get(self):
"""Créer un vault, ajouter un credential, le récupérer."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path) # Supprimer pour que le vault se crée
vault = CredentialVault(vault_path, "test_password")
vault.add_credential("TestApp", "login", {
"username": "user1",
"password": "pass1",
})
cred = vault.get_credential("TestApp", "login")
assert cred is not None
assert cred["username"] == "user1"
assert cred["password"] == "pass1"
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_save_and_reload(self):
"""Sauvegarder et recharger un vault préserve les données."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "master123")
vault.add_credential("MyApp", "login", {
"username": "admin",
"password": "secret",
})
vault.add_credential("MyApp", "totp_seed", {
"secret": "JBSWY3DPEHPK3PXP",
"digits": 6,
"interval": 30,
"algorithm": "SHA1",
})
vault.save()
# Recharger
vault2 = CredentialVault(vault_path, "master123")
assert vault2.list_apps() == ["MyApp"]
login = vault2.get_credential("MyApp", "login")
assert login["username"] == "admin"
totp = vault2.get_credential("MyApp", "totp_seed")
assert totp["secret"] == "JBSWY3DPEHPK3PXP"
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_remove_credential(self):
"""Supprimer un credential fonctionne."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "pw")
vault.add_credential("App1", "login", {"username": "u", "password": "p"})
assert vault.remove_credential("App1", "login") is True
assert vault.get_credential("App1", "login") is None
assert vault.list_apps() == []
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_remove_nonexistent(self):
"""Supprimer un credential inexistant retourne False."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "pw")
assert vault.remove_credential("NopApp", "login") is False
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_list_apps_sorted(self):
"""list_apps() retourne les apps triées."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "pw")
vault.add_credential("Zebra", "login", {"username": "z", "password": "z"})
vault.add_credential("Alpha", "login", {"username": "a", "password": "a"})
vault.add_credential("Middle", "login", {"username": "m", "password": "m"})
assert vault.list_apps() == ["Alpha", "Middle", "Zebra"]
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_invalid_credential_type(self):
"""Un type de credential invalide lève ValueError."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "pw")
with pytest.raises(ValueError, match="invalide"):
vault.add_credential("App1", "invalid_type", {})
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_encryption_on_disk(self):
"""Le fichier vault sur disque ne contient pas de texte en clair."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "strong_password_42")
vault.add_credential("SecretApp", "login", {
"username": "robot_lea",
"password": "super_secret_password_xyz",
})
vault.save()
# Lire le fichier brut
raw_bytes = open(vault_path, "rb").read()
raw_str = raw_bytes.decode("latin-1") # Pour chercher du texte ASCII
# Les données sensibles ne doivent PAS apparaître en clair
assert "robot_lea" not in raw_str
assert "super_secret_password_xyz" not in raw_str
assert "SecretApp" not in raw_str
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_wrong_password_raises(self):
"""Un mauvais mot de passe empêche le déchiffrement."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "correct_password")
vault.add_credential("App", "login", {"username": "u", "password": "p"})
vault.save()
# Tenter de charger avec un mauvais mot de passe
with pytest.raises(ValueError, match="[Mm]ot de passe|corrompu"):
CredentialVault(vault_path, "wrong_password")
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
def test_multiple_credential_types_per_app(self):
"""Une app peut avoir plusieurs types de credentials."""
with tempfile.NamedTemporaryFile(suffix=".enc", delete=False) as f:
vault_path = f.name
try:
os.unlink(vault_path)
vault = CredentialVault(vault_path, "pw")
vault.add_credential("DPI", "login", {
"username": "lea", "password": "p"
})
vault.add_credential("DPI", "totp_seed", {
"secret": "JBSWY3DPEHPK3PXP"
})
assert vault.list_credential_types("DPI") == ["login", "totp_seed"]
assert vault.get_credential("DPI", "login")["username"] == "lea"
assert vault.get_credential("DPI", "totp_seed")["secret"] == "JBSWY3DPEHPK3PXP"
finally:
if os.path.exists(vault_path):
os.unlink(vault_path)
# =========================================================================
# Tests AuthHandler
# =========================================================================
class TestAuthHandler:
"""Tests du gestionnaire d'authentification."""
@pytest.fixture
def vault_with_creds(self, tmp_path):
"""Vault avec des credentials de test."""
vault_path = str(tmp_path / "test_vault.enc")
vault = CredentialVault(vault_path, "test_pw")
vault.add_credential("DPI_Crossway", "login", {
"username": "robot_lea",
"password": "secret123",
"domain": "HOPITAL",
})
vault.add_credential("DPI_Crossway", "totp_seed", {
"secret": "JBSWY3DPEHPK3PXP",
"digits": 6,
"interval": 30,
"algorithm": "SHA1",
})
vault.add_credential("Outlook", "login", {
"username": "lea@hopital.fr",
"password": "outlook_pass",
})
return vault
@pytest.fixture
def handler(self, vault_with_creds):
return AuthHandler(vault_with_creds)
def test_detect_login_screen(self, handler):
"""Détecter un écran de login classique."""
screen_state = {
"perception": {
"detected_text": [
"Bienvenue sur DPI Crossway",
"Identifiant",
"Mot de passe",
"Se connecter",
],
},
"ui_elements": [
{"type": "text_input", "role": "text", "label": "Identifiant", "center": [500, 300], "element_id": "e1", "tags": []},
{"type": "text_input", "role": "password", "label": "Mot de passe", "center": [500, 350], "element_id": "e2", "tags": []},
{"type": "button", "role": "primary_action", "label": "Se connecter", "center": [500, 420], "element_id": "e3", "tags": []},
],
"window": {"app_name": "DPI_Crossway", "window_title": "DPI Crossway - Connexion"},
}
auth_req = handler.detect_auth_screen(screen_state)
assert auth_req is not None
assert auth_req.auth_type == "login"
assert auth_req.app_name == "DPI_Crossway"
assert auth_req.confidence >= 0.6 # Plusieurs signaux
def test_detect_totp_screen(self, handler):
"""Détecter un écran 2FA/TOTP (sans éléments de login)."""
screen_state = {
"perception": {
"detected_text": [
"Entrez votre code 2FA",
"Code à 6 chiffres",
],
},
"ui_elements": [
{"type": "text_input", "role": "text", "label": "Code OTP", "center": [500, 350], "element_id": "e1", "tags": []},
{"type": "button", "role": "primary_action", "label": "Confirmer", "center": [500, 420], "element_id": "e2", "tags": []},
],
"window": {"app_name": "DPI_Crossway"},
}
auth_req = handler.detect_auth_screen(screen_state)
assert auth_req is not None
assert auth_req.auth_type == "totp"
assert auth_req.confidence >= 0.3
def test_detect_login_and_totp(self, handler):
"""Détecter un écran combiné login + TOTP."""
screen_state = {
"perception": {
"detected_text": [
"Connexion sécurisée",
"Identifiant",
"Mot de passe",
"Code OTP",
],
},
"ui_elements": [
{"type": "text_input", "role": "text", "label": "Identifiant", "center": [500, 300], "element_id": "e1", "tags": []},
{"type": "text_input", "role": "password", "label": "Mot de passe", "center": [500, 350], "element_id": "e2", "tags": []},
{"type": "text_input", "role": "text", "label": "Code OTP", "center": [500, 400], "element_id": "e3", "tags": []},
{"type": "button", "role": "primary_action", "label": "Valider", "center": [500, 450], "element_id": "e4", "tags": []},
],
"window": {"app_name": "DPI_Crossway"},
}
auth_req = handler.detect_auth_screen(screen_state)
assert auth_req is not None
assert auth_req.auth_type == "login_and_totp"
assert auth_req.confidence >= 0.85 # Beaucoup de signaux
def test_no_auth_on_normal_screen(self, handler):
"""Un écran normal ne déclenche pas de détection."""
screen_state = {
"perception": {
"detected_text": ["Patient: Jean Dupont", "Dossier médical", "Résultats"],
},
"ui_elements": [
{"type": "button", "role": "navigation", "label": "Suivant", "center": [500, 500], "element_id": "e1", "tags": []},
],
"window": {"app_name": "DPI_Crossway"},
}
auth_req = handler.detect_auth_screen(screen_state)
assert auth_req is None
def test_get_auth_actions_login(self, handler):
"""Générer les actions pour un login classique."""
auth_req = AuthRequest(
auth_type="login",
app_name="DPI_Crossway",
detected_fields={
"username_field": {"type": "text_input", "label": "Identifiant", "center": [500, 300], "element_id": "e1"},
"password_field": {"type": "text_input", "label": "Mot de passe", "center": [500, 350], "element_id": "e2"},
"submit_button": {"type": "button", "label": "Se connecter", "center": [500, 420], "element_id": "e3"},
},
confidence=0.85,
)
actions = handler.get_auth_actions(auth_req)
assert len(actions) > 0
# Vérifier la séquence : click username, type username, click password, type password, click submit, wait
action_types = [(a["type"], a.get("text", "")) for a in actions]
# Il doit y avoir des clics et des saisies
has_click = any(a["type"] == "click" for a in actions)
has_type = any(a["type"] == "type_text" for a in actions)
has_wait = any(a["type"] == "wait" for a in actions)
assert has_click
assert has_type
assert has_wait
# Vérifier que le username et password sont ceux du vault
typed_texts = [a["text"] for a in actions if a["type"] == "type_text"]
assert "robot_lea" in typed_texts
assert "secret123" in typed_texts
# Toutes les actions ont le flag _auth_action
for action in actions:
assert action.get("_auth_action") is True
def test_get_auth_actions_totp(self, handler):
"""Générer les actions pour une auth TOTP."""
auth_req = AuthRequest(
auth_type="totp",
app_name="DPI_Crossway",
detected_fields={
"otp_field": {"type": "text_input", "label": "Code", "center": [500, 350], "element_id": "e1"},
"submit_button": {"type": "button", "label": "Valider", "center": [500, 420], "element_id": "e2"},
},
confidence=0.85,
)
actions = handler.get_auth_actions(auth_req)
assert len(actions) > 0
# Vérifier qu'un code TOTP est tapé (6 chiffres)
typed_texts = [a["text"] for a in actions if a["type"] == "type_text"]
assert len(typed_texts) >= 1
totp_code = typed_texts[0]
assert len(totp_code) == 6
assert totp_code.isdigit()
def test_get_auth_actions_login_and_totp(self, handler):
"""Générer les actions pour login + TOTP combiné."""
auth_req = AuthRequest(
auth_type="login_and_totp",
app_name="DPI_Crossway",
detected_fields={
"username_field": {"type": "text_input", "label": "Identifiant", "center": [500, 300], "element_id": "e1"},
"password_field": {"type": "text_input", "label": "Mot de passe", "center": [500, 350], "element_id": "e2"},
"otp_field": {"type": "text_input", "label": "Code OTP", "center": [500, 400], "element_id": "e3"},
"submit_button": {"type": "button", "label": "Valider", "center": [500, 450], "element_id": "e4"},
},
confidence=0.95,
)
actions = handler.get_auth_actions(auth_req)
assert len(actions) > 0
typed_texts = [a["text"] for a in actions if a["type"] == "type_text"]
# username + password + TOTP code
assert len(typed_texts) >= 3
assert "robot_lea" in typed_texts
assert "secret123" in typed_texts
# Le 3e est un code TOTP à 6 chiffres
totp_code = typed_texts[2]
assert len(totp_code) == 6
assert totp_code.isdigit()
def test_get_auth_actions_missing_credentials(self, handler):
"""Si le vault n'a pas les credentials, retourne une liste vide."""
auth_req = AuthRequest(
auth_type="login",
app_name="AppInconnue",
detected_fields={
"username_field": {"type": "text_input", "label": "Login", "center": [500, 300], "element_id": "e1"},
"password_field": {"type": "text_input", "label": "Password", "center": [500, 350], "element_id": "e2"},
},
confidence=0.85,
)
actions = handler.get_auth_actions(auth_req)
assert actions == []
def test_detect_english_auth_screen(self, handler):
"""Détecter un écran d'auth en anglais."""
screen_state = {
"perception": {
"detected_text": ["Sign in to your account", "Username", "Password"],
},
"ui_elements": [
{"type": "text_input", "role": "text", "label": "Username", "center": [500, 300], "element_id": "e1", "tags": []},
{"type": "text_input", "role": "password", "label": "Password", "center": [500, 350], "element_id": "e2", "tags": []},
{"type": "button", "role": "primary_action", "label": "Sign in", "center": [500, 420], "element_id": "e3", "tags": []},
],
"window": {"app_name": "Outlook"},
}
auth_req = handler.detect_auth_screen(screen_state)
assert auth_req is not None
assert auth_req.auth_type == "login"
assert auth_req.app_name == "Outlook"
def test_detect_password_tag(self, handler):
"""Détecter un champ password via les tags de l'élément UI."""
screen_state = {
"perception": {"detected_text": []},
"ui_elements": [
{"type": "text_input", "role": "text", "label": "", "center": [500, 300], "element_id": "e1", "tags": ["password"]},
],
"window": {"app_name": "SomeApp"},
}
auth_req = handler.detect_auth_screen(screen_state)
assert auth_req is not None
assert "password_field" in auth_req.detected_fields