Files
rpa_vision_v3/core/auth/credential_vault.py
Dom d5deac3029 feat: replay visuel VLM-first, worker séparé, package Léa, AZERTY, sécurité HTTPS
Pipeline replay visuel :
- VLM-first : l'agent appelle Ollama directement pour trouver les éléments
- Template matching en fallback (seuil strict 0.90)
- Stop immédiat si élément non trouvé (pas de clic blind)
- Replay depuis session brute (/replay-session) sans attendre le VLM
- Vérification post-action (screenshot hash avant/après)
- Gestion des popups (Enter/Escape/Tab+Enter)

Worker VLM séparé :
- run_worker.py : process distinct du serveur HTTP
- Communication par fichiers (_worker_queue.txt + _replay_active.lock)
- Le serveur HTTP ne fait plus jamais de VLM → toujours réactif
- Service systemd rpa-worker.service

Capture clavier :
- raw_keys (vk + press/release) pour replay exact indépendant du layout
- Fix AZERTY : ToUnicodeEx + AltGr detection
- Enter capturé comme \n, Tab comme \t
- Filtrage modificateurs seuls (Ctrl/Alt/Shift parasites)
- Fusion text_input consécutifs, dédup key_combo

Sécurité & Internet :
- HTTPS Let's Encrypt (lea.labs + vwb.labs.laurinebazin.design)
- Token API fixe dans .env.local
- HTTP Basic Auth sur VWB
- Security headers (HSTS, CSP, nosniff)
- CORS domaines publics, plus de wildcard

Infrastructure :
- DPI awareness (SetProcessDpiAwareness) Python + Rust
- Métadonnées système (dpi_scale, window_bounds, monitors, os_theme)
- Template matching multi-scale [0.5, 2.0]
- Résolution dynamique (plus de hardcode 1920x1080)
- VLM prefill fix (47x speedup, 3.5s au lieu de 180s)

Modules :
- core/auth/ : credential vault (Fernet AES), TOTP (RFC 6238), auth handler
- core/federation/ : LearningPack export/import anonymisé, FAISS global
- deploy/ : package Léa (config.txt, Lea.bat, install.bat, LISEZMOI.txt)

UX :
- Filtrage OS (VWB + Chat montrent que les workflows de l'OS courant)
- Bibliothèque persistante (cache local + SQLite)
- Clustering hybride (titre fenêtre + DBSCAN)
- EdgeConstraints + PostConditions peuplés
- GraphBuilder compound actions (toutes les frappes)

Agent Rust :
- Token Bearer auth (network.rs)
- sysinfo.rs (DPI, résolution, window bounds via Win32 API)
- config.txt lu automatiquement
- Support Chrome/Brave/Firefox (pas que Edge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:19:18 +01:00

299 lines
10 KiB
Python

"""
Coffre-fort chiffré pour les credentials d'authentification.
Stocke de façon sécurisée :
- Comptes de service (login/password)
- Seeds TOTP pour la 2FA
- Tokens de session
- Certificats client
Le fichier vault est chiffré avec Fernet (AES-128-CBC + HMAC-SHA256).
La clé est dérivée d'un mot de passe maître via PBKDF2 (600000 itérations).
Choix de sécurité :
- PBKDF2 avec 600 000 itérations : recommandation OWASP 2023 pour SHA-256.
Compromis acceptable entre temps de dérivation (~0.5s) et résistance au brute-force.
- Fernet (AES-128-CBC + HMAC-SHA256) : chiffrement authentifié, empêche les
modifications silencieuses du fichier vault. Bibliothèque maintenue et auditée.
- Salt aléatoire de 16 bytes : empêche les attaques par rainbow table.
Stocké en clair en préfixe du fichier (le salt n'est pas un secret).
"""
import base64
import json
import logging
import os
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Types de credentials supportés
CREDENTIAL_TYPES = {"login", "totp_seed", "session_token", "certificate"}
# Taille du salt en bytes
SALT_SIZE = 16
# Nombre d'itérations PBKDF2 — recommandation OWASP 2023 pour SHA-256
PBKDF2_ITERATIONS = 600_000
# Tentative d'import de cryptography pour le chiffrement Fernet
_HAS_FERNET = False
try:
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
_HAS_FERNET = True
except ImportError:
_HAS_FERNET = False
warnings.warn(
"Module 'cryptography' non disponible. Le vault utilisera un encodage "
"base64 NON SÉCURISÉ. NE PAS utiliser en production.",
stacklevel=2,
)
class CredentialVault:
"""Coffre-fort chiffré pour les credentials d'applications.
Usage :
vault = CredentialVault("/chemin/vault.enc", "mot_de_passe_maitre")
vault.add_credential("DPI_Crossway", "login", {
"username": "robot_lea", "password": "xxx", "domain": "HOPITAL"
})
vault.save()
creds = vault.get_credential("DPI_Crossway", "login")
"""
def __init__(self, vault_path: str, master_password: str):
"""Charge ou crée un vault chiffré.
Args:
vault_path: Chemin du fichier vault chiffré sur disque.
master_password: Mot de passe maître pour dériver la clé de chiffrement.
"""
self._vault_path = Path(vault_path)
self._master_password = master_password
self._data: Dict[str, Any] = {
"version": "1.0",
"created_at": datetime.now(timezone.utc).isoformat(),
"credentials": {},
}
if self._vault_path.exists():
self._load()
else:
logger.info("Vault inexistant, création d'un nouveau vault : %s", vault_path)
# =========================================================================
# API publique
# =========================================================================
def add_credential(
self, app_name: str, credential_type: str, data: Dict[str, Any]
) -> None:
"""Ajoute ou met à jour un credential pour une application.
Args:
app_name: Nom de l'application (ex: "DPI_Crossway").
credential_type: Type parmi "login", "totp_seed", "session_token", "certificate".
data: Dictionnaire avec les champs spécifiques au type.
Raises:
ValueError: Si le credential_type n'est pas supporté.
"""
if credential_type not in CREDENTIAL_TYPES:
raise ValueError(
f"Type de credential invalide : {credential_type!r}. "
f"Types supportés : {CREDENTIAL_TYPES}"
)
if app_name not in self._data["credentials"]:
self._data["credentials"][app_name] = {}
self._data["credentials"][app_name][credential_type] = data
logger.info(
"Credential ajouté : app=%s type=%s", app_name, credential_type
)
def get_credential(
self, app_name: str, credential_type: str
) -> Optional[Dict[str, Any]]:
"""Récupère un credential pour une application.
Args:
app_name: Nom de l'application.
credential_type: Type de credential recherché.
Returns:
Dictionnaire du credential, ou None si non trouvé.
"""
app_creds = self._data["credentials"].get(app_name, {})
return app_creds.get(credential_type)
def remove_credential(self, app_name: str, credential_type: str) -> bool:
"""Supprime un credential.
Args:
app_name: Nom de l'application.
credential_type: Type de credential à supprimer.
Returns:
True si supprimé, False si non trouvé.
"""
app_creds = self._data["credentials"].get(app_name, {})
if credential_type in app_creds:
del app_creds[credential_type]
# Nettoyer l'app si plus de credentials
if not app_creds:
del self._data["credentials"][app_name]
logger.info(
"Credential supprimé : app=%s type=%s", app_name, credential_type
)
return True
return False
def list_apps(self) -> List[str]:
"""Liste les noms d'applications configurées.
Returns:
Liste triée des noms d'applications.
"""
return sorted(self._data["credentials"].keys())
def list_credential_types(self, app_name: str) -> List[str]:
"""Liste les types de credentials pour une application.
Args:
app_name: Nom de l'application.
Returns:
Liste des types de credentials configurés.
"""
return list(self._data["credentials"].get(app_name, {}).keys())
def save(self) -> None:
"""Chiffre et sauvegarde le vault sur disque."""
plaintext = json.dumps(self._data, ensure_ascii=False, indent=2).encode("utf-8")
encrypted = self._encrypt(plaintext)
# Écriture atomique via fichier temporaire
tmp_path = self._vault_path.with_suffix(".tmp")
self._vault_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(encrypted)
tmp_path.rename(self._vault_path)
logger.info("Vault sauvegardé : %s (%d bytes)", self._vault_path, len(encrypted))
# =========================================================================
# Chiffrement / Déchiffrement
# =========================================================================
def _derive_key(self, password: str, salt: bytes) -> bytes:
"""Dérive une clé Fernet à partir du mot de passe maître.
Utilise PBKDF2-HMAC-SHA256 avec 600 000 itérations (OWASP 2023).
La sortie est encodée en base64 URL-safe pour Fernet (32 bytes → 44 chars).
Args:
password: Mot de passe maître.
salt: Salt aléatoire (16 bytes minimum).
Returns:
Clé Fernet encodée en base64 URL-safe (44 bytes).
"""
if _HAS_FERNET:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=PBKDF2_ITERATIONS,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode("utf-8")))
return key
else:
# Fallback non sécurisé — simple hash pour le développement
import hashlib
dk = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
)
return base64.urlsafe_b64encode(dk)
def _encrypt(self, plaintext: bytes) -> bytes:
"""Chiffre les données avec Fernet (ou base64 en fallback).
Format du fichier vault :
[16 bytes salt][données chiffrées Fernet]
Args:
plaintext: Données en clair à chiffrer.
Returns:
Bytes chiffrés avec le salt en préfixe.
"""
salt = os.urandom(SALT_SIZE)
key = self._derive_key(self._master_password, salt)
if _HAS_FERNET:
fernet = Fernet(key)
encrypted = fernet.encrypt(plaintext)
else:
# Fallback : base64 simple (NON sécurisé)
encrypted = base64.urlsafe_b64encode(plaintext)
return salt + encrypted
def _decrypt(self, encrypted_data: bytes) -> bytes:
"""Déchiffre les données.
Args:
encrypted_data: Bytes chiffrés (salt + données Fernet).
Returns:
Données déchiffrées.
Raises:
ValueError: Si le mot de passe est incorrect ou les données corrompues.
"""
if len(encrypted_data) < SALT_SIZE:
raise ValueError("Fichier vault corrompu (trop court)")
salt = encrypted_data[:SALT_SIZE]
ciphertext = encrypted_data[SALT_SIZE:]
key = self._derive_key(self._master_password, salt)
if _HAS_FERNET:
try:
fernet = Fernet(key)
return fernet.decrypt(ciphertext)
except InvalidToken:
raise ValueError(
"Mot de passe maître incorrect ou fichier vault corrompu"
)
else:
# Fallback : base64 simple
return base64.urlsafe_b64decode(ciphertext)
# =========================================================================
# Chargement
# =========================================================================
def _load(self) -> None:
"""Charge et déchiffre le vault depuis le disque."""
try:
encrypted_data = self._vault_path.read_bytes()
plaintext = self._decrypt(encrypted_data)
self._data = json.loads(plaintext.decode("utf-8"))
logger.info(
"Vault chargé : %s (%d apps)",
self._vault_path,
len(self._data.get("credentials", {})),
)
except (ValueError, json.JSONDecodeError) as e:
raise ValueError(f"Impossible de charger le vault : {e}") from e