585 lines
21 KiB
Python
585 lines
21 KiB
Python
"""
|
|
Logger chiffré pour RPA Vision V2
|
|
Gère la journalisation sécurisée avec chiffrement AES-256 de toutes les actions,
|
|
corrections et transitions de mode du système.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import padding, hashes, hmac
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
import secrets
|
|
|
|
try:
|
|
from .config import get_data_paths, get_security_config
|
|
except ImportError:
|
|
# Pour tests standalone
|
|
import sys
|
|
from pathlib import Path
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from config import get_data_paths, get_security_config
|
|
|
|
|
|
class Logger:
|
|
"""
|
|
Logger chiffré avec AES-256-CBC pour journalisation sécurisée
|
|
|
|
Attributes:
|
|
log_dir: Répertoire de stockage des logs
|
|
key_path: Chemin du fichier de clé de chiffrement
|
|
encryption_key: Clé AES-256 (32 bytes)
|
|
hmac_key: Clé HMAC pour vérification d'intégrité
|
|
"""
|
|
|
|
def __init__(self, log_dir: Optional[str] = None, key_path: Optional[str] = None):
|
|
"""
|
|
Initialise le logger avec génération ou chargement des clés AES
|
|
|
|
Args:
|
|
log_dir: Répertoire pour les logs (utilise config par défaut si None)
|
|
key_path: Chemin du fichier de clés (utilise config par défaut si None)
|
|
"""
|
|
# Configuration des chemins
|
|
data_paths = get_data_paths()
|
|
self.log_dir = Path(log_dir) if log_dir else Path(data_paths["logs"])
|
|
self.key_path = Path(key_path) if key_path else Path(data_paths["encryption_keys"])
|
|
|
|
# Créer les répertoires si nécessaire
|
|
self.log_dir.mkdir(parents=True, exist_ok=True)
|
|
self.key_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Charger ou générer les clés de chiffrement
|
|
self.encryption_key, self.hmac_key = self._load_or_generate_keys()
|
|
|
|
# Configuration de sécurité
|
|
self.security_config = get_security_config()
|
|
|
|
def _load_or_generate_keys(self) -> tuple[bytes, bytes]:
|
|
"""
|
|
Charge les clés existantes ou en génère de nouvelles
|
|
|
|
Returns:
|
|
Tuple (encryption_key, hmac_key)
|
|
"""
|
|
key_file = self.key_path / "encryption.key"
|
|
|
|
if key_file.exists():
|
|
# Charger les clés existantes
|
|
with open(key_file, 'rb') as f:
|
|
key_data = f.read()
|
|
# Les 32 premiers bytes sont la clé de chiffrement
|
|
# Les 32 suivants sont la clé HMAC
|
|
encryption_key = key_data[:32]
|
|
hmac_key = key_data[32:64]
|
|
else:
|
|
# Générer de nouvelles clés
|
|
encryption_key = secrets.token_bytes(32) # 256 bits
|
|
hmac_key = secrets.token_bytes(32) # 256 bits
|
|
|
|
# S'assurer que le répertoire parent existe
|
|
key_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Sauvegarder les clés de manière sécurisée
|
|
with open(key_file, 'wb') as f:
|
|
f.write(encryption_key + hmac_key)
|
|
|
|
# Restreindre les permissions (lecture/écriture propriétaire uniquement)
|
|
os.chmod(key_file, 0o600)
|
|
|
|
return encryption_key, hmac_key
|
|
|
|
def encrypt_entry(self, data: Dict[str, Any]) -> bytes:
|
|
"""
|
|
Chiffre une entrée de log avec AES-256-CBC
|
|
|
|
Args:
|
|
data: Dictionnaire contenant les données à chiffrer
|
|
|
|
Returns:
|
|
Données chiffrées (IV + ciphertext + HMAC)
|
|
"""
|
|
# Convertir les données en JSON
|
|
json_data = json.dumps(data, ensure_ascii=False, default=str)
|
|
plaintext = json_data.encode('utf-8')
|
|
|
|
# Générer un IV aléatoire (16 bytes pour AES)
|
|
iv = secrets.token_bytes(16)
|
|
|
|
# Padding PKCS7
|
|
padder = padding.PKCS7(128).padder()
|
|
padded_data = padder.update(plaintext) + padder.finalize()
|
|
|
|
# Chiffrement AES-256-CBC
|
|
cipher = Cipher(
|
|
algorithms.AES(self.encryption_key),
|
|
modes.CBC(iv),
|
|
backend=default_backend()
|
|
)
|
|
encryptor = cipher.encryptor()
|
|
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
|
|
|
|
# Calculer HMAC pour vérification d'intégrité
|
|
h = hmac.HMAC(self.hmac_key, hashes.SHA256(), backend=default_backend())
|
|
h.update(iv + ciphertext)
|
|
mac = h.finalize()
|
|
|
|
# Retourner IV + ciphertext + HMAC
|
|
return iv + ciphertext + mac
|
|
|
|
def decrypt_entry(self, encrypted_data: bytes) -> Dict[str, Any]:
|
|
"""
|
|
Déchiffre une entrée de log
|
|
|
|
Args:
|
|
encrypted_data: Données chiffrées (IV + ciphertext + HMAC)
|
|
|
|
Returns:
|
|
Dictionnaire contenant les données déchiffrées
|
|
|
|
Raises:
|
|
ValueError: Si la vérification HMAC échoue
|
|
"""
|
|
# Extraire IV, ciphertext et HMAC
|
|
iv = encrypted_data[:16]
|
|
mac = encrypted_data[-32:]
|
|
ciphertext = encrypted_data[16:-32]
|
|
|
|
# Vérifier l'intégrité avec HMAC
|
|
h = hmac.HMAC(self.hmac_key, hashes.SHA256(), backend=default_backend())
|
|
h.update(iv + ciphertext)
|
|
try:
|
|
h.verify(mac)
|
|
except Exception:
|
|
raise ValueError("HMAC verification failed - data may be corrupted or tampered")
|
|
|
|
# Déchiffrement AES-256-CBC
|
|
cipher = Cipher(
|
|
algorithms.AES(self.encryption_key),
|
|
modes.CBC(iv),
|
|
backend=default_backend()
|
|
)
|
|
decryptor = cipher.decryptor()
|
|
padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
|
|
|
|
# Retirer le padding PKCS7
|
|
unpadder = padding.PKCS7(128).unpadder()
|
|
plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()
|
|
|
|
# Convertir JSON en dictionnaire
|
|
json_data = plaintext.decode('utf-8')
|
|
return json.loads(json_data)
|
|
|
|
def _get_log_file_path(self, date: Optional[datetime] = None) -> Path:
|
|
"""
|
|
Génère le chemin du fichier de log pour une date donnée
|
|
|
|
Args:
|
|
date: Date pour le fichier de log (utilise aujourd'hui si None)
|
|
|
|
Returns:
|
|
Chemin du fichier de log
|
|
"""
|
|
if date is None:
|
|
date = datetime.now()
|
|
|
|
# Format: logs_YYYY-MM-DD.enc
|
|
filename = f"logs_{date.strftime('%Y-%m-%d')}.enc"
|
|
return self.log_dir / filename
|
|
|
|
def _write_encrypted_entry(self, encrypted_data: bytes):
|
|
"""
|
|
Écrit une entrée chiffrée dans le fichier de log du jour
|
|
|
|
Args:
|
|
encrypted_data: Données chiffrées à écrire
|
|
"""
|
|
log_file = self._get_log_file_path()
|
|
|
|
# Encoder en base64 pour stockage texte
|
|
encoded_data = base64.b64encode(encrypted_data).decode('ascii')
|
|
|
|
# Ajouter au fichier (mode append)
|
|
with open(log_file, 'a') as f:
|
|
f.write(encoded_data + '\n')
|
|
|
|
def log_action(self, action_data: Dict[str, Any]):
|
|
"""
|
|
Enregistre une action (sans chiffrement pour MVP)
|
|
|
|
Args:
|
|
action_data: Dictionnaire contenant les données de l'action
|
|
"""
|
|
# Ajouter métadonnées système
|
|
log_entry = {
|
|
"type": "action",
|
|
"timestamp": datetime.now().isoformat(),
|
|
**action_data
|
|
}
|
|
|
|
# Écrire en JSON simple (pas de chiffrement pour MVP)
|
|
self._write_plain_entry(log_entry)
|
|
|
|
def _write_plain_entry(self, entry: Dict[str, Any]):
|
|
"""Écrit une entrée en JSON simple (MVP - pas de chiffrement)."""
|
|
log_file = self.log_dir / f"logs_{datetime.now().strftime('%Y-%m-%d')}.json"
|
|
|
|
# Écrire en mode append
|
|
with open(log_file, 'a') as f:
|
|
f.write(json.dumps(entry, ensure_ascii=False) + '\n')
|
|
|
|
def log_correction(self, correction_data: Dict[str, Any]):
|
|
"""
|
|
Enregistre une correction utilisateur (sans chiffrement pour MVP)
|
|
|
|
Args:
|
|
correction_data: Dictionnaire contenant les données de correction
|
|
Champs attendus:
|
|
- task_id: Identifiant de la tâche
|
|
- incorrect_element: Élément incorrectement détecté
|
|
- correct_element: Élément correct fourni par l'utilisateur
|
|
- incorrect_bbox: Bounding box incorrecte
|
|
- correct_bbox: Bounding box correcte
|
|
- window: Titre de la fenêtre
|
|
- mode: Mode opérationnel au moment de la correction
|
|
"""
|
|
# Ajouter métadonnées système
|
|
log_entry = {
|
|
"type": "correction",
|
|
"timestamp": datetime.now().isoformat(),
|
|
**correction_data
|
|
}
|
|
|
|
# Écrire en JSON simple (pas de chiffrement pour MVP)
|
|
self._write_plain_entry(log_entry)
|
|
|
|
def log_mode_transition(self, task_id: str, from_mode: str, to_mode: str, reason: str):
|
|
"""
|
|
Enregistre une transition de mode pour une tâche (sans chiffrement pour MVP)
|
|
|
|
Args:
|
|
task_id: Identifiant de la tâche
|
|
from_mode: Mode d'origine ("shadow", "assist", "auto")
|
|
to_mode: Nouveau mode ("shadow", "assist", "auto")
|
|
reason: Raison de la transition (ex: "low_confidence", "threshold_met")
|
|
"""
|
|
log_entry = {
|
|
"type": "mode_transition",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"task_id": task_id,
|
|
"from_mode": from_mode,
|
|
"to_mode": to_mode,
|
|
"reason": reason
|
|
}
|
|
|
|
# Écrire en JSON simple (pas de chiffrement pour MVP)
|
|
self._write_plain_entry(log_entry)
|
|
|
|
def log_security_event(self, event_data: Dict[str, Any]):
|
|
"""
|
|
Enregistre un événement de sécurité (ex: violation de liste blanche)
|
|
|
|
Args:
|
|
event_data: Dictionnaire contenant les données de l'événement
|
|
Champs attendus:
|
|
- event_type: Type d'événement ("whitelist_violation", "rollback", etc.)
|
|
- window: Titre de la fenêtre concernée
|
|
- action_attempted: Action tentée
|
|
- details: Détails additionnels
|
|
"""
|
|
log_entry = {
|
|
"type": "security_event",
|
|
"timestamp": datetime.now().isoformat(),
|
|
**event_data
|
|
}
|
|
|
|
# Chiffrer et écrire
|
|
encrypted = self.encrypt_entry(log_entry)
|
|
self._write_encrypted_entry(encrypted)
|
|
|
|
def get_logs(
|
|
self,
|
|
task_id: Optional[str] = None,
|
|
start_time: Optional[datetime] = None,
|
|
end_time: Optional[datetime] = None,
|
|
log_type: Optional[str] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Interroge les logs avec filtres optionnels
|
|
|
|
Args:
|
|
task_id: Filtrer par identifiant de tâche
|
|
start_time: Filtrer par date de début
|
|
end_time: Filtrer par date de fin
|
|
log_type: Filtrer par type de log ("action", "correction", "mode_transition", "security_event")
|
|
|
|
Returns:
|
|
Liste des entrées de log correspondant aux critères
|
|
"""
|
|
results = []
|
|
|
|
# Déterminer les fichiers de log à lire
|
|
if start_time is None:
|
|
start_time = datetime.now() - timedelta(days=self.security_config["log_retention_days"])
|
|
if end_time is None:
|
|
end_time = datetime.now()
|
|
|
|
# Parcourir les jours dans la plage
|
|
current_date = start_time.date()
|
|
end_date = end_time.date()
|
|
|
|
while current_date <= end_date:
|
|
log_file = self._get_log_file_path(datetime.combine(current_date, datetime.min.time()))
|
|
|
|
if log_file.exists():
|
|
# Lire et déchiffrer les entrées
|
|
with open(log_file, 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
try:
|
|
# Décoder base64 et déchiffrer
|
|
encrypted_data = base64.b64decode(line)
|
|
entry = self.decrypt_entry(encrypted_data)
|
|
|
|
# Appliquer les filtres
|
|
entry_time = datetime.fromisoformat(entry["timestamp"])
|
|
|
|
if entry_time < start_time or entry_time > end_time:
|
|
continue
|
|
|
|
if task_id and entry.get("task_id") != task_id:
|
|
continue
|
|
|
|
if log_type and entry.get("type") != log_type:
|
|
continue
|
|
|
|
results.append(entry)
|
|
|
|
except Exception as e:
|
|
# Logger l'erreur mais continuer le traitement
|
|
print(f"Warning: Failed to decrypt log entry: {e}")
|
|
continue
|
|
|
|
# Passer au jour suivant
|
|
current_date += timedelta(days=1)
|
|
|
|
# Trier par timestamp
|
|
results.sort(key=lambda x: x["timestamp"])
|
|
|
|
return results
|
|
|
|
def cleanup_old_logs(self):
|
|
"""
|
|
Supprime les logs plus anciens que la période de rétention configurée
|
|
"""
|
|
retention_days = self.security_config["log_retention_days"]
|
|
cutoff_date = datetime.now() - timedelta(days=retention_days)
|
|
|
|
# Parcourir tous les fichiers de log
|
|
for log_file in self.log_dir.glob("logs_*.enc"):
|
|
try:
|
|
# Extraire la date du nom de fichier
|
|
date_str = log_file.stem.replace("logs_", "")
|
|
file_date = datetime.strptime(date_str, "%Y-%m-%d")
|
|
|
|
# Supprimer si trop ancien
|
|
if file_date < cutoff_date:
|
|
log_file.unlink()
|
|
print(f"Deleted old log file: {log_file.name}")
|
|
|
|
except Exception as e:
|
|
print(f"Warning: Failed to process log file {log_file.name}: {e}")
|
|
|
|
def get_task_statistics(self, task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Calcule des statistiques pour une tâche spécifique
|
|
|
|
Args:
|
|
task_id: Identifiant de la tâche
|
|
|
|
Returns:
|
|
Dictionnaire contenant les statistiques:
|
|
- total_actions: Nombre total d'actions
|
|
- success_count: Nombre d'actions réussies
|
|
- failed_count: Nombre d'actions échouées
|
|
- correction_count: Nombre de corrections
|
|
- mode_transitions: Liste des transitions de mode
|
|
- avg_confidence: Confiance moyenne
|
|
- last_execution: Dernière exécution
|
|
"""
|
|
# Récupérer tous les logs pour cette tâche
|
|
logs = self.get_logs(task_id=task_id)
|
|
|
|
stats = {
|
|
"task_id": task_id,
|
|
"total_actions": 0,
|
|
"success_count": 0,
|
|
"failed_count": 0,
|
|
"correction_count": 0,
|
|
"mode_transitions": [],
|
|
"avg_confidence": 0.0,
|
|
"last_execution": None
|
|
}
|
|
|
|
confidence_sum = 0.0
|
|
confidence_count = 0
|
|
|
|
for entry in logs:
|
|
entry_type = entry.get("type")
|
|
|
|
if entry_type == "action":
|
|
stats["total_actions"] += 1
|
|
|
|
result = entry.get("result")
|
|
if result == "success":
|
|
stats["success_count"] += 1
|
|
elif result == "failed":
|
|
stats["failed_count"] += 1
|
|
|
|
if "confidence" in entry:
|
|
confidence_sum += entry["confidence"]
|
|
confidence_count += 1
|
|
|
|
# Mettre à jour la dernière exécution
|
|
timestamp = entry.get("timestamp")
|
|
if timestamp:
|
|
if stats["last_execution"] is None or timestamp > stats["last_execution"]:
|
|
stats["last_execution"] = timestamp
|
|
|
|
elif entry_type == "correction":
|
|
stats["correction_count"] += 1
|
|
|
|
elif entry_type == "mode_transition":
|
|
stats["mode_transitions"].append({
|
|
"timestamp": entry.get("timestamp"),
|
|
"from_mode": entry.get("from_mode"),
|
|
"to_mode": entry.get("to_mode"),
|
|
"reason": entry.get("reason")
|
|
})
|
|
|
|
# Calculer la confiance moyenne
|
|
if confidence_count > 0:
|
|
stats["avg_confidence"] = confidence_sum / confidence_count
|
|
|
|
return stats
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Tests du logger
|
|
print("Test du Logger chiffré RPA Vision V2")
|
|
print("=" * 50)
|
|
|
|
# Créer un logger de test
|
|
import tempfile
|
|
import shutil
|
|
|
|
test_dir = tempfile.mkdtemp()
|
|
test_log_dir = os.path.join(test_dir, "logs")
|
|
test_key_dir = os.path.join(test_dir, "keys")
|
|
|
|
try:
|
|
logger = Logger(log_dir=test_log_dir, key_path=test_key_dir)
|
|
print(f"✓ Logger initialisé")
|
|
print(f" Log dir: {logger.log_dir}")
|
|
print(f" Key path: {logger.key_path}")
|
|
|
|
# Test 1: Log d'action
|
|
print("\n1. Test log_action:")
|
|
logger.log_action({
|
|
"window": "Dolibarr - Facturation",
|
|
"action": "click",
|
|
"element": "valider_button",
|
|
"bbox": [450, 320, 120, 40],
|
|
"confidence": 0.97,
|
|
"mode": "auto",
|
|
"result": "success",
|
|
"task_id": "ouvrir_facture_001"
|
|
})
|
|
print(" ✓ Action loggée")
|
|
|
|
# Test 2: Log de correction
|
|
print("\n2. Test log_correction:")
|
|
logger.log_correction({
|
|
"task_id": "ouvrir_facture_001",
|
|
"incorrect_element": "annuler_button",
|
|
"correct_element": "valider_button",
|
|
"incorrect_bbox": [350, 320, 120, 40],
|
|
"correct_bbox": [450, 320, 120, 40],
|
|
"window": "Dolibarr - Facturation",
|
|
"mode": "assist"
|
|
})
|
|
print(" ✓ Correction loggée")
|
|
|
|
# Test 3: Log de transition de mode
|
|
print("\n3. Test log_mode_transition:")
|
|
logger.log_mode_transition(
|
|
task_id="ouvrir_facture_001",
|
|
from_mode="assist",
|
|
to_mode="auto",
|
|
reason="threshold_met"
|
|
)
|
|
print(" ✓ Transition de mode loggée")
|
|
|
|
# Test 4: Log d'événement de sécurité
|
|
print("\n4. Test log_security_event:")
|
|
logger.log_security_event({
|
|
"event_type": "whitelist_violation",
|
|
"window": "Unknown Application",
|
|
"action_attempted": "click",
|
|
"details": "Window not in whitelist"
|
|
})
|
|
print(" ✓ Événement de sécurité loggé")
|
|
|
|
# Test 5: Récupération des logs
|
|
print("\n5. Test get_logs:")
|
|
all_logs = logger.get_logs()
|
|
print(f" ✓ {len(all_logs)} entrées récupérées")
|
|
|
|
# Filtrer par task_id
|
|
task_logs = logger.get_logs(task_id="ouvrir_facture_001")
|
|
print(f" ✓ {len(task_logs)} entrées pour task_id='ouvrir_facture_001'")
|
|
|
|
# Filtrer par type
|
|
action_logs = logger.get_logs(log_type="action")
|
|
print(f" ✓ {len(action_logs)} entrées de type 'action'")
|
|
|
|
# Test 6: Statistiques de tâche
|
|
print("\n6. Test get_task_statistics:")
|
|
stats = logger.get_task_statistics("ouvrir_facture_001")
|
|
print(f" ✓ Statistiques calculées:")
|
|
print(f" - Total actions: {stats['total_actions']}")
|
|
print(f" - Succès: {stats['success_count']}")
|
|
print(f" - Corrections: {stats['correction_count']}")
|
|
print(f" - Transitions: {len(stats['mode_transitions'])}")
|
|
print(f" - Confiance moyenne: {stats['avg_confidence']:.2f}")
|
|
|
|
# Test 7: Chiffrement/Déchiffrement
|
|
print("\n7. Test encrypt/decrypt:")
|
|
test_data = {
|
|
"test": "data",
|
|
"number": 42,
|
|
"nested": {"key": "value"}
|
|
}
|
|
encrypted = logger.encrypt_entry(test_data)
|
|
print(f" ✓ Données chiffrées ({len(encrypted)} bytes)")
|
|
|
|
decrypted = logger.decrypt_entry(encrypted)
|
|
print(f" ✓ Données déchiffrées")
|
|
assert decrypted == test_data, "Decryption mismatch!"
|
|
print(f" ✓ Vérification: données identiques")
|
|
|
|
print("\n✓ Tous les tests réussis!")
|
|
|
|
finally:
|
|
# Nettoyer les fichiers de test
|
|
shutil.rmtree(test_dir)
|
|
print(f"\n✓ Fichiers de test nettoyés")
|