feat: replay visuel VLM-first, worker séparé, package Léa, AZERTY, sécurité HTTPS

Pipeline replay visuel :
- VLM-first : l'agent appelle Ollama directement pour trouver les éléments
- Template matching en fallback (seuil strict 0.90)
- Stop immédiat si élément non trouvé (pas de clic blind)
- Replay depuis session brute (/replay-session) sans attendre le VLM
- Vérification post-action (screenshot hash avant/après)
- Gestion des popups (Enter/Escape/Tab+Enter)

Worker VLM séparé :
- run_worker.py : process distinct du serveur HTTP
- Communication par fichiers (_worker_queue.txt + _replay_active.lock)
- Le serveur HTTP ne fait plus jamais de VLM → toujours réactif
- Service systemd rpa-worker.service

Capture clavier :
- raw_keys (vk + press/release) pour replay exact indépendant du layout
- Fix AZERTY : ToUnicodeEx + AltGr detection
- Enter capturé comme \n, Tab comme \t
- Filtrage modificateurs seuls (Ctrl/Alt/Shift parasites)
- Fusion text_input consécutifs, dédup key_combo

Sécurité & Internet :
- HTTPS Let's Encrypt (lea.labs + vwb.labs.laurinebazin.design)
- Token API fixe dans .env.local
- HTTP Basic Auth sur VWB
- Security headers (HSTS, CSP, nosniff)
- CORS domaines publics, plus de wildcard

Infrastructure :
- DPI awareness (SetProcessDpiAwareness) Python + Rust
- Métadonnées système (dpi_scale, window_bounds, monitors, os_theme)
- Template matching multi-scale [0.5, 2.0]
- Résolution dynamique (plus de hardcode 1920x1080)
- VLM prefill fix (47x speedup, 3.5s au lieu de 180s)

Modules :
- core/auth/ : credential vault (Fernet AES), TOTP (RFC 6238), auth handler
- core/federation/ : LearningPack export/import anonymisé, FAISS global
- deploy/ : package Léa (config.txt, Lea.bat, install.bat, LISEZMOI.txt)

UX :
- Filtrage OS (VWB + Chat montrent que les workflows de l'OS courant)
- Bibliothèque persistante (cache local + SQLite)
- Clustering hybride (titre fenêtre + DBSCAN)
- EdgeConstraints + PostConditions peuplés
- GraphBuilder compound actions (toutes les frappes)

Agent Rust :
- Token Bearer auth (network.rs)
- sysinfo.rs (DPI, résolution, window bounds via Win32 API)
- config.txt lu automatiquement
- Support Chrome/Brave/Firefox (pas que Edge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-26 10:19:18 +01:00
parent fe5e0ba83d
commit d5deac3029
162 changed files with 25669 additions and 557 deletions

6
core/auth/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
# core/auth — Module d'authentification automatique pour Léa
#
# Fournit :
# - CredentialVault : coffre-fort chiffré pour les credentials
# - TOTPGenerator : générateur TOTP RFC 6238 (sans dépendance externe)
# - AuthHandler : détection d'écrans d'auth et injection automatique

523
core/auth/auth_handler.py Normal file
View File

@@ -0,0 +1,523 @@
"""
Gestionnaire d'authentification automatique pendant le replay.
Détecte les écrans d'authentification et injecte les credentials appropriés.
Fonctionne avec le ScreenState du core pipeline et le CredentialVault chiffré.
Stratégie de détection :
1. Analyse OCR : cherche des patterns textuels indicatifs d'un écran d'auth
("mot de passe", "identifiant", "code de vérification", etc.)
2. Analyse UI : cherche des éléments sémantiques typiques (champ password,
bouton "Se connecter", etc.)
3. Identification de l'application : via window_title du ScreenState
La confiance est calculée selon le nombre de signaux détectés :
- 1 signal = 0.3 (faible)
- 2 signaux = 0.6 (moyen)
- 3+ signaux = 0.85+ (élevé)
"""
import logging
import re
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from .credential_vault import CredentialVault
from .totp_generator import TOTPGenerator
logger = logging.getLogger(__name__)
# =========================================================================
# Patterns de détection d'écrans d'authentification
# =========================================================================
# Patterns OCR (texte visible sur l'écran) — FR + EN pour support bilingue
_AUTH_TEXT_PATTERNS = [
# Français
r"mot\s+de\s+passe",
r"identifiant",
r"nom\s+d'utilisateur",
r"connexion",
r"se\s+connecter",
r"authentification",
r"code\s+de\s+v[ée]rification",
r"code\s+otp",
r"double\s+authentification",
r"v[ée]rification\s+en\s+deux\s+[ée]tapes",
# Anglais
r"password",
r"username",
r"sign\s+in",
r"log\s*in",
r"verification\s+code",
r"two.factor",
r"2fa",
r"one.time\s+password",
r"enter\s+your\s+code",
]
# Patterns pour identifier spécifiquement un écran TOTP/2FA
_TOTP_TEXT_PATTERNS = [
r"code\s+de\s+v[ée]rification",
r"code\s+otp",
r"double\s+authentification",
r"v[ée]rification\s+en\s+deux",
r"two.factor",
r"2fa",
r"one.time\s+password",
r"enter\s+your\s+code",
r"code\s+[àa]\s+\d+\s+chiffres",
r"authenticator",
]
# Libellés de boutons de validation
_SUBMIT_BUTTON_PATTERNS = [
r"se\s+connecter",
r"connexion",
r"valider",
r"envoyer",
r"confirmer",
r"sign\s+in",
r"log\s*in",
r"submit",
r"verify",
r"ok",
]
# Compilations pour performance
_AUTH_REGEXES = [re.compile(p, re.IGNORECASE) for p in _AUTH_TEXT_PATTERNS]
_TOTP_REGEXES = [re.compile(p, re.IGNORECASE) for p in _TOTP_TEXT_PATTERNS]
_SUBMIT_REGEXES = [re.compile(p, re.IGNORECASE) for p in _SUBMIT_BUTTON_PATTERNS]
@dataclass
class AuthRequest:
"""Requête d'authentification détectée sur un écran.
Attributes:
auth_type: Type d'authentification détecté ("login", "totp", "login_and_totp").
app_name: Application identifiée (depuis window_title).
detected_fields: Champs détectés sur l'écran (positions, types).
confidence: Confiance de la détection (0.0 à 1.0).
"""
auth_type: str # "login", "totp", "login_and_totp"
app_name: str # App identifiée (depuis window_title)
detected_fields: Dict[str, Any] = field(default_factory=dict)
confidence: float = 0.0
class AuthHandler:
"""Gestionnaire d'authentification automatique pour le replay.
Analyse les ScreenStates pour détecter les écrans d'authentification
et génère les actions de replay correspondantes.
Usage :
handler = AuthHandler(vault)
auth_req = handler.detect_auth_screen(screen_state)
if auth_req:
actions = handler.get_auth_actions(auth_req)
# Injecter les actions dans la queue de replay
"""
def __init__(self, vault: CredentialVault):
"""Initialise le gestionnaire d'authentification.
Args:
vault: Instance du coffre-fort de credentials.
"""
self._vault = vault
def detect_auth_screen(self, screen_state: Any) -> Optional[AuthRequest]:
"""Analyse un ScreenState pour détecter un écran d'authentification.
La détection combine plusieurs signaux :
- Textes OCR correspondant à des patterns d'auth
- Éléments UI de type password/text_input
- Boutons de validation ("Se connecter", "Valider")
Args:
screen_state: ScreenState du core pipeline (ou dict compatible).
Returns:
AuthRequest si un écran d'auth est détecté avec confiance > 0.3,
None sinon.
"""
# Extraire les textes détectés et les éléments UI
texts = self._extract_texts(screen_state)
ui_elements = self._extract_ui_elements(screen_state)
app_name = self._extract_app_name(screen_state)
# Compteur de signaux de détection
signals: Dict[str, Any] = {}
# Signal 1 : Patterns textuels d'authentification
auth_text_matches = []
for text in texts:
for regex in _AUTH_REGEXES:
if regex.search(text):
auth_text_matches.append(regex.pattern)
if auth_text_matches:
signals["auth_text"] = auth_text_matches
# Signal 2 : Patterns textuels TOTP/2FA
totp_text_matches = []
for text in texts:
for regex in _TOTP_REGEXES:
if regex.search(text):
totp_text_matches.append(regex.pattern)
if totp_text_matches:
signals["totp_text"] = totp_text_matches
# Signal 3 : Champs UI de type password
password_fields = []
username_fields = []
submit_buttons = []
otp_fields = []
for elem in ui_elements:
elem_type = self._get_elem_attr(elem, "type", "")
elem_role = self._get_elem_attr(elem, "role", "")
elem_label = self._get_elem_attr(elem, "label", "").lower()
elem_tags = self._get_elem_attr(elem, "tags", [])
# Champ mot de passe
if elem_role == "password" or "password" in elem_tags:
password_fields.append(elem)
elif elem_type == "text_input" and any(
p in elem_label for p in ("mot de passe", "password", "mdp")
):
password_fields.append(elem)
# Champ identifiant/username
if elem_type == "text_input" and any(
p in elem_label
for p in ("identifiant", "username", "utilisateur", "login", "email", "e-mail")
):
username_fields.append(elem)
# Champ OTP
if elem_type == "text_input" and any(
p in elem_label for p in ("code", "otp", "vérification", "verification")
):
otp_fields.append(elem)
# Bouton de validation
if elem_type == "button":
for regex in _SUBMIT_REGEXES:
if regex.search(elem_label):
submit_buttons.append(elem)
break
if password_fields:
signals["password_field"] = len(password_fields)
if username_fields:
signals["username_field"] = len(username_fields)
if submit_buttons:
signals["submit_button"] = len(submit_buttons)
if otp_fields:
signals["otp_field"] = len(otp_fields)
# Pas assez de signaux → pas d'écran d'auth
if not signals:
return None
# Déterminer le type d'auth
# Les signaux textuels "auth_text" peuvent contenir des patterns ambigus
# (ex: "2fa" apparaît dans les deux listes). On ne compte comme signal
# login que les patterns auth_text qui ne sont PAS aussi des patterns TOTP.
auth_only_text = set(signals.get("auth_text", [])) - set(signals.get("totp_text", []))
has_login_signals = bool(
password_fields
or auth_only_text
or username_fields
)
has_totp_signals = bool(
otp_fields
or "totp_text" in signals
)
if has_login_signals and has_totp_signals:
auth_type = "login_and_totp"
elif has_totp_signals:
auth_type = "totp"
else:
auth_type = "login"
# Calculer la confiance (nombre de signaux distincts)
num_signals = len(signals)
if num_signals >= 4:
confidence = 0.95
elif num_signals >= 3:
confidence = 0.85
elif num_signals >= 2:
confidence = 0.6
else:
confidence = 0.3
# Construire les champs détectés
detected_fields: Dict[str, Any] = {}
if username_fields:
detected_fields["username_field"] = self._elem_to_dict(username_fields[0])
if password_fields:
detected_fields["password_field"] = self._elem_to_dict(password_fields[0])
if otp_fields:
detected_fields["otp_field"] = self._elem_to_dict(otp_fields[0])
if submit_buttons:
detected_fields["submit_button"] = self._elem_to_dict(submit_buttons[0])
auth_request = AuthRequest(
auth_type=auth_type,
app_name=app_name,
detected_fields=detected_fields,
confidence=confidence,
)
logger.info(
"Écran d'authentification détecté : type=%s app=%s confiance=%.2f signaux=%s",
auth_type,
app_name,
confidence,
list(signals.keys()),
)
return auth_request
def get_auth_actions(self, auth_request: AuthRequest) -> List[Dict[str, Any]]:
"""Génère les actions de replay pour s'authentifier.
Produit une séquence d'actions que l'Agent V1 peut exécuter :
- click sur le champ username, type le login
- click sur le champ password, type le mot de passe
- (optionnel) type le code TOTP
- click sur le bouton de validation
Args:
auth_request: Requête d'authentification détectée.
Returns:
Liste d'actions de replay (format compatible avec la queue de replay).
Liste vide si les credentials ne sont pas trouvés dans le vault.
"""
actions: List[Dict[str, Any]] = []
app_name = auth_request.app_name
fields = auth_request.detected_fields
# Générer un préfixe unique pour les action_ids
prefix = f"auth_{uuid.uuid4().hex[:6]}"
# ---- Login : username + password ----
if auth_request.auth_type in ("login", "login_and_totp"):
login_creds = self._vault.get_credential(app_name, "login")
if not login_creds:
logger.warning(
"Pas de credential 'login' pour l'app '%s' dans le vault",
app_name,
)
return []
# Action : cliquer sur le champ username et taper
username_field = fields.get("username_field")
if username_field:
actions.append({
"action_id": f"{prefix}_click_username",
"type": "click",
"target": username_field.get("center", [0, 0]),
"description": f"Clic champ identifiant ({app_name})",
"_auth_action": True,
})
actions.append({
"action_id": f"{prefix}_type_username",
"type": "type_text",
"text": login_creds.get("username", ""),
"description": f"Saisie identifiant ({app_name})",
"_auth_action": True,
})
# Action : cliquer sur le champ password et taper
password_field = fields.get("password_field")
if password_field:
actions.append({
"action_id": f"{prefix}_click_password",
"type": "click",
"target": password_field.get("center", [0, 0]),
"description": f"Clic champ mot de passe ({app_name})",
"_auth_action": True,
})
actions.append({
"action_id": f"{prefix}_type_password",
"type": "type_text",
"text": login_creds.get("password", ""),
"description": f"Saisie mot de passe ({app_name})",
"_auth_action": True,
})
# ---- TOTP : générer et taper le code ----
if auth_request.auth_type in ("totp", "login_and_totp"):
totp_creds = self._vault.get_credential(app_name, "totp_seed")
if not totp_creds:
logger.warning(
"Pas de credential 'totp_seed' pour l'app '%s' dans le vault",
app_name,
)
# On continue quand même si le login a été fait
if not actions:
return []
else:
totp = TOTPGenerator(
secret=totp_creds["secret"],
digits=totp_creds.get("digits", 6),
interval=totp_creds.get("interval", 30),
algorithm=totp_creds.get("algorithm", "SHA1"),
)
# Attendre si le code expire dans moins de 5 secondes
remaining = totp.time_remaining()
if remaining < 5:
actions.append({
"action_id": f"{prefix}_wait_totp",
"type": "wait",
"duration_ms": (remaining + 1) * 1000,
"reason": "attente_nouveau_code_totp",
"description": f"Attente nouveau code TOTP ({remaining}s restantes)",
"_auth_action": True,
})
code = totp.generate()
otp_field = fields.get("otp_field")
if otp_field:
actions.append({
"action_id": f"{prefix}_click_otp",
"type": "click",
"target": otp_field.get("center", [0, 0]),
"description": f"Clic champ OTP ({app_name})",
"_auth_action": True,
})
actions.append({
"action_id": f"{prefix}_type_totp",
"type": "type_text",
"text": code,
"description": f"Saisie code TOTP ({app_name})",
"_auth_action": True,
})
# ---- Bouton de validation ----
submit_button = fields.get("submit_button")
if submit_button and actions:
actions.append({
"action_id": f"{prefix}_click_submit",
"type": "click",
"target": submit_button.get("center", [0, 0]),
"description": f"Clic validation ({app_name})",
"_auth_action": True,
})
# Pause après validation pour laisser l'app charger
if actions:
actions.append({
"action_id": f"{prefix}_wait_after_auth",
"type": "wait",
"duration_ms": 2000,
"reason": "attente_chargement_post_auth",
"description": f"Attente post-authentification ({app_name})",
"_auth_action": True,
})
logger.info(
"Actions d'authentification générées : %d actions pour %s (type=%s)",
len(actions),
app_name,
auth_request.auth_type,
)
return actions
# =========================================================================
# Méthodes d'extraction internes
# =========================================================================
def _extract_texts(self, screen_state: Any) -> List[str]:
"""Extrait tous les textes détectés depuis un ScreenState.
Supporte les objets ScreenState du core et les dicts bruts.
"""
texts: List[str] = []
# ScreenState core (dataclass)
if hasattr(screen_state, "perception") and hasattr(
screen_state.perception, "detected_text"
):
texts.extend(screen_state.perception.detected_text)
# Dict brut (sessions streaming)
elif isinstance(screen_state, dict):
perception = screen_state.get("perception", {})
if isinstance(perception, dict):
texts.extend(perception.get("detected_text", []))
# Texte OCR brut
if "ocr_text" in screen_state:
texts.append(screen_state["ocr_text"])
# Textes des éléments UI
for elem in screen_state.get("ui_elements", []):
label = elem.get("label", "")
if label:
texts.append(label)
# Textes des éléments UI (objets)
if hasattr(screen_state, "ui_elements"):
for elem in screen_state.ui_elements:
label = self._get_elem_attr(elem, "label", "")
if label:
texts.append(label)
return texts
def _extract_ui_elements(self, screen_state: Any) -> List[Any]:
"""Extrait les éléments UI depuis un ScreenState."""
if hasattr(screen_state, "ui_elements"):
return list(screen_state.ui_elements)
if isinstance(screen_state, dict):
return screen_state.get("ui_elements", [])
return []
def _extract_app_name(self, screen_state: Any) -> str:
"""Extrait le nom de l'application depuis un ScreenState."""
# ScreenState core
if hasattr(screen_state, "window") and hasattr(screen_state.window, "app_name"):
return screen_state.window.app_name
# Dict brut
if isinstance(screen_state, dict):
window = screen_state.get("window", {})
if isinstance(window, dict):
return window.get("app_name", "unknown")
return "unknown"
@staticmethod
def _get_elem_attr(elem: Any, attr: str, default: Any = None) -> Any:
"""Récupère un attribut d'un élément UI (objet ou dict)."""
if isinstance(elem, dict):
return elem.get(attr, default)
return getattr(elem, attr, default)
@staticmethod
def _elem_to_dict(elem: Any) -> Dict[str, Any]:
"""Convertit un élément UI en dict minimal pour les detected_fields."""
if isinstance(elem, dict):
return {
"type": elem.get("type", ""),
"label": elem.get("label", ""),
"center": elem.get("center", [0, 0]),
"element_id": elem.get("element_id", ""),
}
return {
"type": getattr(elem, "type", ""),
"label": getattr(elem, "label", ""),
"center": list(getattr(elem, "center", (0, 0))),
"element_id": getattr(elem, "element_id", ""),
}

View File

@@ -0,0 +1,298 @@
"""
Coffre-fort chiffré pour les credentials d'authentification.
Stocke de façon sécurisée :
- Comptes de service (login/password)
- Seeds TOTP pour la 2FA
- Tokens de session
- Certificats client
Le fichier vault est chiffré avec Fernet (AES-128-CBC + HMAC-SHA256).
La clé est dérivée d'un mot de passe maître via PBKDF2 (600000 itérations).
Choix de sécurité :
- PBKDF2 avec 600 000 itérations : recommandation OWASP 2023 pour SHA-256.
Compromis acceptable entre temps de dérivation (~0.5s) et résistance au brute-force.
- Fernet (AES-128-CBC + HMAC-SHA256) : chiffrement authentifié, empêche les
modifications silencieuses du fichier vault. Bibliothèque maintenue et auditée.
- Salt aléatoire de 16 bytes : empêche les attaques par rainbow table.
Stocké en clair en préfixe du fichier (le salt n'est pas un secret).
"""
import base64
import json
import logging
import os
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Types de credentials supportés
CREDENTIAL_TYPES = {"login", "totp_seed", "session_token", "certificate"}
# Taille du salt en bytes
SALT_SIZE = 16
# Nombre d'itérations PBKDF2 — recommandation OWASP 2023 pour SHA-256
PBKDF2_ITERATIONS = 600_000
# Tentative d'import de cryptography pour le chiffrement Fernet
_HAS_FERNET = False
try:
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
_HAS_FERNET = True
except ImportError:
_HAS_FERNET = False
warnings.warn(
"Module 'cryptography' non disponible. Le vault utilisera un encodage "
"base64 NON SÉCURISÉ. NE PAS utiliser en production.",
stacklevel=2,
)
class CredentialVault:
"""Coffre-fort chiffré pour les credentials d'applications.
Usage :
vault = CredentialVault("/chemin/vault.enc", "mot_de_passe_maitre")
vault.add_credential("DPI_Crossway", "login", {
"username": "robot_lea", "password": "xxx", "domain": "HOPITAL"
})
vault.save()
creds = vault.get_credential("DPI_Crossway", "login")
"""
def __init__(self, vault_path: str, master_password: str):
"""Charge ou crée un vault chiffré.
Args:
vault_path: Chemin du fichier vault chiffré sur disque.
master_password: Mot de passe maître pour dériver la clé de chiffrement.
"""
self._vault_path = Path(vault_path)
self._master_password = master_password
self._data: Dict[str, Any] = {
"version": "1.0",
"created_at": datetime.now(timezone.utc).isoformat(),
"credentials": {},
}
if self._vault_path.exists():
self._load()
else:
logger.info("Vault inexistant, création d'un nouveau vault : %s", vault_path)
# =========================================================================
# API publique
# =========================================================================
def add_credential(
self, app_name: str, credential_type: str, data: Dict[str, Any]
) -> None:
"""Ajoute ou met à jour un credential pour une application.
Args:
app_name: Nom de l'application (ex: "DPI_Crossway").
credential_type: Type parmi "login", "totp_seed", "session_token", "certificate".
data: Dictionnaire avec les champs spécifiques au type.
Raises:
ValueError: Si le credential_type n'est pas supporté.
"""
if credential_type not in CREDENTIAL_TYPES:
raise ValueError(
f"Type de credential invalide : {credential_type!r}. "
f"Types supportés : {CREDENTIAL_TYPES}"
)
if app_name not in self._data["credentials"]:
self._data["credentials"][app_name] = {}
self._data["credentials"][app_name][credential_type] = data
logger.info(
"Credential ajouté : app=%s type=%s", app_name, credential_type
)
def get_credential(
self, app_name: str, credential_type: str
) -> Optional[Dict[str, Any]]:
"""Récupère un credential pour une application.
Args:
app_name: Nom de l'application.
credential_type: Type de credential recherché.
Returns:
Dictionnaire du credential, ou None si non trouvé.
"""
app_creds = self._data["credentials"].get(app_name, {})
return app_creds.get(credential_type)
def remove_credential(self, app_name: str, credential_type: str) -> bool:
"""Supprime un credential.
Args:
app_name: Nom de l'application.
credential_type: Type de credential à supprimer.
Returns:
True si supprimé, False si non trouvé.
"""
app_creds = self._data["credentials"].get(app_name, {})
if credential_type in app_creds:
del app_creds[credential_type]
# Nettoyer l'app si plus de credentials
if not app_creds:
del self._data["credentials"][app_name]
logger.info(
"Credential supprimé : app=%s type=%s", app_name, credential_type
)
return True
return False
def list_apps(self) -> List[str]:
"""Liste les noms d'applications configurées.
Returns:
Liste triée des noms d'applications.
"""
return sorted(self._data["credentials"].keys())
def list_credential_types(self, app_name: str) -> List[str]:
"""Liste les types de credentials pour une application.
Args:
app_name: Nom de l'application.
Returns:
Liste des types de credentials configurés.
"""
return list(self._data["credentials"].get(app_name, {}).keys())
def save(self) -> None:
"""Chiffre et sauvegarde le vault sur disque."""
plaintext = json.dumps(self._data, ensure_ascii=False, indent=2).encode("utf-8")
encrypted = self._encrypt(plaintext)
# Écriture atomique via fichier temporaire
tmp_path = self._vault_path.with_suffix(".tmp")
self._vault_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_bytes(encrypted)
tmp_path.rename(self._vault_path)
logger.info("Vault sauvegardé : %s (%d bytes)", self._vault_path, len(encrypted))
# =========================================================================
# Chiffrement / Déchiffrement
# =========================================================================
def _derive_key(self, password: str, salt: bytes) -> bytes:
"""Dérive une clé Fernet à partir du mot de passe maître.
Utilise PBKDF2-HMAC-SHA256 avec 600 000 itérations (OWASP 2023).
La sortie est encodée en base64 URL-safe pour Fernet (32 bytes → 44 chars).
Args:
password: Mot de passe maître.
salt: Salt aléatoire (16 bytes minimum).
Returns:
Clé Fernet encodée en base64 URL-safe (44 bytes).
"""
if _HAS_FERNET:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=PBKDF2_ITERATIONS,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode("utf-8")))
return key
else:
# Fallback non sécurisé — simple hash pour le développement
import hashlib
dk = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
)
return base64.urlsafe_b64encode(dk)
def _encrypt(self, plaintext: bytes) -> bytes:
"""Chiffre les données avec Fernet (ou base64 en fallback).
Format du fichier vault :
[16 bytes salt][données chiffrées Fernet]
Args:
plaintext: Données en clair à chiffrer.
Returns:
Bytes chiffrés avec le salt en préfixe.
"""
salt = os.urandom(SALT_SIZE)
key = self._derive_key(self._master_password, salt)
if _HAS_FERNET:
fernet = Fernet(key)
encrypted = fernet.encrypt(plaintext)
else:
# Fallback : base64 simple (NON sécurisé)
encrypted = base64.urlsafe_b64encode(plaintext)
return salt + encrypted
def _decrypt(self, encrypted_data: bytes) -> bytes:
"""Déchiffre les données.
Args:
encrypted_data: Bytes chiffrés (salt + données Fernet).
Returns:
Données déchiffrées.
Raises:
ValueError: Si le mot de passe est incorrect ou les données corrompues.
"""
if len(encrypted_data) < SALT_SIZE:
raise ValueError("Fichier vault corrompu (trop court)")
salt = encrypted_data[:SALT_SIZE]
ciphertext = encrypted_data[SALT_SIZE:]
key = self._derive_key(self._master_password, salt)
if _HAS_FERNET:
try:
fernet = Fernet(key)
return fernet.decrypt(ciphertext)
except InvalidToken:
raise ValueError(
"Mot de passe maître incorrect ou fichier vault corrompu"
)
else:
# Fallback : base64 simple
return base64.urlsafe_b64decode(ciphertext)
# =========================================================================
# Chargement
# =========================================================================
def _load(self) -> None:
"""Charge et déchiffre le vault depuis le disque."""
try:
encrypted_data = self._vault_path.read_bytes()
plaintext = self._decrypt(encrypted_data)
self._data = json.loads(plaintext.decode("utf-8"))
logger.info(
"Vault chargé : %s (%d apps)",
self._vault_path,
len(self._data.get("credentials", {})),
)
except (ValueError, json.JSONDecodeError) as e:
raise ValueError(f"Impossible de charger le vault : {e}") from e

213
core/auth/manage_vault.py Normal file
View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
CLI de gestion du coffre-fort de credentials (vault).
Usage :
# Ajouter un login
python -m core.auth.manage_vault --vault /path/to/vault.enc --action add \
--app "DPI_Crossway" --type login \
--username "robot_lea" --password "xxx"
# Ajouter un seed TOTP
python -m core.auth.manage_vault --vault /path/to/vault.enc --action add \
--app "DPI_Crossway" --type totp_seed \
--secret "JBSWY3DPEHPK3PXP"
# Lister les applications configurées
python -m core.auth.manage_vault --vault /path/to/vault.enc --action list
# Générer un code TOTP
python -m core.auth.manage_vault --vault /path/to/vault.enc --action generate-totp \
--app "DPI_Crossway"
# Supprimer un credential
python -m core.auth.manage_vault --vault /path/to/vault.enc --action remove \
--app "DPI_Crossway" --type login
Le mot de passe maître est demandé interactivement via getpass.
"""
import argparse
import getpass
import sys
from .credential_vault import CredentialVault
from .totp_generator import TOTPGenerator
def main():
parser = argparse.ArgumentParser(
description="Gestionnaire de coffre-fort de credentials pour Léa.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument(
"--vault",
required=True,
help="Chemin du fichier vault chiffré",
)
parser.add_argument(
"--action",
required=True,
choices=["add", "list", "remove", "generate-totp", "show"],
help="Action à effectuer",
)
parser.add_argument("--app", help="Nom de l'application")
parser.add_argument(
"--type",
dest="cred_type",
choices=["login", "totp_seed", "session_token", "certificate"],
help="Type de credential",
)
# Champs pour le type "login"
parser.add_argument("--username", help="Nom d'utilisateur (type login)")
parser.add_argument("--password", help="Mot de passe (type login)")
parser.add_argument("--domain", help="Domaine Windows (type login, optionnel)")
# Champs pour le type "totp_seed"
parser.add_argument("--secret", help="Secret base32 (type totp_seed)")
parser.add_argument(
"--digits", type=int, default=6, help="Nombre de chiffres TOTP (défaut: 6)"
)
parser.add_argument(
"--interval", type=int, default=30, help="Intervalle TOTP en secondes (défaut: 30)"
)
parser.add_argument(
"--algorithm", default="SHA1", help="Algorithme HMAC (défaut: SHA1)"
)
args = parser.parse_args()
# Demander le mot de passe maître
master_password = getpass.getpass("Mot de passe maître : ")
if not master_password:
print("Erreur : mot de passe maître requis.", file=sys.stderr)
sys.exit(1)
try:
vault = CredentialVault(args.vault, master_password)
except ValueError as e:
print(f"Erreur d'ouverture du vault : {e}", file=sys.stderr)
sys.exit(1)
# ---- Actions ----
if args.action == "list":
apps = vault.list_apps()
if not apps:
print("Vault vide — aucune application configurée.")
else:
print(f"Applications configurées ({len(apps)}) :")
for app in apps:
types = vault.list_credential_types(app)
print(f" {app} : {', '.join(types)}")
elif args.action == "add":
if not args.app:
print("Erreur : --app requis pour l'action 'add'.", file=sys.stderr)
sys.exit(1)
if not args.cred_type:
print("Erreur : --type requis pour l'action 'add'.", file=sys.stderr)
sys.exit(1)
if args.cred_type == "login":
if not args.username:
args.username = input("Username : ")
if not args.password:
args.password = getpass.getpass("Password : ")
data = {"username": args.username, "password": args.password}
if args.domain:
data["domain"] = args.domain
elif args.cred_type == "totp_seed":
if not args.secret:
args.secret = input("Secret base32 : ")
data = {
"secret": args.secret,
"digits": args.digits,
"interval": args.interval,
"algorithm": args.algorithm,
}
elif args.cred_type == "session_token":
token = input("Token de session : ")
data = {"token": token}
elif args.cred_type == "certificate":
cert_path = input("Chemin du certificat : ")
key_path = input("Chemin de la clé privée : ")
data = {"cert_path": cert_path, "key_path": key_path}
else:
print(f"Type non géré : {args.cred_type}", file=sys.stderr)
sys.exit(1)
vault.add_credential(args.app, args.cred_type, data)
vault.save()
print(f"Credential ajouté : {args.app} / {args.cred_type}")
elif args.action == "remove":
if not args.app or not args.cred_type:
print(
"Erreur : --app et --type requis pour l'action 'remove'.",
file=sys.stderr,
)
sys.exit(1)
removed = vault.remove_credential(args.app, args.cred_type)
if removed:
vault.save()
print(f"Credential supprimé : {args.app} / {args.cred_type}")
else:
print(f"Credential non trouvé : {args.app} / {args.cred_type}")
elif args.action == "generate-totp":
if not args.app:
print(
"Erreur : --app requis pour l'action 'generate-totp'.",
file=sys.stderr,
)
sys.exit(1)
totp_creds = vault.get_credential(args.app, "totp_seed")
if not totp_creds:
print(
f"Pas de seed TOTP configuré pour '{args.app}'.",
file=sys.stderr,
)
sys.exit(1)
totp = TOTPGenerator(
secret=totp_creds["secret"],
digits=totp_creds.get("digits", 6),
interval=totp_creds.get("interval", 30),
algorithm=totp_creds.get("algorithm", "SHA1"),
)
code = totp.generate()
remaining = totp.time_remaining()
print(f"Code TOTP : {code}")
print(f"Expire dans : {remaining}s")
elif args.action == "show":
if not args.app:
print(
"Erreur : --app requis pour l'action 'show'.",
file=sys.stderr,
)
sys.exit(1)
types = vault.list_credential_types(args.app)
if not types:
print(f"Aucun credential pour '{args.app}'.")
else:
print(f"Credentials pour '{args.app}' :")
for cred_type in types:
cred = vault.get_credential(args.app, cred_type)
# Masquer les mots de passe et secrets
display = {}
for k, v in (cred or {}).items():
if k in ("password", "secret", "token"):
display[k] = v[:3] + "***" if len(str(v)) > 3 else "***"
else:
display[k] = v
print(f" {cred_type} : {display}")
if __name__ == "__main__":
main()

183
core/auth/totp_generator.py Normal file
View File

@@ -0,0 +1,183 @@
"""
Générateur TOTP (Time-based One-Time Password) pour l'authentification 2FA.
Implémente RFC 6238 directement, sans dépendance externe.
Compatible avec FreeOTP, Google Authenticator, Microsoft Authenticator.
Algorithme (RFC 6238 / RFC 4226) :
1. Décoder le secret partagé depuis base32
2. Calculer le compteur temporel T = floor(unix_time / interval)
3. Encoder T en big-endian 8 bytes
4. Calculer HMAC-SHA1(secret, T) (ou SHA-256/SHA-512 selon config)
5. Extraction dynamique (dynamic truncation) :
- offset = dernier octet du HMAC & 0x0F
- extraire 4 bytes à partir de offset
- masquer le bit de signe (& 0x7FFFFFFF)
- modulo 10^digits pour obtenir le code
"""
import base64
import hashlib
import hmac
import logging
import struct
import time
logger = logging.getLogger(__name__)
# Mapping des algorithmes supportés
_HASH_ALGORITHMS = {
"SHA1": hashlib.sha1,
"SHA256": hashlib.sha256,
"SHA512": hashlib.sha512,
}
class TOTPGenerator:
"""Générateur de codes TOTP conformes à la RFC 6238.
Usage :
totp = TOTPGenerator("JBSWY3DPEHPK3PXP")
code = totp.generate() # "492039"
remaining = totp.time_remaining() # 17 (secondes)
valid = totp.verify("492039") # True
"""
def __init__(
self,
secret: str,
digits: int = 6,
interval: int = 30,
algorithm: str = "SHA1",
):
"""Initialise le générateur TOTP.
Args:
secret: Clé secrète encodée en base32 (standard TOTP).
digits: Nombre de chiffres du code (6 ou 8, défaut 6).
interval: Intervalle en secondes entre deux codes (défaut 30).
algorithm: Algorithme HMAC ("SHA1", "SHA256", "SHA512").
Raises:
ValueError: Si le secret n'est pas du base32 valide ou l'algorithme inconnu.
"""
# Normaliser et décoder le secret base32
# Les secrets TOTP peuvent contenir des espaces pour la lisibilité
clean_secret = secret.upper().replace(" ", "")
# Ajouter du padding base32 si nécessaire
padding = (8 - len(clean_secret) % 8) % 8
clean_secret += "=" * padding
try:
self._secret_bytes = base64.b32decode(clean_secret)
except Exception as e:
raise ValueError(f"Secret base32 invalide : {e}") from e
if algorithm.upper() not in _HASH_ALGORITHMS:
raise ValueError(
f"Algorithme non supporté : {algorithm!r}. "
f"Valeurs acceptées : {list(_HASH_ALGORITHMS.keys())}"
)
self._digits = digits
self._interval = interval
self._algorithm = algorithm.upper()
def generate(self, timestamp: float | None = None) -> str:
"""Génère le code TOTP pour l'instant présent (ou un timestamp donné).
Args:
timestamp: Timestamp Unix optionnel (pour les tests). Si None, utilise time.time().
Returns:
Code TOTP sous forme de chaîne zero-padded (ex: "003271").
"""
if timestamp is None:
timestamp = time.time()
counter = int(timestamp) // self._interval
return self._generate_hotp(counter)
def time_remaining(self) -> int:
"""Nombre de secondes avant expiration du code actuel.
Returns:
Secondes restantes (entre 1 et interval).
"""
return self._interval - (int(time.time()) % self._interval)
def verify(self, code: str, timestamp: float | None = None, window: int = 1) -> bool:
"""Vérifie un code TOTP avec une fenêtre de tolérance.
La fenêtre permet de compenser le décalage d'horloge entre client et serveur.
Avec window=1, on vérifie le code actuel, le précédent et le suivant.
Args:
code: Code TOTP à vérifier.
timestamp: Timestamp Unix optionnel.
window: Nombre d'intervalles de tolérance de chaque côté (défaut 1).
Returns:
True si le code correspond à un intervalle dans la fenêtre.
"""
if timestamp is None:
timestamp = time.time()
counter = int(timestamp) // self._interval
for offset in range(-window, window + 1):
check_counter = counter + offset
if check_counter < 0:
continue # Compteur négatif impossible
expected = self._generate_hotp(check_counter)
# Comparaison en temps constant pour éviter les timing attacks
if hmac.compare_digest(code, expected):
return True
return False
# =========================================================================
# Implémentation interne HOTP (RFC 4226)
# =========================================================================
def _generate_hotp(self, counter: int) -> str:
"""Génère un code HOTP pour un compteur donné.
Implémentation conforme à la RFC 4226 section 5.3 :
1. Encoder le compteur en big-endian 8 bytes
2. HMAC avec l'algorithme configuré
3. Truncation dynamique
4. Réduction modulo 10^digits
Args:
counter: Valeur du compteur (entier 64 bits).
Returns:
Code HOTP zero-padded.
"""
# Étape 1 : Compteur en big-endian 8 bytes
counter_bytes = struct.pack(">Q", counter)
# Étape 2 : HMAC
hash_func = _HASH_ALGORITHMS[self._algorithm]
hmac_digest = hmac.new(
self._secret_bytes, counter_bytes, hash_func
).digest()
# Étape 3 : Truncation dynamique (RFC 4226 section 5.4)
# L'offset est déterminé par les 4 bits de poids faible du dernier octet
offset = hmac_digest[-1] & 0x0F
# Extraire 4 bytes à partir de l'offset et masquer le bit de signe
truncated = (
((hmac_digest[offset] & 0x7F) << 24)
| ((hmac_digest[offset + 1] & 0xFF) << 16)
| ((hmac_digest[offset + 2] & 0xFF) << 8)
| (hmac_digest[offset + 3] & 0xFF)
)
# Étape 4 : Réduction modulo pour obtenir le nombre de chiffres voulu
code = truncated % (10 ** self._digits)
# Zero-padding pour garantir la longueur
return str(code).zfill(self._digits)