""" Coffre-fort chiffré pour les credentials d'authentification. Stocke de façon sécurisée : - Comptes de service (login/password) - Seeds TOTP pour la 2FA - Tokens de session - Certificats client Le fichier vault est chiffré avec Fernet (AES-128-CBC + HMAC-SHA256). La clé est dérivée d'un mot de passe maître via PBKDF2 (600000 itérations). Choix de sécurité : - PBKDF2 avec 600 000 itérations : recommandation OWASP 2023 pour SHA-256. Compromis acceptable entre temps de dérivation (~0.5s) et résistance au brute-force. - Fernet (AES-128-CBC + HMAC-SHA256) : chiffrement authentifié, empêche les modifications silencieuses du fichier vault. Bibliothèque maintenue et auditée. - Salt aléatoire de 16 bytes : empêche les attaques par rainbow table. Stocké en clair en préfixe du fichier (le salt n'est pas un secret). """ import base64 import json import logging import os import warnings from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) # Types de credentials supportés CREDENTIAL_TYPES = {"login", "totp_seed", "session_token", "certificate"} # Taille du salt en bytes SALT_SIZE = 16 # Nombre d'itérations PBKDF2 — recommandation OWASP 2023 pour SHA-256 PBKDF2_ITERATIONS = 600_000 # Tentative d'import de cryptography pour le chiffrement Fernet _HAS_FERNET = False try: from cryptography.fernet import Fernet, InvalidToken from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC _HAS_FERNET = True except ImportError: _HAS_FERNET = False warnings.warn( "Module 'cryptography' non disponible. Le vault utilisera un encodage " "base64 NON SÉCURISÉ. NE PAS utiliser en production.", stacklevel=2, ) class CredentialVault: """Coffre-fort chiffré pour les credentials d'applications. Usage : vault = CredentialVault("/chemin/vault.enc", "mot_de_passe_maitre") vault.add_credential("DPI_Crossway", "login", { "username": "robot_lea", "password": "xxx", "domain": "HOPITAL" }) vault.save() creds = vault.get_credential("DPI_Crossway", "login") """ def __init__(self, vault_path: str, master_password: str): """Charge ou crée un vault chiffré. Args: vault_path: Chemin du fichier vault chiffré sur disque. master_password: Mot de passe maître pour dériver la clé de chiffrement. """ self._vault_path = Path(vault_path) self._master_password = master_password self._data: Dict[str, Any] = { "version": "1.0", "created_at": datetime.now(timezone.utc).isoformat(), "credentials": {}, } if self._vault_path.exists(): self._load() else: logger.info("Vault inexistant, création d'un nouveau vault : %s", vault_path) # ========================================================================= # API publique # ========================================================================= def add_credential( self, app_name: str, credential_type: str, data: Dict[str, Any] ) -> None: """Ajoute ou met à jour un credential pour une application. Args: app_name: Nom de l'application (ex: "DPI_Crossway"). credential_type: Type parmi "login", "totp_seed", "session_token", "certificate". data: Dictionnaire avec les champs spécifiques au type. Raises: ValueError: Si le credential_type n'est pas supporté. """ if credential_type not in CREDENTIAL_TYPES: raise ValueError( f"Type de credential invalide : {credential_type!r}. " f"Types supportés : {CREDENTIAL_TYPES}" ) if app_name not in self._data["credentials"]: self._data["credentials"][app_name] = {} self._data["credentials"][app_name][credential_type] = data logger.info( "Credential ajouté : app=%s type=%s", app_name, credential_type ) def get_credential( self, app_name: str, credential_type: str ) -> Optional[Dict[str, Any]]: """Récupère un credential pour une application. Args: app_name: Nom de l'application. credential_type: Type de credential recherché. Returns: Dictionnaire du credential, ou None si non trouvé. """ app_creds = self._data["credentials"].get(app_name, {}) return app_creds.get(credential_type) def remove_credential(self, app_name: str, credential_type: str) -> bool: """Supprime un credential. Args: app_name: Nom de l'application. credential_type: Type de credential à supprimer. Returns: True si supprimé, False si non trouvé. """ app_creds = self._data["credentials"].get(app_name, {}) if credential_type in app_creds: del app_creds[credential_type] # Nettoyer l'app si plus de credentials if not app_creds: del self._data["credentials"][app_name] logger.info( "Credential supprimé : app=%s type=%s", app_name, credential_type ) return True return False def list_apps(self) -> List[str]: """Liste les noms d'applications configurées. Returns: Liste triée des noms d'applications. """ return sorted(self._data["credentials"].keys()) def list_credential_types(self, app_name: str) -> List[str]: """Liste les types de credentials pour une application. Args: app_name: Nom de l'application. Returns: Liste des types de credentials configurés. """ return list(self._data["credentials"].get(app_name, {}).keys()) def save(self) -> None: """Chiffre et sauvegarde le vault sur disque.""" plaintext = json.dumps(self._data, ensure_ascii=False, indent=2).encode("utf-8") encrypted = self._encrypt(plaintext) # Écriture atomique via fichier temporaire tmp_path = self._vault_path.with_suffix(".tmp") self._vault_path.parent.mkdir(parents=True, exist_ok=True) tmp_path.write_bytes(encrypted) tmp_path.rename(self._vault_path) logger.info("Vault sauvegardé : %s (%d bytes)", self._vault_path, len(encrypted)) # ========================================================================= # Chiffrement / Déchiffrement # ========================================================================= def _derive_key(self, password: str, salt: bytes) -> bytes: """Dérive une clé Fernet à partir du mot de passe maître. Utilise PBKDF2-HMAC-SHA256 avec 600 000 itérations (OWASP 2023). La sortie est encodée en base64 URL-safe pour Fernet (32 bytes → 44 chars). Args: password: Mot de passe maître. salt: Salt aléatoire (16 bytes minimum). Returns: Clé Fernet encodée en base64 URL-safe (44 bytes). """ if _HAS_FERNET: kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=PBKDF2_ITERATIONS, ) key = base64.urlsafe_b64encode(kdf.derive(password.encode("utf-8"))) return key else: # Fallback non sécurisé — simple hash pour le développement import hashlib dk = hashlib.pbkdf2_hmac( "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS ) return base64.urlsafe_b64encode(dk) def _encrypt(self, plaintext: bytes) -> bytes: """Chiffre les données avec Fernet (ou base64 en fallback). Format du fichier vault : [16 bytes salt][données chiffrées Fernet] Args: plaintext: Données en clair à chiffrer. Returns: Bytes chiffrés avec le salt en préfixe. """ salt = os.urandom(SALT_SIZE) key = self._derive_key(self._master_password, salt) if _HAS_FERNET: fernet = Fernet(key) encrypted = fernet.encrypt(plaintext) else: # Fallback : base64 simple (NON sécurisé) encrypted = base64.urlsafe_b64encode(plaintext) return salt + encrypted def _decrypt(self, encrypted_data: bytes) -> bytes: """Déchiffre les données. Args: encrypted_data: Bytes chiffrés (salt + données Fernet). Returns: Données déchiffrées. Raises: ValueError: Si le mot de passe est incorrect ou les données corrompues. """ if len(encrypted_data) < SALT_SIZE: raise ValueError("Fichier vault corrompu (trop court)") salt = encrypted_data[:SALT_SIZE] ciphertext = encrypted_data[SALT_SIZE:] key = self._derive_key(self._master_password, salt) if _HAS_FERNET: try: fernet = Fernet(key) return fernet.decrypt(ciphertext) except InvalidToken: raise ValueError( "Mot de passe maître incorrect ou fichier vault corrompu" ) else: # Fallback : base64 simple return base64.urlsafe_b64decode(ciphertext) # ========================================================================= # Chargement # ========================================================================= def _load(self) -> None: """Charge et déchiffre le vault depuis le disque.""" try: encrypted_data = self._vault_path.read_bytes() plaintext = self._decrypt(encrypted_data) self._data = json.loads(plaintext.decode("utf-8")) logger.info( "Vault chargé : %s (%d apps)", self._vault_path, len(self._data.get("credentials", {})), ) except (ValueError, json.JSONDecodeError) as e: raise ValueError(f"Impossible de charger le vault : {e}") from e