""" Logger chiffré pour RPA Vision V2 Gère la journalisation sécurisée avec chiffrement AES-256 de toutes les actions, corrections et transitions de mode du système. """ import os import json import base64 from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, List, Optional from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding, hashes, hmac from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import secrets try: from .config import get_data_paths, get_security_config except ImportError: # Pour tests standalone import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from config import get_data_paths, get_security_config class Logger: """ Logger chiffré avec AES-256-CBC pour journalisation sécurisée Attributes: log_dir: Répertoire de stockage des logs key_path: Chemin du fichier de clé de chiffrement encryption_key: Clé AES-256 (32 bytes) hmac_key: Clé HMAC pour vérification d'intégrité """ def __init__(self, log_dir: Optional[str] = None, key_path: Optional[str] = None): """ Initialise le logger avec génération ou chargement des clés AES Args: log_dir: Répertoire pour les logs (utilise config par défaut si None) key_path: Chemin du fichier de clés (utilise config par défaut si None) """ # Configuration des chemins data_paths = get_data_paths() self.log_dir = Path(log_dir) if log_dir else Path(data_paths["logs"]) self.key_path = Path(key_path) if key_path else Path(data_paths["encryption_keys"]) # Créer les répertoires si nécessaire self.log_dir.mkdir(parents=True, exist_ok=True) self.key_path.parent.mkdir(parents=True, exist_ok=True) # Charger ou générer les clés de chiffrement self.encryption_key, self.hmac_key = self._load_or_generate_keys() # Configuration de sécurité self.security_config = get_security_config() def _load_or_generate_keys(self) -> tuple[bytes, bytes]: """ Charge les clés existantes ou en génère de nouvelles Returns: Tuple (encryption_key, hmac_key) """ key_file = self.key_path / "encryption.key" if key_file.exists(): # Charger les clés existantes with open(key_file, 'rb') as f: key_data = f.read() # Les 32 premiers bytes sont la clé de chiffrement # Les 32 suivants sont la clé HMAC encryption_key = key_data[:32] hmac_key = key_data[32:64] else: # Générer de nouvelles clés encryption_key = secrets.token_bytes(32) # 256 bits hmac_key = secrets.token_bytes(32) # 256 bits # S'assurer que le répertoire parent existe key_file.parent.mkdir(parents=True, exist_ok=True) # Sauvegarder les clés de manière sécurisée with open(key_file, 'wb') as f: f.write(encryption_key + hmac_key) # Restreindre les permissions (lecture/écriture propriétaire uniquement) os.chmod(key_file, 0o600) return encryption_key, hmac_key def encrypt_entry(self, data: Dict[str, Any]) -> bytes: """ Chiffre une entrée de log avec AES-256-CBC Args: data: Dictionnaire contenant les données à chiffrer Returns: Données chiffrées (IV + ciphertext + HMAC) """ # Convertir les données en JSON json_data = json.dumps(data, ensure_ascii=False, default=str) plaintext = json_data.encode('utf-8') # Générer un IV aléatoire (16 bytes pour AES) iv = secrets.token_bytes(16) # Padding PKCS7 padder = padding.PKCS7(128).padder() padded_data = padder.update(plaintext) + padder.finalize() # Chiffrement AES-256-CBC cipher = Cipher( algorithms.AES(self.encryption_key), modes.CBC(iv), backend=default_backend() ) encryptor = cipher.encryptor() ciphertext = encryptor.update(padded_data) + encryptor.finalize() # Calculer HMAC pour vérification d'intégrité h = hmac.HMAC(self.hmac_key, hashes.SHA256(), backend=default_backend()) h.update(iv + ciphertext) mac = h.finalize() # Retourner IV + ciphertext + HMAC return iv + ciphertext + mac def decrypt_entry(self, encrypted_data: bytes) -> Dict[str, Any]: """ Déchiffre une entrée de log Args: encrypted_data: Données chiffrées (IV + ciphertext + HMAC) Returns: Dictionnaire contenant les données déchiffrées Raises: ValueError: Si la vérification HMAC échoue """ # Extraire IV, ciphertext et HMAC iv = encrypted_data[:16] mac = encrypted_data[-32:] ciphertext = encrypted_data[16:-32] # Vérifier l'intégrité avec HMAC h = hmac.HMAC(self.hmac_key, hashes.SHA256(), backend=default_backend()) h.update(iv + ciphertext) try: h.verify(mac) except Exception: raise ValueError("HMAC verification failed - data may be corrupted or tampered") # Déchiffrement AES-256-CBC cipher = Cipher( algorithms.AES(self.encryption_key), modes.CBC(iv), backend=default_backend() ) decryptor = cipher.decryptor() padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize() # Retirer le padding PKCS7 unpadder = padding.PKCS7(128).unpadder() plaintext = unpadder.update(padded_plaintext) + unpadder.finalize() # Convertir JSON en dictionnaire json_data = plaintext.decode('utf-8') return json.loads(json_data) def _get_log_file_path(self, date: Optional[datetime] = None) -> Path: """ Génère le chemin du fichier de log pour une date donnée Args: date: Date pour le fichier de log (utilise aujourd'hui si None) Returns: Chemin du fichier de log """ if date is None: date = datetime.now() # Format: logs_YYYY-MM-DD.enc filename = f"logs_{date.strftime('%Y-%m-%d')}.enc" return self.log_dir / filename def _write_encrypted_entry(self, encrypted_data: bytes): """ Écrit une entrée chiffrée dans le fichier de log du jour Args: encrypted_data: Données chiffrées à écrire """ log_file = self._get_log_file_path() # Encoder en base64 pour stockage texte encoded_data = base64.b64encode(encrypted_data).decode('ascii') # Ajouter au fichier (mode append) with open(log_file, 'a') as f: f.write(encoded_data + '\n') def log_action(self, action_data: Dict[str, Any]): """ Enregistre une action (sans chiffrement pour MVP) Args: action_data: Dictionnaire contenant les données de l'action """ # Ajouter métadonnées système log_entry = { "type": "action", "timestamp": datetime.now().isoformat(), **action_data } # Écrire en JSON simple (pas de chiffrement pour MVP) self._write_plain_entry(log_entry) def _write_plain_entry(self, entry: Dict[str, Any]): """Écrit une entrée en JSON simple (MVP - pas de chiffrement).""" log_file = self.log_dir / f"logs_{datetime.now().strftime('%Y-%m-%d')}.json" # Écrire en mode append with open(log_file, 'a') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') def log_correction(self, correction_data: Dict[str, Any]): """ Enregistre une correction utilisateur (sans chiffrement pour MVP) Args: correction_data: Dictionnaire contenant les données de correction Champs attendus: - task_id: Identifiant de la tâche - incorrect_element: Élément incorrectement détecté - correct_element: Élément correct fourni par l'utilisateur - incorrect_bbox: Bounding box incorrecte - correct_bbox: Bounding box correcte - window: Titre de la fenêtre - mode: Mode opérationnel au moment de la correction """ # Ajouter métadonnées système log_entry = { "type": "correction", "timestamp": datetime.now().isoformat(), **correction_data } # Écrire en JSON simple (pas de chiffrement pour MVP) self._write_plain_entry(log_entry) def log_mode_transition(self, task_id: str, from_mode: str, to_mode: str, reason: str): """ Enregistre une transition de mode pour une tâche (sans chiffrement pour MVP) Args: task_id: Identifiant de la tâche from_mode: Mode d'origine ("shadow", "assist", "auto") to_mode: Nouveau mode ("shadow", "assist", "auto") reason: Raison de la transition (ex: "low_confidence", "threshold_met") """ log_entry = { "type": "mode_transition", "timestamp": datetime.now().isoformat(), "task_id": task_id, "from_mode": from_mode, "to_mode": to_mode, "reason": reason } # Écrire en JSON simple (pas de chiffrement pour MVP) self._write_plain_entry(log_entry) def log_security_event(self, event_data: Dict[str, Any]): """ Enregistre un événement de sécurité (ex: violation de liste blanche) Args: event_data: Dictionnaire contenant les données de l'événement Champs attendus: - event_type: Type d'événement ("whitelist_violation", "rollback", etc.) - window: Titre de la fenêtre concernée - action_attempted: Action tentée - details: Détails additionnels """ log_entry = { "type": "security_event", "timestamp": datetime.now().isoformat(), **event_data } # Chiffrer et écrire encrypted = self.encrypt_entry(log_entry) self._write_encrypted_entry(encrypted) def get_logs( self, task_id: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, log_type: Optional[str] = None ) -> List[Dict[str, Any]]: """ Interroge les logs avec filtres optionnels Args: task_id: Filtrer par identifiant de tâche start_time: Filtrer par date de début end_time: Filtrer par date de fin log_type: Filtrer par type de log ("action", "correction", "mode_transition", "security_event") Returns: Liste des entrées de log correspondant aux critères """ results = [] # Déterminer les fichiers de log à lire if start_time is None: start_time = datetime.now() - timedelta(days=self.security_config["log_retention_days"]) if end_time is None: end_time = datetime.now() # Parcourir les jours dans la plage current_date = start_time.date() end_date = end_time.date() while current_date <= end_date: log_file = self._get_log_file_path(datetime.combine(current_date, datetime.min.time())) if log_file.exists(): # Lire et déchiffrer les entrées with open(log_file, 'r') as f: for line in f: line = line.strip() if not line: continue try: # Décoder base64 et déchiffrer encrypted_data = base64.b64decode(line) entry = self.decrypt_entry(encrypted_data) # Appliquer les filtres entry_time = datetime.fromisoformat(entry["timestamp"]) if entry_time < start_time or entry_time > end_time: continue if task_id and entry.get("task_id") != task_id: continue if log_type and entry.get("type") != log_type: continue results.append(entry) except Exception as e: # Logger l'erreur mais continuer le traitement print(f"Warning: Failed to decrypt log entry: {e}") continue # Passer au jour suivant current_date += timedelta(days=1) # Trier par timestamp results.sort(key=lambda x: x["timestamp"]) return results def cleanup_old_logs(self): """ Supprime les logs plus anciens que la période de rétention configurée """ retention_days = self.security_config["log_retention_days"] cutoff_date = datetime.now() - timedelta(days=retention_days) # Parcourir tous les fichiers de log for log_file in self.log_dir.glob("logs_*.enc"): try: # Extraire la date du nom de fichier date_str = log_file.stem.replace("logs_", "") file_date = datetime.strptime(date_str, "%Y-%m-%d") # Supprimer si trop ancien if file_date < cutoff_date: log_file.unlink() print(f"Deleted old log file: {log_file.name}") except Exception as e: print(f"Warning: Failed to process log file {log_file.name}: {e}") def get_task_statistics(self, task_id: str) -> Dict[str, Any]: """ Calcule des statistiques pour une tâche spécifique Args: task_id: Identifiant de la tâche Returns: Dictionnaire contenant les statistiques: - total_actions: Nombre total d'actions - success_count: Nombre d'actions réussies - failed_count: Nombre d'actions échouées - correction_count: Nombre de corrections - mode_transitions: Liste des transitions de mode - avg_confidence: Confiance moyenne - last_execution: Dernière exécution """ # Récupérer tous les logs pour cette tâche logs = self.get_logs(task_id=task_id) stats = { "task_id": task_id, "total_actions": 0, "success_count": 0, "failed_count": 0, "correction_count": 0, "mode_transitions": [], "avg_confidence": 0.0, "last_execution": None } confidence_sum = 0.0 confidence_count = 0 for entry in logs: entry_type = entry.get("type") if entry_type == "action": stats["total_actions"] += 1 result = entry.get("result") if result == "success": stats["success_count"] += 1 elif result == "failed": stats["failed_count"] += 1 if "confidence" in entry: confidence_sum += entry["confidence"] confidence_count += 1 # Mettre à jour la dernière exécution timestamp = entry.get("timestamp") if timestamp: if stats["last_execution"] is None or timestamp > stats["last_execution"]: stats["last_execution"] = timestamp elif entry_type == "correction": stats["correction_count"] += 1 elif entry_type == "mode_transition": stats["mode_transitions"].append({ "timestamp": entry.get("timestamp"), "from_mode": entry.get("from_mode"), "to_mode": entry.get("to_mode"), "reason": entry.get("reason") }) # Calculer la confiance moyenne if confidence_count > 0: stats["avg_confidence"] = confidence_sum / confidence_count return stats if __name__ == "__main__": # Tests du logger print("Test du Logger chiffré RPA Vision V2") print("=" * 50) # Créer un logger de test import tempfile import shutil test_dir = tempfile.mkdtemp() test_log_dir = os.path.join(test_dir, "logs") test_key_dir = os.path.join(test_dir, "keys") try: logger = Logger(log_dir=test_log_dir, key_path=test_key_dir) print(f"✓ Logger initialisé") print(f" Log dir: {logger.log_dir}") print(f" Key path: {logger.key_path}") # Test 1: Log d'action print("\n1. Test log_action:") logger.log_action({ "window": "Dolibarr - Facturation", "action": "click", "element": "valider_button", "bbox": [450, 320, 120, 40], "confidence": 0.97, "mode": "auto", "result": "success", "task_id": "ouvrir_facture_001" }) print(" ✓ Action loggée") # Test 2: Log de correction print("\n2. Test log_correction:") logger.log_correction({ "task_id": "ouvrir_facture_001", "incorrect_element": "annuler_button", "correct_element": "valider_button", "incorrect_bbox": [350, 320, 120, 40], "correct_bbox": [450, 320, 120, 40], "window": "Dolibarr - Facturation", "mode": "assist" }) print(" ✓ Correction loggée") # Test 3: Log de transition de mode print("\n3. Test log_mode_transition:") logger.log_mode_transition( task_id="ouvrir_facture_001", from_mode="assist", to_mode="auto", reason="threshold_met" ) print(" ✓ Transition de mode loggée") # Test 4: Log d'événement de sécurité print("\n4. Test log_security_event:") logger.log_security_event({ "event_type": "whitelist_violation", "window": "Unknown Application", "action_attempted": "click", "details": "Window not in whitelist" }) print(" ✓ Événement de sécurité loggé") # Test 5: Récupération des logs print("\n5. Test get_logs:") all_logs = logger.get_logs() print(f" ✓ {len(all_logs)} entrées récupérées") # Filtrer par task_id task_logs = logger.get_logs(task_id="ouvrir_facture_001") print(f" ✓ {len(task_logs)} entrées pour task_id='ouvrir_facture_001'") # Filtrer par type action_logs = logger.get_logs(log_type="action") print(f" ✓ {len(action_logs)} entrées de type 'action'") # Test 6: Statistiques de tâche print("\n6. Test get_task_statistics:") stats = logger.get_task_statistics("ouvrir_facture_001") print(f" ✓ Statistiques calculées:") print(f" - Total actions: {stats['total_actions']}") print(f" - Succès: {stats['success_count']}") print(f" - Corrections: {stats['correction_count']}") print(f" - Transitions: {len(stats['mode_transitions'])}") print(f" - Confiance moyenne: {stats['avg_confidence']:.2f}") # Test 7: Chiffrement/Déchiffrement print("\n7. Test encrypt/decrypt:") test_data = { "test": "data", "number": 42, "nested": {"key": "value"} } encrypted = logger.encrypt_entry(test_data) print(f" ✓ Données chiffrées ({len(encrypted)} bytes)") decrypted = logger.decrypt_entry(encrypted) print(f" ✓ Données déchiffrées") assert decrypted == test_data, "Decryption mismatch!" print(f" ✓ Vérification: données identiques") print("\n✓ Tous les tests réussis!") finally: # Nettoyer les fichiers de test shutil.rmtree(test_dir) print(f"\n✓ Fichiers de test nettoyés")