#!/usr/bin/env python3 """ Application Flask pour le Dashboard Web RPA Vision V3 Fonctionnalités: - Monitoring temps réel avec WebSocket - Gestion des workflows - Visualisation des sessions et screenshots - Graphiques de performance """ import os import sys import json import subprocess import threading import time from pathlib import Path from datetime import datetime from flask import Flask, render_template, jsonify, request, send_file, Response from flask_socketio import SocketIO, emit from prometheus_client import generate_latest, CONTENT_TYPE_LATEST # Ajouter le répertoire parent au path sys.path.insert(0, str(Path(__file__).parent.parent)) from core.persistence import StorageManager from core.models import RawSession from core.monitoring import get_logger, api_logger from core.monitoring.chain_manager import ChainManager from core.monitoring.trigger_manager import TriggerManager from core.monitoring.log_exporter import LogExporter from core.monitoring.automation_scheduler import AutomationScheduler # Modules pour backup et versioning from core.system.backup_exporter import get_backup_exporter from core.system.version_manager import get_version_manager app = Flask(__name__) app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-key-change-in-production') socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') @app.get('/healthz') def healthz(): """Healthcheck minimal (systemd/k8s).""" return jsonify({ 'status': 'ok', 'service': 'rpa-vision-v3-dashboard', 'timestamp': datetime.now().isoformat(), }) # Chemins BASE_PATH = Path(__file__).parent.parent DATA_PATH = BASE_PATH / "data" / "training" SESSIONS_PATH = DATA_PATH / "sessions" WORKFLOWS_PATH = DATA_PATH / "workflows" LOGS_PATH = BASE_PATH / "logs" # StorageManager storage = StorageManager(base_path=str(DATA_PATH)) # Monitoring managers chain_manager = ChainManager(storage_path=DATA_PATH / "chains") trigger_manager = TriggerManager(storage_path=DATA_PATH / "triggers") log_exporter = LogExporter(logs_path=LOGS_PATH) # Automation scheduler automation_scheduler = AutomationScheduler( trigger_manager=trigger_manager, chain_manager=chain_manager, check_interval=1.0 # Vérifier toutes les secondes ) # Démarrer le scheduler automatiquement automation_scheduler.start() api_logger.info("Automation scheduler started") # État global pour le monitoring temps réel execution_state = { "running": False, "workflow_id": None, "current_node": None, "steps_executed": 0, "last_confidence": 0.0, "mode": "idle", "history": [] } # Métriques de performance performance_metrics = { "faiss_searches": [], "embedding_times": [], "capture_times": [], "total_embeddings": 0, "cache_hits": 0, "cache_misses": 0 } # ============================================================================= # Processing Status Tracker # ============================================================================= PROCESSING_STATUS_FILE = DATA_PATH / "processing_status.json" def load_processing_status(): """Charge les statuts de traitement depuis le fichier.""" try: if PROCESSING_STATUS_FILE.exists(): with open(PROCESSING_STATUS_FILE, 'r') as f: return json.load(f) except Exception: pass return {} def save_processing_status(status_dict): """Sauvegarde les statuts de traitement.""" try: PROCESSING_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True) with open(PROCESSING_STATUS_FILE, 'w') as f: json.dump(status_dict, f, indent=2) except Exception as e: api_logger.error(f"Erreur sauvegarde processing status: {e}") def get_session_processing_status(session_id): """Retourne le statut de traitement d'une session.""" statuses = load_processing_status() return statuses.get(session_id, {"state": "pending", "processed_at": None}) def set_session_processing_status(session_id, state, stats=None, error=None): """Met à jour le statut de traitement d'une session.""" statuses = load_processing_status() statuses[session_id] = { "state": state, # pending, processing, completed, failed "updated_at": datetime.now().isoformat(), "processed_at": datetime.now().isoformat() if state == "completed" else None, "stats": stats, "error": str(error) if error else None } save_processing_status(statuses) # ============================================================================= # Routes principales # ============================================================================= @app.route('/') def index(): """Page d'accueil du dashboard.""" return render_template('index.html') # ============================================================================= # API Système # ============================================================================= @app.route('/api/system/status') def system_status(): """Statut du système.""" try: sessions_count = len(list(SESSIONS_PATH.glob('*'))) if SESSIONS_PATH.exists() else 0 workflows_count = len(list(WORKFLOWS_PATH.glob('*.json'))) if WORKFLOWS_PATH.exists() else 0 tests_path = BASE_PATH / "tests" unit_tests = len(list(tests_path.glob('unit/test_*.py'))) if tests_path.exists() else 0 integration_tests = len(list(tests_path.glob('integration/test_*.py'))) if tests_path.exists() else 0 dependencies_ok = True try: import torch import faiss except ImportError: dependencies_ok = False return jsonify({ 'status': 'online', 'sessions_count': sessions_count, 'workflows_count': workflows_count, 'tests': { 'total': unit_tests + integration_tests, 'unit': unit_tests, 'integration': integration_tests }, 'dependencies_ok': dependencies_ok, 'execution': execution_state, 'timestamp': datetime.now().isoformat() }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/system/performance') def system_performance(): """Métriques de performance.""" try: # Essayer de récupérer les vraies stats faiss_stats = {} embedding_stats = {} try: from core.embedding.faiss_manager import FAISSManager # Charger l'index FAISS existant depuis le disque faiss_index_path = DATA_PATH / "faiss_index" / "main.index" faiss_metadata_path = DATA_PATH / "faiss_index" / "main.metadata" if faiss_index_path.exists() and faiss_metadata_path.exists(): fm = FAISSManager.load(faiss_index_path, faiss_metadata_path) faiss_stats = fm.get_stats() else: faiss_stats = {"total_vectors": 0, "status": "index_not_found"} except Exception as e: faiss_stats = {"error": str(e)} try: from core.embedding.embedding_cache import EmbeddingCache cache = EmbeddingCache() embedding_stats = cache.get_stats() except Exception: pass return jsonify({ 'faiss': faiss_stats, 'embedding_cache': embedding_stats, 'metrics': performance_metrics, 'timestamp': datetime.now().isoformat() }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/system/faiss/test', methods=['POST']) def test_faiss_index(): """Teste l'index FAISS avec une recherche aléatoire.""" try: import numpy as np import time from core.embedding.faiss_manager import FAISSManager faiss_index_path = DATA_PATH / "faiss_index" / "main.index" faiss_metadata_path = DATA_PATH / "faiss_index" / "main.metadata" if not faiss_index_path.exists() or not faiss_metadata_path.exists(): return jsonify({ 'success': False, 'error': 'Index FAISS non trouvé', 'recommendation': 'Traitez des sessions pour créer l\'index' }) # Charger l'index start_load = time.time() fm = FAISSManager.load(faiss_index_path, faiss_metadata_path) load_time = (time.time() - start_load) * 1000 stats = fm.get_stats() total_vectors = stats.get('total_vectors', 0) if total_vectors == 0: return jsonify({ 'success': False, 'error': 'Index vide (0 vecteurs)', 'recommendation': 'Traitez des sessions pour ajouter des vecteurs' }) # Créer un vecteur de test aléatoire dimensions = stats.get('dimensions', 512) test_vector = np.random.rand(dimensions).astype(np.float32) test_vector = test_vector / np.linalg.norm(test_vector) # Normaliser # Rechercher les plus proches voisins start_search = time.time() results = fm.search_similar(test_vector, k=min(5, total_vectors)) search_time = (time.time() - start_search) * 1000 # Analyser les résultats (SearchResult: embedding_id, similarity, distance, metadata) result_details = [] for i, sr in enumerate(results): meta = sr.metadata or {} result_details.append({ 'rank': i + 1, 'score': round(sr.similarity, 4), 'target_id': meta.get('target_id', sr.embedding_id[:20] if sr.embedding_id else 'N/A'), 'session_id': meta.get('session_id', '')[:20] if meta.get('session_id') else 'N/A' }) # Recommandations basées sur les stats recommendations = [] if total_vectors > 10000 and stats.get('index_type') == 'Flat': recommendations.append('Passez à un index IVF pour de meilleures performances') if not stats.get('use_gpu') and total_vectors > 50000: recommendations.append('Activez le GPU pour accélérer les recherches') if search_time > 100: recommendations.append('Temps de recherche élevé - optimisez l\'index') return jsonify({ 'success': True, 'load_time_ms': round(load_time, 2), 'search_time_ms': round(search_time, 2), 'total_vectors': total_vectors, 'results_count': len(results), 'results': result_details, 'index_healthy': search_time < 100 and len(results) > 0, 'recommendations': recommendations, 'timestamp': datetime.now().isoformat() }) except Exception as e: return jsonify({ 'success': False, 'error': str(e), 'recommendation': 'Vérifiez les logs pour plus de détails' }), 500 # ============================================================================= # API Sessions # ============================================================================= @app.route('/api/agent/sessions') def list_sessions(): """Liste toutes les sessions agent.""" try: sessions = [] hide_empty = request.args.get('hide_empty', 'true').lower() == 'true' if not SESSIONS_PATH.exists(): return jsonify({'sessions': [], 'total': 0, 'hidden_empty': 0}) for session_dir in SESSIONS_PATH.iterdir(): if not session_dir.is_dir(): continue # Chercher les fichiers JSON dans le répertoire et ses sous-répertoires json_files = list(session_dir.glob('*.json')) + list(session_dir.glob('*/*.json')) if not json_files: continue # Traiter chaque fichier JSON comme une session séparée for json_path in json_files: try: # Essayer de charger avec RawSession, sinon charger comme JSON brut try: session = RawSession.load_from_file(json_path) session_id = session.session_id started_at = session.started_at ended_at = session.ended_at events_count = len(session.events) user = session.user context = session.context except Exception: # Fallback: charger comme JSON brut with open(json_path, 'r') as f: data = json.load(f) session_id = data.get('session_id', json_path.stem) started_at = datetime.fromisoformat(data.get('started_at', '2025-01-01T00:00:00')) ended_at = datetime.fromisoformat(data.get('ended_at')) if data.get('ended_at') else None events_count = len(data.get('events', [])) user = data.get('user', 'unknown') context = data.get('context', {}) # Calculer la taille du fichier JSON size_bytes = json_path.stat().st_size # Chercher les screenshots dans différents emplacements possibles # Utiliser json_path.parent car les shots sont au même niveau que le JSON json_parent = json_path.parent screenshots_dir = json_parent / "screenshots" # Structure standard shots_dir = json_parent / "shots" # Structure agent_v0 screenshot_files = [] if screenshots_dir.exists(): screenshot_files.extend(list(screenshots_dir.glob('*.png'))) if shots_dir.exists(): screenshot_files.extend(list(shots_dir.glob('*.png'))) # Ajouter la taille des screenshots for img_file in screenshot_files: size_bytes += img_file.stat().st_size size_mb = round(size_bytes / (1024 * 1024), 2) # Récupérer le statut de traitement processing_info = get_session_processing_status(session_id) sessions.append({ 'session_id': session_id, 'started_at': started_at.isoformat(), 'ended_at': ended_at.isoformat() if ended_at else None, 'events_count': events_count, 'screenshots_count': len(screenshot_files), 'user': user, 'context': context, 'size_mb': size_mb, 'path': str(json_path.parent), 'json_path': str(json_path), 'processing_state': processing_info.get('state', 'pending'), 'processed_at': processing_info.get('processed_at'), 'processing_stats': processing_info.get('stats') }) except Exception as e: print(f"Erreur lecture session {json_path.name}: {e}") continue sessions.sort(key=lambda x: x['started_at'], reverse=True) # Filtrer les sessions vides si demandé if hide_empty: all_sessions = sessions sessions = [s for s in sessions if s['screenshots_count'] > 0] hidden_empty = len(all_sessions) - len(sessions) else: hidden_empty = len([s for s in sessions if s['screenshots_count'] == 0]) return jsonify({ 'sessions': sessions, 'total': len(sessions), 'hidden_empty': hidden_empty }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/agent/sessions/cleanup-empty', methods=['POST']) def cleanup_empty_sessions(): """Supprime les sessions sans screenshots.""" try: deleted = [] errors = [] if not SESSIONS_PATH.exists(): return jsonify({'deleted': 0, 'errors': []}) for session_dir in SESSIONS_PATH.iterdir(): if not session_dir.is_dir(): continue # Compter les screenshots screenshots = list(session_dir.glob('**/*.png')) if len(screenshots) == 0: # Vérifier qu'il y a au moins un JSON (c'est bien une session) json_files = list(session_dir.glob('**/*.json')) if json_files: try: import shutil shutil.rmtree(session_dir) deleted.append(session_dir.name) except Exception as e: errors.append({'path': str(session_dir), 'error': str(e)}) return jsonify({ 'success': True, 'deleted_count': len(deleted), 'deleted': deleted, 'errors': errors }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/agent/sessions/') def get_session(session_id): """Détails d'une session.""" try: # Chercher la session dans tous les répertoires # Collecter tous les matches et préférer celui avec screenshots candidates = [] for dir_path in SESSIONS_PATH.iterdir(): if not dir_path.is_dir(): continue # Chercher dans le répertoire et ses sous-répertoires json_files = list(dir_path.glob('*.json')) + list(dir_path.glob('*/*.json')) for json_path in json_files: try: session = RawSession.load_from_file(json_path) if session.session_id == session_id: session_dir = json_path.parent has_shots = (session_dir / "shots").exists() or (session_dir / "screenshots").exists() candidates.append((json_path, session_dir, has_shots)) except Exception: continue if not candidates: return jsonify({'error': 'Session non trouvée'}), 404 # Préférer la version avec screenshots candidates.sort(key=lambda x: x[2], reverse=True) session_file, session_dir, _ = candidates[0] session = RawSession.load_from_file(session_file) # Lister les screenshots avec URLs screenshots = [] # Chercher dans différents emplacements possibles screenshots_dir = session_dir / "screenshots" shots_dir = session_dir / "shots" for img_dir in [screenshots_dir, shots_dir]: if img_dir.exists(): for img_file in sorted(img_dir.glob('*.png')): screenshots.append({ 'filename': img_file.name, 'url': f'/api/agent/sessions/{session_id}/screenshot/{img_file.name}', 'size': img_file.stat().st_size }) return jsonify({ 'session': { 'session_id': session.session_id, 'started_at': session.started_at.isoformat(), 'ended_at': session.ended_at.isoformat() if session.ended_at else None, 'user': session.user, 'context': session.context, 'environment': session.environment, 'events': [ { 'type': e.type, 'timestamp': e.t if hasattr(e, 't') else 0, 'window': e.window, 'screenshot_id': e.screenshot_id } for e in session.events ] }, 'screenshots': screenshots, 'screenshots_count': len(screenshots) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/agent/sessions//screenshot/') def get_screenshot(session_id, filename): """Récupère un screenshot.""" try: # Chercher le screenshot dans tous les répertoires screenshot_path = None for dir_path in SESSIONS_PATH.iterdir(): if not dir_path.is_dir(): continue # Chercher dans différents emplacements possibles possible_paths = [ dir_path / "screenshots" / filename, dir_path / "shots" / filename, ] # Chercher aussi dans les sous-répertoires for subdir in dir_path.iterdir(): if subdir.is_dir(): possible_paths.extend([ subdir / "screenshots" / filename, subdir / "shots" / filename, ]) for path in possible_paths: if path.exists(): screenshot_path = path break if screenshot_path: break if not screenshot_path: return jsonify({'error': 'Screenshot non trouvé'}), 404 return send_file(screenshot_path, mimetype='image/png') except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/agent/sessions//process', methods=['POST']) def process_session(session_id): """Lance le traitement d'une session.""" try: session_dir = SESSIONS_PATH / session_id if not session_dir.exists(): return jsonify({'error': 'Session non trouvée'}), 404 # Marquer comme "en cours" set_session_processing_status(session_id, "processing") from server.processing_pipeline import process_session_async def process_in_background(): try: stats = process_session_async(session_id, str(DATA_PATH)) # Marquer comme "terminé" avec les stats set_session_processing_status(session_id, "completed", stats=stats) socketio.emit('processing_complete', { 'session_id': session_id, 'stats': stats }) except Exception as e: # Marquer comme "échoué" set_session_processing_status(session_id, "failed", error=e) socketio.emit('processing_error', { 'session_id': session_id, 'error': str(e) }) thread = threading.Thread(target=process_in_background, daemon=True) thread.start() return jsonify({'status': 'started', 'session_id': session_id}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/agent/sessions//rename', methods=['PATCH', 'POST']) def rename_session_workflow(session_id): """Renomme le workflow (training_label) d'une session.""" try: data = request.get_json() if not data or 'name' not in data: return jsonify({'error': 'Le champ "name" est requis'}), 400 new_name = data['name'].strip() if not new_name: return jsonify({'error': 'Le nom ne peut pas être vide'}), 400 # Sanitizer le nom import re new_name = re.sub(r'[<>:"/\\|?*]', '_', new_name) new_name = re.sub(r'_+', '_', new_name).strip('_') if len(new_name) > 60: new_name = new_name[:57] + "..." # Trouver le fichier JSON de la session session_file = None for dir_path in SESSIONS_PATH.iterdir(): if not dir_path.is_dir(): continue json_files = list(dir_path.glob('*.json')) + list(dir_path.glob('*/*.json')) for json_path in json_files: try: session = RawSession.load_from_file(json_path) if session.session_id == session_id: session_file = json_path break except Exception: continue if session_file: break if not session_file: return jsonify({'error': 'Session non trouvée'}), 404 # Charger et mettre à jour la session session = RawSession.load_from_file(session_file) old_name = session.context.get('training_label', '') session.context['training_label'] = new_name # Sauvegarder with open(session_file, 'w', encoding='utf-8') as f: json.dump(session.to_json(), f, ensure_ascii=False, indent=2) return jsonify({ 'success': True, 'session_id': session_id, 'old_name': old_name, 'new_name': new_name }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # API Workflows # ============================================================================= @app.route('/api/workflows') def list_workflows(): """Liste tous les workflows.""" try: workflows = [] hide_unnamed = request.args.get('hide_unnamed', 'true').lower() == 'true' if not WORKFLOWS_PATH.exists(): WORKFLOWS_PATH.mkdir(parents=True, exist_ok=True) return jsonify({'workflows': [], 'total': 0, 'hidden_unnamed': 0}) for wf_file in WORKFLOWS_PATH.glob('*.json'): try: with open(wf_file, 'r') as f: wf_data = json.load(f) workflows.append({ 'workflow_id': wf_data.get('workflow_id', wf_file.stem), 'name': wf_data.get('name', wf_file.stem), 'description': wf_data.get('description', ''), 'nodes_count': len(wf_data.get('nodes', [])), 'edges_count': len(wf_data.get('edges', [])), 'learning_state': wf_data.get('learning_state', 'OBSERVATION'), 'created_at': wf_data.get('created_at', ''), 'updated_at': wf_data.get('updated_at', ''), 'execution_count': wf_data.get('execution_count', 0), 'file_path': str(wf_file) }) except Exception as e: print(f"Erreur lecture workflow {wf_file}: {e}") # Filtrer les workflows "Unnamed" si demandé if hide_unnamed: all_workflows = workflows workflows = [w for w in workflows if w['name'] != 'Unnamed Workflow'] hidden_unnamed = len(all_workflows) - len(workflows) else: hidden_unnamed = len([w for w in workflows if w['name'] == 'Unnamed Workflow']) return jsonify({ 'workflows': workflows, 'total': len(workflows), 'hidden_unnamed': hidden_unnamed }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows/cleanup-unnamed', methods=['POST']) def cleanup_unnamed_workflows(): """Supprime les workflows sans nom (Unnamed Workflow).""" try: deleted = [] errors = [] if not WORKFLOWS_PATH.exists(): return jsonify({'deleted': 0, 'errors': []}) for wf_file in WORKFLOWS_PATH.glob('*.json'): try: with open(wf_file, 'r') as f: wf_data = json.load(f) if wf_data.get('name') == 'Unnamed Workflow': wf_file.unlink() deleted.append(wf_file.name) except Exception as e: errors.append({'file': str(wf_file), 'error': str(e)}) return jsonify({ 'success': True, 'deleted_count': len(deleted), 'deleted': deleted, 'errors': errors }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows/') def get_workflow(workflow_id): """Détails d'un workflow.""" try: wf_file = WORKFLOWS_PATH / f"{workflow_id}.json" if not wf_file.exists(): return jsonify({'error': 'Workflow non trouvé'}), 404 with open(wf_file, 'r') as f: wf_data = json.load(f) return jsonify({'workflow': wf_data}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows//execute', methods=['POST']) def execute_workflow(workflow_id): """Lance l'exécution d'un workflow.""" global execution_state try: data = request.get_json() or {} mode = data.get('mode', 'supervised') wf_file = WORKFLOWS_PATH / f"{workflow_id}.json" if not wf_file.exists(): return jsonify({'error': 'Workflow non trouvé'}), 404 # Mettre à jour l'état execution_state = { "running": True, "workflow_id": workflow_id, "current_node": None, "steps_executed": 0, "last_confidence": 0.0, "mode": mode, "started_at": datetime.now().isoformat(), "history": [] } # Notifier via WebSocket socketio.emit('execution_started', execution_state) # Lancer l'exécution en arrière-plan def run_workflow(): global execution_state try: from core.pipeline.workflow_pipeline import WorkflowPipeline from core.execution.execution_loop import ExecutionLoop, ExecutionMode pipeline = WorkflowPipeline() loop = ExecutionLoop(pipeline) # Callback pour les mises à jour def on_step(step_result): global execution_state execution_state["current_node"] = step_result.node_id execution_state["steps_executed"] += 1 execution_state["last_confidence"] = step_result.match_confidence execution_state["history"].append({ "node_id": step_result.node_id, "success": step_result.success, "confidence": step_result.match_confidence, "timestamp": datetime.now().isoformat() }) socketio.emit('execution_step', execution_state) loop.on_step_complete(on_step) mode_map = { 'observation': ExecutionMode.OBSERVATION, 'coaching': ExecutionMode.COACHING, 'supervised': ExecutionMode.SUPERVISED, 'automatic': ExecutionMode.AUTOMATIC } loop.start(workflow_id, mode=mode_map.get(mode, ExecutionMode.SUPERVISED)) except Exception as e: execution_state["running"] = False execution_state["error"] = str(e) socketio.emit('execution_error', execution_state) thread = threading.Thread(target=run_workflow, daemon=True) thread.start() return jsonify({'status': 'started', 'execution': execution_state}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows//stop', methods=['POST']) def stop_workflow(workflow_id): """Arrête l'exécution d'un workflow.""" global execution_state execution_state["running"] = False socketio.emit('execution_stopped', execution_state) return jsonify({'status': 'stopped'}) # ============================================================================= # API Tests # ============================================================================= @app.route('/api/tests') def list_tests(): """Liste tous les tests disponibles.""" try: tests = [] tests_path = BASE_PATH / "tests" if not tests_path.exists(): return jsonify({'tests': [], 'total': 0}) for test_type in ['unit', 'integration', 'performance']: type_path = tests_path / test_type if type_path.exists(): for test_file in type_path.glob('test_*.py'): tests.append({ 'name': test_file.stem, 'path': str(test_file.relative_to(BASE_PATH)), 'type': test_type }) return jsonify({ 'tests': tests, 'total': len(tests) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/tests/run', methods=['POST']) def run_test(): """Lance un test spécifique.""" try: data = request.get_json() test_path = data.get('test_path') if not test_path: return jsonify({'error': 'test_path requis'}), 400 result = subprocess.run( ['pytest', test_path, '-v', '--tb=short'], cwd=BASE_PATH, capture_output=True, text=True, timeout=120 ) return jsonify({ 'success': result.returncode == 0, 'stdout': result.stdout, 'stderr': result.stderr, 'returncode': result.returncode }) except subprocess.TimeoutExpired: return jsonify({'error': 'Timeout (120s)'}), 408 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/tests/run-all', methods=['POST']) def run_all_tests(): """Lance tous les tests d'un type.""" try: data = request.get_json() test_type = data.get('type', 'unit') tests_path = BASE_PATH / "tests" / test_type if not tests_path.exists(): return jsonify({'error': f'Répertoire {test_type} non trouvé'}), 404 result = subprocess.run( ['pytest', str(tests_path), '-v', '--tb=short'], cwd=BASE_PATH, capture_output=True, text=True, timeout=300 ) return jsonify({ 'success': result.returncode == 0, 'stdout': result.stdout, 'stderr': result.stderr, 'returncode': result.returncode }) except subprocess.TimeoutExpired: return jsonify({'error': 'Timeout (300s)'}), 408 except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # API Chains # ============================================================================= @app.route('/api/chains') def api_chains(): """Liste toutes les chaînes de workflows.""" try: chains = chain_manager.list_chains() return jsonify({ 'chains': [ { 'chain_id': chain.chain_id, 'name': chain.name, 'workflows': chain.workflows, 'status': chain.status, 'created_at': chain.created_at.isoformat(), 'last_execution': chain.last_execution.isoformat() if chain.last_execution else None, 'success_rate': chain.success_rate } for chain in chains ], 'total': len(chains) }) except Exception as e: api_logger.error(f"Error listing chains: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/chains', methods=['POST']) def create_chain(): """Crée une nouvelle chaîne.""" try: data = request.get_json() name = data.get('name') workflows = data.get('workflows', []) if not name or not workflows: return jsonify({'error': 'name and workflows required'}), 400 chain = chain_manager.create_chain(name, workflows) api_logger.info(f"Created chain {chain.chain_id}") return jsonify({ 'chain': { 'chain_id': chain.chain_id, 'name': chain.name, 'workflows': chain.workflows, 'status': chain.status } }), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except Exception as e: api_logger.error(f"Error creating chain: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/chains//execute', methods=['POST']) def execute_chain_route(chain_id): """Lance l'exécution d'une chaîne.""" try: def on_progress(workflow_id, current, total): socketio.emit('chain_progress', { 'chain_id': chain_id, 'workflow_id': workflow_id, 'current': current, 'total': total }) result = chain_manager.execute_chain(chain_id, on_progress=on_progress) socketio.emit('chain_complete', { 'chain_id': chain_id, 'success': result.success, 'duration': result.duration }) return jsonify({ 'result': { 'chain_id': result.chain_id, 'success': result.success, 'duration': result.duration, 'workflows_executed': result.workflows_executed, 'failed_at': result.failed_at, 'error_message': result.error_message } }) except ValueError as e: return jsonify({'error': str(e)}), 404 except Exception as e: api_logger.error(f"Error executing chain {chain_id}: {e}") return jsonify({'error': str(e)}), 500 # ============================================================================= # API Triggers # ============================================================================= @app.route('/api/triggers') def api_triggers(): """Liste tous les triggers.""" try: triggers = trigger_manager.list_triggers() return jsonify({ 'triggers': [ { 'trigger_id': trigger.trigger_id, 'trigger_type': trigger.trigger_type, 'workflow_id': trigger.workflow_id, 'config': trigger.config, 'enabled': trigger.enabled, 'created_at': trigger.created_at.isoformat(), 'last_fired': trigger.last_fired.isoformat() if trigger.last_fired else None, 'fire_count': trigger.fire_count } for trigger in triggers ], 'total': len(triggers) }) except Exception as e: api_logger.error(f"Error listing triggers: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/triggers', methods=['POST']) def create_trigger(): """Crée un nouveau trigger.""" try: data = request.get_json() trigger_type = data.get('trigger_type') workflow_id = data.get('workflow_id') config = data.get('config', {}) if not trigger_type or not workflow_id: return jsonify({'error': 'trigger_type and workflow_id required'}), 400 trigger = trigger_manager.create_trigger(trigger_type, workflow_id, config) api_logger.info(f"Created trigger {trigger.trigger_id}") return jsonify({ 'trigger': { 'trigger_id': trigger.trigger_id, 'trigger_type': trigger.trigger_type, 'workflow_id': trigger.workflow_id, 'config': trigger.config, 'enabled': trigger.enabled } }), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except Exception as e: api_logger.error(f"Error creating trigger: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/triggers//toggle', methods=['POST']) def toggle_trigger(trigger_id): """Active/désactive un trigger.""" try: trigger = trigger_manager.get_trigger(trigger_id) if not trigger: return jsonify({'error': 'Trigger not found'}), 404 if trigger.enabled: trigger_manager.disable_trigger(trigger_id) api_logger.info(f"Disabled trigger {trigger_id}") else: trigger_manager.enable_trigger(trigger_id) api_logger.info(f"Enabled trigger {trigger_id}") trigger = trigger_manager.get_trigger(trigger_id) return jsonify({ 'trigger': { 'trigger_id': trigger.trigger_id, 'enabled': trigger.enabled } }) except Exception as e: api_logger.error(f"Error toggling trigger {trigger_id}: {e}") return jsonify({'error': str(e)}), 500 # ============================================================================= # API Logs # ============================================================================= @app.route('/api/logs') def get_logs(): """Récupère les logs récents.""" try: logs = [] if not LOGS_PATH.exists(): return jsonify({'logs': [], 'total': 0}) for log_file in LOGS_PATH.glob('*.log'): try: with open(log_file, 'r') as f: lines = f.readlines() for line in lines[-100:]: logs.append({ 'file': log_file.name, 'message': line.strip() }) except Exception as e: print(f"Erreur lecture log {log_file}: {e}") return jsonify({ 'logs': logs[-200:], 'total': len(logs) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/logs/download') def download_logs(): """Télécharge les logs en format ZIP.""" try: # Récupérer les paramètres de date optionnels start_time = request.args.get('start_time') end_time = request.args.get('end_time') start_dt = datetime.fromisoformat(start_time) if start_time else None end_dt = datetime.fromisoformat(end_time) if end_time else None # Générer le ZIP zip_file = log_exporter.export_to_zip(start_dt, end_dt) api_logger.info(f"Logs downloaded (start={start_time}, end={end_time})") return send_file( zip_file, mimetype='application/zip', as_attachment=True, download_name=f'rpa_logs_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' ) except Exception as e: api_logger.error(f"Error downloading logs: {e}") return jsonify({'error': str(e)}), 500 # ============================================================================= # API Automation # ============================================================================= @app.route('/api/automation/status') def automation_status(): """Statut du scheduler d'automatisation.""" try: status = automation_scheduler.get_status() return jsonify(status) except Exception as e: api_logger.error(f"Error getting automation status: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/automation/start', methods=['POST']) def automation_start(): """Démarre le scheduler.""" try: automation_scheduler.start() return jsonify({'status': 'started'}) except Exception as e: api_logger.error(f"Error starting automation: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/automation/stop', methods=['POST']) def automation_stop(): """Arrête le scheduler.""" try: automation_scheduler.stop() return jsonify({'status': 'stopped'}) except Exception as e: api_logger.error(f"Error stopping automation: {e}") return jsonify({'error': str(e)}), 500 # ============================================================================= # API Backup & Export - Sauvegardes client # ============================================================================= @app.route('/api/backup/stats') def backup_stats(): """Statistiques des données disponibles pour backup.""" try: exporter = get_backup_exporter() stats = exporter.get_backup_stats() return jsonify({ 'stats': stats, 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error getting backup stats: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/backup/full', methods=['POST']) def backup_full(): """Génère et télécharge un backup complet.""" try: data = request.get_json() or {} include_models = data.get('include_models', False) exporter = get_backup_exporter() zip_path = exporter.export_full_backup(include_models=include_models) api_logger.info(f"Full backup created (include_models={include_models})") return send_file( zip_path, mimetype='application/zip', as_attachment=True, download_name=f'rpa_vision_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' ) except Exception as e: api_logger.error(f"Error creating full backup: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/backup/workflows') def backup_workflows(): """Génère et télécharge un backup des workflows uniquement.""" try: exporter = get_backup_exporter() zip_path = exporter.export_workflows() api_logger.info("Workflows backup created") return send_file( zip_path, mimetype='application/zip', as_attachment=True, download_name=f'rpa_workflows_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' ) except Exception as e: api_logger.error(f"Error creating workflows backup: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/backup/correction-packs') def backup_correction_packs(): """Génère et télécharge un backup des correction packs.""" try: exporter = get_backup_exporter() zip_path = exporter.export_correction_packs() api_logger.info("Correction packs backup created") return send_file( zip_path, mimetype='application/zip', as_attachment=True, download_name=f'rpa_correction_packs_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' ) except Exception as e: api_logger.error(f"Error creating correction packs backup: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/backup/trained-models') def backup_trained_models(): """ Génère et télécharge les modèles entraînés (opt-in). Ces modèles sont anonymisés et contiennent uniquement les patterns appris, pas les données brutes ou les screenshots. """ try: exporter = get_backup_exporter() zip_path = exporter.export_trained_models() api_logger.info("Trained models backup created (opt-in)") return send_file( zip_path, mimetype='application/zip', as_attachment=True, download_name=f'rpa_trained_models_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' ) except Exception as e: api_logger.error(f"Error creating trained models backup: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/backup/config') def backup_config(): """Génère et télécharge la configuration (sanitisée).""" try: exporter = get_backup_exporter() zip_path = exporter.export_config() api_logger.info("Config backup created (sanitized)") return send_file( zip_path, mimetype='application/zip', as_attachment=True, download_name=f'rpa_config_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' ) except Exception as e: api_logger.error(f"Error creating config backup: {e}") return jsonify({'error': str(e)}), 500 # ============================================================================= # API Version & Updates - Gestion des versions # ============================================================================= @app.route('/api/version') def get_version(): """Retourne la version actuelle du système.""" try: vm = get_version_manager() version_info = vm.get_current_version() return jsonify({ 'version': version_info.to_dict(), 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error getting version: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/version/system-info') def get_system_info(): """Retourne les informations système complètes.""" try: vm = get_version_manager() system_info = vm.get_system_info() return jsonify({ 'system_info': system_info, 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error getting system info: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/version/check-update') def check_update(): """Vérifie si une mise à jour est disponible.""" try: vm = get_version_manager() update = vm.check_for_updates() if update: return jsonify({ 'update_available': True, 'update': update.to_dict(), 'timestamp': datetime.now().isoformat() }) else: return jsonify({ 'update_available': False, 'message': 'Système à jour', 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error checking update: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/version/backups') def list_version_backups(): """Liste les backups de version disponibles pour rollback.""" try: vm = get_version_manager() backups = vm.list_backups() return jsonify({ 'backups': backups, 'total': len(backups), 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error listing version backups: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/version/create-backup', methods=['POST']) def create_version_backup(): """Crée un backup de la version actuelle (point de restauration).""" try: data = request.get_json() or {} label = data.get('label') vm = get_version_manager() backup_path = vm.create_backup(label=label) api_logger.info(f"Version backup created: {backup_path}") return jsonify({ 'success': True, 'backup_path': str(backup_path), 'label': label, 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error creating version backup: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/version/upload-update', methods=['POST']) def upload_update_package(): """ Upload d'un package de mise à jour. Le package doit être un fichier ZIP contenant: - update_manifest.json - Les fichiers de mise à jour """ try: if 'file' not in request.files: return jsonify({'error': 'Aucun fichier fourni'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'Nom de fichier vide'}), 400 if not file.filename.endswith('.zip'): return jsonify({'error': 'Le fichier doit être un ZIP'}), 400 # Sauvegarder dans le répertoire updates vm = get_version_manager() update_path = vm.updates_dir / file.filename file.save(update_path) # Extraire et valider le manifest import zipfile with zipfile.ZipFile(update_path, 'r') as zf: if 'update_manifest.json' not in zf.namelist(): update_path.unlink() return jsonify({'error': 'Package invalide: manifest manquant'}), 400 # Extraire le manifest zf.extract('update_manifest.json', vm.updates_dir) with open(vm.updates_dir / 'update_manifest.json', 'r') as f: manifest = json.load(f) api_logger.info(f"Update package uploaded: {file.filename} (version {manifest.get('version')})") return jsonify({ 'success': True, 'filename': file.filename, 'manifest': manifest, 'message': 'Package uploadé. Utilisez /api/version/check-update pour vérifier.', 'timestamp': datetime.now().isoformat() }) except Exception as e: api_logger.error(f"Error uploading update package: {e}") return jsonify({'error': str(e)}), 500 # ============================================================================= # API Metrics (Prometheus) # ============================================================================= @app.route('/metrics') def prometheus_metrics(): """Endpoint Prometheus pour les métriques.""" try: return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST) except Exception as e: api_logger.error(f"Error generating metrics: {e}") return Response(f"# Error: {e}\n", mimetype='text/plain'), 500 # ============================================================================= # WebSocket Events # ============================================================================= @socketio.on('connect') def handle_connect(): """Client connecté.""" print(f"Client connecté: {request.sid}") emit('connected', {'status': 'ok', 'execution': execution_state}) @socketio.on('disconnect') def handle_disconnect(): """Client déconnecté.""" print(f"Client déconnecté: {request.sid}") @socketio.on('subscribe_execution') def handle_subscribe(): """S'abonner aux mises à jour d'exécution.""" emit('execution_state', execution_state) @socketio.on('get_performance') def handle_get_performance(): """Demande les métriques de performance.""" emit('performance_update', performance_metrics) # ============================================================================= # Background tasks # ============================================================================= def broadcast_metrics(): """Diffuse les métriques périodiquement.""" while True: time.sleep(5) try: socketio.emit('metrics_update', { 'execution': execution_state, 'performance': performance_metrics, 'timestamp': datetime.now().isoformat() }) except Exception: pass # Démarrer le thread de broadcast metrics_thread = threading.Thread(target=broadcast_metrics, daemon=True) metrics_thread.start() # ============================================================================= # API Service Manager - Panneau de contrôle pour démos # ============================================================================= # Configuration des services RPA Vision V3 SERVICES_CONFIG = { "agent_chat": { "name": "Agent Chat (LLM)", "description": "Interface conversationnelle avec Ollama", "port": 5002, "start_cmd": "cd {base} && ./venv_v3/bin/python -m agent_chat.app", "url": "http://localhost:5002", "icon": "🤖" }, "vwb_backend": { "name": "VWB Backend", "description": "API Visual Workflow Builder", "port": 5000, "start_cmd": "cd {base}/visual_workflow_builder/backend && {base}/venv_v3/bin/python app.py", "url": "http://localhost:5000", "icon": "⚙️" }, "vwb_frontend": { "name": "VWB Frontend", "description": "Interface React du Workflow Builder", "port": 3000, "start_cmd": "cd {base}/visual_workflow_builder/frontend && npm start", "url": "http://localhost:3000", "icon": "🎨" }, "web_dashboard": { "name": "Dashboard (ce service)", "description": "Panneau de contrôle RPA Vision V3", "port": 5001, "start_cmd": None, # Déjà en cours "url": "http://localhost:5001", "icon": "📊" } } # Processus en cours (PID tracking) _running_processes = {} def _check_port(port: int) -> bool: """Vérifie si un port est en écoute.""" import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1) try: s.connect(('127.0.0.1', port)) return True except (socket.timeout, ConnectionRefusedError): return False def _get_service_status(service_id: str) -> dict: """Retourne le statut détaillé d'un service.""" config = SERVICES_CONFIG.get(service_id) if not config: return {"error": "Service inconnu"} port = config["port"] is_running = _check_port(port) pid = _running_processes.get(service_id) return { "service_id": service_id, "name": config["name"], "description": config["description"], "icon": config["icon"], "port": port, "url": config["url"], "status": "running" if is_running else "stopped", "pid": pid, "can_start": config["start_cmd"] is not None, "can_stop": service_id != "web_dashboard" } @app.route('/api/services') def api_services(): """Liste tous les services avec leur statut.""" services = [] for service_id in SERVICES_CONFIG: services.append(_get_service_status(service_id)) return jsonify({ "services": services, "timestamp": datetime.now().isoformat() }) @app.route('/api/services//status') def api_service_status(service_id): """Statut d'un service spécifique.""" status = _get_service_status(service_id) if "error" in status: return jsonify(status), 404 return jsonify(status) @app.route('/api/services//start', methods=['POST']) def api_service_start(service_id): """Démarre un service.""" config = SERVICES_CONFIG.get(service_id) if not config: return jsonify({"error": "Service inconnu"}), 404 if not config["start_cmd"]: return jsonify({"error": "Ce service ne peut pas être démarré via l'API"}), 400 # Vérifier si déjà en cours if _check_port(config["port"]): return jsonify({"status": "already_running", "message": f"{config['name']} est déjà en cours"}) try: # Préparer la commande cmd = config["start_cmd"].format(base=str(BASE_PATH)) # Lancer en arrière-plan process = subprocess.Popen( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True ) _running_processes[service_id] = process.pid # Attendre un peu que le service démarre time.sleep(2) # Vérifier si le service a démarré is_running = _check_port(config["port"]) api_logger.info(f"Service {service_id} started (PID: {process.pid}, running: {is_running})") return jsonify({ "status": "started" if is_running else "starting", "pid": process.pid, "message": f"{config['name']} démarré" if is_running else "Démarrage en cours...", "url": config["url"] }) except Exception as e: api_logger.error(f"Error starting service {service_id}: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/services//stop', methods=['POST']) def api_service_stop(service_id): """Arrête un service.""" config = SERVICES_CONFIG.get(service_id) if not config: return jsonify({"error": "Service inconnu"}), 404 if service_id == "web_dashboard": return jsonify({"error": "Impossible d'arrêter le dashboard depuis lui-même"}), 400 try: port = config["port"] # Trouver et tuer le processus sur ce port result = subprocess.run( f"lsof -ti :{port} | xargs -r kill -TERM", shell=True, capture_output=True, text=True ) # Nettoyer le tracking if service_id in _running_processes: del _running_processes[service_id] # Attendre que le port se libère time.sleep(1) is_stopped = not _check_port(port) api_logger.info(f"Service {service_id} stopped (success: {is_stopped})") return jsonify({ "status": "stopped" if is_stopped else "stopping", "message": f"{config['name']} arrêté" if is_stopped else "Arrêt en cours..." }) except Exception as e: api_logger.error(f"Error stopping service {service_id}: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/services//restart', methods=['POST']) def api_service_restart(service_id): """Redémarre un service.""" # Arrêter stop_response = api_service_stop(service_id) if stop_response[1] if isinstance(stop_response, tuple) else 200 >= 400: return stop_response # Attendre time.sleep(2) # Redémarrer return api_service_start(service_id) @app.route('/api/services/start-all', methods=['POST']) def api_services_start_all(): """Démarre tous les services.""" results = {} for service_id, config in SERVICES_CONFIG.items(): if config["start_cmd"]: try: if not _check_port(config["port"]): cmd = config["start_cmd"].format(base=str(BASE_PATH)) process = subprocess.Popen( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True ) _running_processes[service_id] = process.pid results[service_id] = {"status": "starting", "pid": process.pid} else: results[service_id] = {"status": "already_running"} except Exception as e: results[service_id] = {"status": "error", "error": str(e)} # Attendre que les services démarrent time.sleep(3) # Vérifier les statuts for service_id in results: if results[service_id].get("status") == "starting": port = SERVICES_CONFIG[service_id]["port"] results[service_id]["status"] = "running" if _check_port(port) else "failed" return jsonify({"results": results, "timestamp": datetime.now().isoformat()}) @app.route('/api/services/stop-all', methods=['POST']) def api_services_stop_all(): """Arrête tous les services (sauf le dashboard).""" results = {} for service_id, config in SERVICES_CONFIG.items(): if service_id != "web_dashboard" and config["start_cmd"]: try: port = config["port"] subprocess.run(f"lsof -ti :{port} | xargs -r kill -TERM", shell=True) results[service_id] = {"status": "stopped"} except Exception as e: results[service_id] = {"status": "error", "error": str(e)} _running_processes.clear() return jsonify({"results": results, "timestamp": datetime.now().isoformat()}) # ============================================================================= # Main # ============================================================================= if __name__ == '__main__': # Valider la sécurité en production (non-bloquant pour le dashboard) from core.security import get_security_config try: from core.security import validate_production_security config = get_security_config() validate_production_security(config) print("✅ Security validation passed") except Exception as e: print(f"⚠️ Security validation warning: {e}") print("Dashboard will start anyway (monitoring service)") # Ne pas arrêter le dashboard pour des problèmes de sécurité mineurs # Initialiser le système de cleanup try: from core.system import initialize_system_cleanup, shutdown_system initialize_system_cleanup() except ImportError: print("⚠️ System cleanup not available") print("=" * 50) print("RPA Vision V3 - Dashboard Web") print("=" * 50) print(f"Base path: {BASE_PATH}") print(f"Data path: {DATA_PATH}") print(f"Sessions path: {SESSIONS_PATH}") print(f"Workflows path: {WORKFLOWS_PATH}") print("") print("Démarrage sur http://0.0.0.0:5001") print("WebSocket activé") print("=" * 50) try: socketio.run( app, host='0.0.0.0', port=5001, debug=False, allow_unsafe_werkzeug=True ) except KeyboardInterrupt: print("\nReceived keyboard interrupt, shutting down...") try: shutdown_system() except: pass except Exception as e: print(f"Dashboard error: {e}") try: shutdown_system() except: pass raise