#!/usr/bin/env python3 """ Script de correction des problèmes de logs corrompus et screenshots manquants. Problèmes identifiés : 1. Logs avec noms de fichiers corrompus (caractères Unicode invalides) 2. Screenshots capturés localement mais pas uploadés vers le serveur 3. Système de logging défaillant dans core/monitoring/logger.py Corrections appliquées : 1. Nettoyage des logs corrompus 2. Correction du système de logging 3. Amélioration de l'upload pour inclure les screenshots """ import os import shutil import logging import json from pathlib import Path from typing import List, Dict, Any def clean_corrupted_logs(): """Nettoie les fichiers de logs avec noms corrompus.""" logs_dir = Path("logs") if not logs_dir.exists(): return print("🧹 Nettoyage des logs corrompus...") # Compter les fichiers corrompus corrupted_count = 0 valid_logs = [] for log_file in logs_dir.iterdir(): if log_file.is_file() and log_file.suffix == '.log': filename = log_file.name # Vérifier si le nom contient des caractères corrompus try: # Tenter d'encoder/décoder pour détecter les caractères invalides filename.encode('ascii') # Si on arrive ici, le nom est valide valid_logs.append(log_file) except UnicodeEncodeError: # Nom corrompu corrupted_count += 1 try: log_file.unlink() print(f" ❌ Supprimé : {filename[:50]}...") except Exception as e: print(f" ⚠️ Erreur suppression {filename[:20]}... : {e}") print(f"✅ Nettoyage terminé : {corrupted_count} logs corrompus supprimés, {len(valid_logs)} logs valides conservés") def fix_logger_system(): """Corrige le système de logging pour éviter les noms corrompus.""" logger_file = Path("core/monitoring/logger.py") if not logger_file.exists(): print("❌ Fichier core/monitoring/logger.py non trouvé") return print("🔧 Correction du système de logging...") # Lire le contenu actuel with open(logger_file, 'r', encoding='utf-8') as f: content = f.read() # Ajouter une fonction de nettoyage des noms de composants fixed_content = content.replace( 'def __init__(self, component: str, log_file: Optional[str] = None):', '''def __init__(self, component: str, log_file: Optional[str] = None):''' ) # Ajouter la fonction de nettoyage au début de la classe if '_clean_component_name' not in content: fixed_content = fixed_content.replace( 'class RPALogger:', '''class RPALogger: """Logger centralisé pour le système RPA.""" @staticmethod def _clean_component_name(component: str) -> str: """ Nettoie le nom du composant pour éviter les caractères corrompus. Args: component: Nom du composant brut Returns: Nom nettoyé sans caractères problématiques """ if not component: return "unknown" # Remplacer les caractères non-ASCII par des underscores cleaned = "" for char in component: if char.isalnum() or char in ['_', '-', '.']: cleaned += char else: cleaned += "_" # Limiter la longueur et éviter les noms vides cleaned = cleaned[:50].strip('_') return cleaned or "unknown"''' ) # Utiliser le nom nettoyé fixed_content = fixed_content.replace( 'self.component = component', 'self.component = self._clean_component_name(component)' ) fixed_content = fixed_content.replace( 'self.log_file = log_file or f"logs/{component}.log"', 'self.log_file = log_file or f"logs/{self.component}.log"' ) # Sauvegarder les corrections with open(logger_file, 'w', encoding='utf-8') as f: f.write(fixed_content) print("✅ Système de logging corrigé") def analyze_upload_issue(): """Analyse le problème d'upload des screenshots.""" print("🔍 Analyse du problème d'upload des screenshots...") # Vérifier les sessions locales de l'agent agent_sessions = Path("agent_v0/sessions") if not agent_sessions.exists(): print("❌ Répertoire agent_v0/sessions non trouvé") return # Compter les sessions avec screenshots sessions_with_screenshots = 0 total_screenshots = 0 for session_dir in agent_sessions.iterdir(): if session_dir.is_dir() and session_dir.name.startswith('sess_'): shots_dir = session_dir / "shots" if shots_dir.exists(): screenshot_count = len(list(shots_dir.glob("*.png"))) if screenshot_count > 0: sessions_with_screenshots += 1 total_screenshots += screenshot_count print(f" 📸 {session_dir.name}: {screenshot_count} screenshots") print(f"📊 Résumé local : {sessions_with_screenshots} sessions avec {total_screenshots} screenshots") # Vérifier les sessions uploadées sur le serveur server_sessions = Path("data/training/sessions") if server_sessions.exists(): uploaded_screenshots = 0 for session_file in server_sessions.rglob("*.json"): try: with open(session_file, 'r') as f: session_data = json.load(f) screenshot_count = len(session_data.get('screenshots', [])) if screenshot_count > 0: # Vérifier si les fichiers PNG existent session_dir = session_file.parent actual_screenshots = len(list(session_dir.rglob("*.png"))) uploaded_screenshots += actual_screenshots if actual_screenshots != screenshot_count: print(f" ⚠️ {session_file.name}: {screenshot_count} référencés, {actual_screenshots} fichiers") except Exception as e: print(f" ❌ Erreur lecture {session_file}: {e}") print(f"📊 Résumé serveur : {uploaded_screenshots} screenshots uploadés") if total_screenshots > uploaded_screenshots: print(f"🚨 PROBLÈME IDENTIFIÉ : {total_screenshots - uploaded_screenshots} screenshots manquants sur le serveur") return True return False def create_upload_fix(): """Crée un script de correction pour l'upload des screenshots.""" print("🔧 Création du script de correction d'upload...") fix_script = '''#!/usr/bin/env python3 """ Script de correction pour l'upload des screenshots manquants. Ce script : 1. Identifie les sessions avec screenshots non uploadés 2. Recrée les archives ZIP avec les screenshots inclus 3. Relance l'upload vers le serveur """ import os import json import zipfile from pathlib import Path import sys # Ajouter le répertoire agent_v0 au path sys.path.insert(0, 'agent_v0') from uploader import upload_session_zip def fix_session_upload(session_dir: Path) -> bool: """ Corrige l'upload d'une session en incluant les screenshots. Args: session_dir: Répertoire de la session Returns: True si correction réussie """ session_id = session_dir.name json_file = session_dir / f"{session_id}.json" shots_dir = session_dir / "shots" if not json_file.exists(): print(f"❌ JSON manquant pour {session_id}") return False if not shots_dir.exists() or not list(shots_dir.glob("*.png")): print(f"⚠️ Pas de screenshots pour {session_id}") return False # Créer un nouveau ZIP avec screenshots zip_path = session_dir.parent / f"{session_id}_fixed.zip" try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: # Ajouter le JSON zf.write(json_file, f"{session_id}.json") # Ajouter tous les screenshots for screenshot in shots_dir.glob("*.png"): zf.write(screenshot, f"shots/{screenshot.name}") print(f"📦 Archive créée : {zip_path}") # Uploader la nouvelle archive if upload_session_zip(str(zip_path), session_id): print(f"✅ Upload réussi pour {session_id}") # Nettoyer l'archive temporaire zip_path.unlink() return True else: print(f"❌ Upload échoué pour {session_id}") return False except Exception as e: print(f"❌ Erreur création ZIP pour {session_id}: {e}") return False def main(): """Point d'entrée principal.""" print("🚀 Correction des uploads de screenshots...") agent_sessions = Path("agent_v0/sessions") if not agent_sessions.exists(): print("❌ Répertoire agent_v0/sessions non trouvé") return fixed_count = 0 total_count = 0 for session_dir in agent_sessions.iterdir(): if session_dir.is_dir() and session_dir.name.startswith('sess_'): total_count += 1 if fix_session_upload(session_dir): fixed_count += 1 print(f"📊 Résultat : {fixed_count}/{total_count} sessions corrigées") if __name__ == "__main__": main() ''' with open("fix_screenshot_upload.py", 'w', encoding='utf-8') as f: f.write(fix_script) os.chmod("fix_screenshot_upload.py", 0o755) print("✅ Script fix_screenshot_upload.py créé") def main(): """Point d'entrée principal.""" print("🚀 CORRECTION DES PROBLÈMES DE LOGS ET SCREENSHOTS") print("=" * 60) # 1. Nettoyer les logs corrompus clean_corrupted_logs() print() # 2. Corriger le système de logging fix_logger_system() print() # 3. Analyser le problème d'upload has_upload_issue = analyze_upload_issue() print() # 4. Créer le script de correction si nécessaire if has_upload_issue: create_upload_fix() print() print("🎯 PROCHAINES ÉTAPES :") print("1. Exécuter : python fix_screenshot_upload.py") print("2. Vérifier le dashboard pour confirmer les screenshots") print("3. Redémarrer l'agent si nécessaire") else: print("✅ Pas de problème d'upload détecté") print() print("✅ DIAGNOSTIC ET CORRECTIONS TERMINÉS") if __name__ == "__main__": main()