#!/usr/bin/env python3
"""
Server API receiving session uploads from the V0 agent.

Endpoints:
    - POST /api/traces/upload   - Upload an encrypted .enc file (or a plain .zip)
    - GET  /api/traces/status   - Server status
    - GET  /api/traces/sessions - List of received sessions
    - GET  /api/traces/queue    - Processing queue status
    - GET  /healthz             - Liveness ping
    - GET  /metrics             - Prometheus metrics

Usage:
    python api_upload.py
    # -> listens on 0.0.0.0:8001 by default
    #    (override with RPA_API_HOST / RPA_API_PORT)

    # Or directly with uvicorn (port chosen on the command line):
    uvicorn api_upload:app --host 0.0.0.0 --port 8000
"""

import os
import sys
import logging
import zipfile
from pathlib import Path
from datetime import datetime
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse, Response
import uvicorn
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

# Add the parent directory to sys.path so the RPA Vision V3 modules can be imported.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.persistence import StorageManager
from core.models import RawSession

# Card #23 - API security/governance (middleware)
from core.security.fastapi_security import install_security_middlewares

# Card #22 - AutoHeal admin API (optional)
try:
    from core.system.api_admin_autoheal import router as autoheal_admin_router
    AUTOHEAL_API_AVAILABLE = True
except Exception as _e:
    AUTOHEAL_API_AVAILABLE = False
    autoheal_admin_router = None

# Card #23 - Security admin API (kill-switch status)
try:
    from core.system.api_admin_security import router as security_admin_router
    SECURITY_ADMIN_API_AVAILABLE = True
except Exception as _e:
    SECURITY_ADMIN_API_AVAILABLE = False
    security_admin_router = None

# Configuration
TRAINING_DATA_DIR = "data/training"
UPLOAD_DIR = Path(TRAINING_DATA_DIR) / "uploads"
SESSIONS_DIR = Path(TRAINING_DATA_DIR) / "sessions"
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")

# Worker mode (prod):
#   - thread   : processing worker runs inside the API process (default / simple)
#   - external : separate worker (systemd rpa-vision-v3-worker.service)
#   - disabled : no processing at all (upload-only API)
PROCESSING_WORKER_MODE = os.getenv("RPA_PROCESSING_WORKER", "thread").strip().lower()

# Encryption password handling.
# The built-in default key is only acceptable outside production.
DEFAULT_ENCRYPTION_PASSWORD = "rpa_vision_v3_default_key"
ENCRYPTION_PASSWORD = os.getenv("ENCRYPTION_PASSWORD")
if not ENCRYPTION_PASSWORD:
    if ENVIRONMENT == "production":
        raise ValueError(
            "ENCRYPTION_PASSWORD must be set in production! "
            "Set it with: export ENCRYPTION_PASSWORD='your_secure_password'"
        )
    ENCRYPTION_PASSWORD = DEFAULT_ENCRYPTION_PASSWORD
USING_DEFAULT_ENCRYPTION_PASSWORD = ENCRYPTION_PASSWORD == DEFAULT_ENCRYPTION_PASSWORD

# Create the working directories.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger("api_upload")

# Warn at import time so the warning also appears when started via `uvicorn`,
# not only through the `__main__` entry point below.
if USING_DEFAULT_ENCRYPTION_PASSWORD:
    logger.warning(
        "ENCRYPTION_PASSWORD non défini: utilisation de la clé par défaut "
        "(à changer hors développement!)"
    )

# Token-related environment variables inspected by the debug endpoints.
_TOKEN_ENV_VARS = ("RPA_TOKEN_ADMIN", "RPA_TOKEN_READONLY", "ADMIN_TOKENS", "READ_ONLY_TOKENS")

# FastAPI app
app = FastAPI(
    title="RPA Vision V3 - Agent Upload API",
    description="API pour recevoir les sessions enregistrées par l'agent V0",
    version="1.0.0"
)

# Install security (auth + allowlist + rate-limit + audit + kill-switch).
install_security_middlewares(app)

# Mount the AutoHeal admin API (if available).
if AUTOHEAL_API_AVAILABLE and autoheal_admin_router is not None:
    app.include_router(autoheal_admin_router, prefix="/admin/autoheal", tags=["autoheal"])

# Mount the security admin API (if available).
if SECURITY_ADMIN_API_AVAILABLE and security_admin_router is not None:
    app.include_router(security_admin_router, prefix="/admin/security", tags=["security"])

# StorageManager
storage = StorageManager(base_path=TRAINING_DATA_DIR)

# Import the decryption module (optional).
try:
    from storage_encrypted import decrypt_session_file as decrypt_file
    DECRYPTION_AVAILABLE = True
except ImportError:
    logger.warning("Module storage_encrypted non trouvé, déchiffrement désactivé!")
    DECRYPTION_AVAILABLE = False

    def decrypt_file(encrypted_path: str, password: str) -> str:
        """Placeholder used when storage_encrypted is missing.

        upload_session checks DECRYPTION_AVAILABLE before calling this, so it
        is not expected to be reached from the upload endpoint.
        """
        logger.error("Déchiffrement non disponible!")
        return encrypted_path.replace('.enc', '.zip')


def _validate_session_id(session_id: str) -> None:
    """Raise HTTP 400 unless ``session_id`` is a single, safe path component.

    The ID is used verbatim to build ``UPLOAD_DIR/<id>.<ext>`` and
    ``SESSIONS_DIR/<id>``. Path separators, NUL bytes, "." and ".." would let
    a client write or extract files outside those directories.
    """
    if (
        not session_id
        or session_id in (".", "..")
        or any(ch in session_id for ch in ("/", "\\", "\x00"))
    ):
        raise HTTPException(status_code=400, detail="session_id invalide")


def _schedule_processing(session_id: str) -> None:
    """Hand a stored session over for processing (best effort, never raises).

    The processing queue is preferred. If it cannot be used, the pipeline is
    run in a daemon thread of this process, but only in "thread" worker mode.
    In "external" or "disabled" mode this process must not run the pipeline
    itself.
    """
    try:
        from processing_queue import add_to_queue
        add_to_queue(session_id, TRAINING_DATA_DIR)
    except Exception as e:
        logger.warning(f"Impossible d'ajouter à la queue: {e}")
    else:
        logger.info(f"Session {session_id} ajoutée à la queue de processing")
        return

    if PROCESSING_WORKER_MODE != "thread":
        logger.warning(
            f"Pas de traitement direct pour {session_id} (mode={PROCESSING_WORKER_MODE})"
        )
        return

    # Fallback: direct processing in a background thread.
    try:
        from processing_pipeline import process_session_async
        import threading

        def process_in_background():
            try:
                logger.info(f"Démarrage pipeline processing pour {session_id}")
                stats = process_session_async(session_id, TRAINING_DATA_DIR)
                logger.info(f"Pipeline terminé: {stats}")
            except Exception as e:
                logger.error(f"Erreur pipeline: {e}")

        thread = threading.Thread(target=process_in_background, daemon=True)
        thread.start()
        logger.info("Pipeline lancé en arrière-plan (fallback)")
    except Exception as e2:
        logger.warning(f"Impossible de lancer le pipeline: {e2}")


@app.post("/api/traces/upload")
async def upload_session(
    file: UploadFile = File(...),
    session_id: str = Form(...)
):
    """
    Receive a session recorded by the V0 agent.

    Steps:
      1. Store the upload.
      2. Decrypt it if it is a ``.enc`` file.
      3. Extract the ZIP into ``SESSIONS_DIR/<session_id>``.
      4. Load and validate the RawSession.
      5. Persist it through the StorageManager (best effort).
      6. Schedule it for processing (best effort).

    Args:
        file: Encrypted ``.enc`` file, or a plain ``.zip``.
        session_id: Session identifier. It is used as a path component, so it
            must not contain path separators or be "." / "..".

    Returns:
        JSONResponse with status, session_id, event/screenshot counts, user
        and reception timestamp.

    Raises:
        HTTPException: 400 for an invalid session_id, an undecryptable file,
            a bad ZIP or an invalid RawSession. 500 if decryption is
            unavailable or on any unexpected error.
    """
    try:
        logger.info(f"Réception upload session: {session_id}")
        logger.info(f"Fichier: {file.filename}, taille: {file.size} bytes")

        # session_id comes straight from the form and ends up in file paths.
        _validate_session_id(session_id)

        # Save the uploaded file. The client may omit the filename; treat
        # that as a plain ZIP.
        filename = file.filename or ""
        file_ext = '.enc' if filename.lower().endswith('.enc') else '.zip'
        upload_path = UPLOAD_DIR / f"{session_id}{file_ext}"

        # Read first, so no file handle is held open across the await.
        content = await file.read()
        with open(upload_path, "wb") as f:
            f.write(content)
        logger.info(f"Fichier sauvegardé: {upload_path}")

        # Decrypt if needed.
        if file_ext == '.enc':
            if not DECRYPTION_AVAILABLE:
                raise HTTPException(
                    status_code=500,
                    detail="Déchiffrement non disponible sur le serveur"
                )
            try:
                zip_path = decrypt_file(str(upload_path), ENCRYPTION_PASSWORD)
                logger.info(f"Fichier déchiffré: {zip_path}")
            except Exception as e:
                logger.error(f"Erreur déchiffrement: {e}")
                raise HTTPException(
                    status_code=400,
                    detail=f"Erreur déchiffrement: {str(e)}"
                ) from e
        else:
            zip_path = str(upload_path)

        # Extract the ZIP. zipfile.extractall drops absolute paths and ".."
        # components from member names.
        extract_dir = SESSIONS_DIR / session_id
        extract_dir.mkdir(parents=True, exist_ok=True)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zf:
                zf.extractall(extract_dir)
            logger.info(f"ZIP extrait dans: {extract_dir}")
        except Exception as e:
            logger.error(f"Erreur extraction ZIP: {e}")
            raise HTTPException(
                status_code=400,
                detail=f"Erreur extraction ZIP: {str(e)}"
            ) from e

        # Load and validate the RawSession.
        # The archive is expected to contain <session_id>/<session_id>.json.
        json_path = extract_dir / session_id / f"{session_id}.json"
        if not json_path.exists():
            logger.error(f"Fichier JSON introuvable: {json_path}")
            raise HTTPException(
                status_code=400,
                detail="Fichier JSON introuvable dans le ZIP"
            )

        try:
            session = RawSession.load_from_file(json_path)
            logger.info(f"RawSession chargée: {session.session_id}")
            logger.info(f"  - Événements: {len(session.events)}")
            logger.info(f"  - Screenshots: {len(session.screenshots)}")
            logger.info(f"  - Utilisateur: {session.user}")
        except Exception as e:
            logger.error(f"Erreur chargement RawSession: {e}")
            raise HTTPException(
                status_code=400,
                detail=f"Erreur chargement RawSession: {str(e)}"
            ) from e

        # Save through the StorageManager. A failure here does not block the
        # upload.
        try:
            storage.save_raw_session(session, session_id)
            logger.info("Session sauvegardée dans StorageManager")
        except Exception as e:
            logger.warning(f"Erreur sauvegarde StorageManager: {e}")

        # Queue for asynchronous processing (best effort).
        _schedule_processing(session_id)

        return JSONResponse({
            "status": "success",
            "session_id": session_id,
            "events_count": len(session.events),
            "screenshots_count": len(session.screenshots),
            "user": session.user,
            "received_at": datetime.now().isoformat()
        })

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Erreur inattendue: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Erreur serveur: {str(e)}"
        ) from e


def _ensure_debug_allowed() -> None:
    """Hide the debug endpoints in production by answering 404."""
    if ENVIRONMENT == "production":
        raise HTTPException(status_code=404, detail="Not Found")


def _token_fingerprint(token: str) -> str:
    """Return a short, non-reversible identifier for a token.

    This lets operators tell tokens apart without revealing any of their
    characters.
    """
    import hashlib
    return "sha256:" + hashlib.sha256(token.encode("utf-8")).hexdigest()[:8]


@app.get("/api/traces/debug-env")
async def debug_env():
    """Report which token-related environment variables are set.

    Only presence ("SET" / "NOT_SET") is reported, never the value. Echoing
    tokens would hand admin credentials to any caller. Returns 404 in
    production.
    """
    _ensure_debug_allowed()
    return {name: ("SET" if os.getenv(name) else "NOT_SET") for name in _TOKEN_ENV_VARS}


@app.get("/api/traces/debug-auth")
async def debug_auth():
    """Report how many tokens the token manager loaded.

    Token previews are SHA-256 fingerprints rather than token prefixes.
    Returns 404 in production.
    """
    _ensure_debug_allowed()
    from core.security.api_tokens import get_token_manager

    token_manager = get_token_manager()
    return {
        "environment_vars": {name: bool(os.getenv(name)) for name in _TOKEN_ENV_VARS},
        "token_counts": {
            "admin_tokens": len(token_manager.admin_tokens),
            "read_only_tokens": len(token_manager.read_only_tokens),
        },
        "admin_tokens_preview": [
            _token_fingerprint(t) for t in list(token_manager.admin_tokens)[:3]
        ],
        "read_only_tokens_preview": [
            _token_fingerprint(t) for t in list(token_manager.read_only_tokens)[:3]
        ],
    }


@app.get("/api/traces/status")
async def get_status():
    """Return server status.

    ``encryption_enabled`` is False while the built-in default key is in use.
    """
    return {
        "status": "online",
        "version": "1.0.0",
        "upload_dir": str(UPLOAD_DIR),
        "sessions_dir": str(SESSIONS_DIR),
        "encryption_enabled": not USING_DEFAULT_ENCRYPTION_PASSWORD
    }


@app.get("/api/traces/sessions")
async def list_sessions():
    """List the sessions extracted under SESSIONS_DIR.

    For each sub-directory, the first ``*/*.json`` file (in sorted order) is
    loaded as a RawSession. Unreadable sessions are logged and skipped.
    Directories are visited in sorted order so the listing is stable between
    calls.
    """
    sessions = []
    if not SESSIONS_DIR.is_dir():
        return {"sessions": sessions, "total": 0}

    for session_dir in sorted(SESSIONS_DIR.iterdir()):
        if not session_dir.is_dir():
            continue
        json_files = sorted(session_dir.glob("*/*.json"))
        if not json_files:
            continue
        try:
            session = RawSession.load_from_file(json_files[0])
            sessions.append({
                "session_id": session.session_id,
                "started_at": session.started_at.isoformat(),
                "ended_at": session.ended_at.isoformat() if session.ended_at else None,
                "events_count": len(session.events),
                "screenshots_count": len(session.screenshots),
                "user": session.user
            })
        except Exception as e:
            logger.warning(f"Erreur lecture session {session_dir.name}: {e}")

    return {
        "sessions": sessions,
        "total": len(sessions)
    }


@app.get("/")
async def root():
    """Home page: short description of the main endpoints."""
    return {
        "message": "RPA Vision V3 - Agent Upload API",
        "endpoints": {
            "upload": "POST /api/traces/upload",
            "status": "GET /api/traces/status",
            "sessions": "GET /api/traces/sessions"
        }
    }


@app.get("/healthz")
async def healthz():
    """Minimal healthcheck (k8s / systemd timers).

    Deliberately avoids heavy dependencies; it is just a ping.
    """
    return {
        "status": "ok",
        "service": "rpa-vision-v3-api",
        "environment": ENVIRONMENT,
        "worker_mode": PROCESSING_WORKER_MODE,
    }


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint (public)."""
    try:
        return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
    except Exception as e:
        # A Prometheus failure must never take the API down.
        return Response(content=f"# error: {e}\n", media_type="text/plain", status_code=500)


@app.on_event("startup")
async def startup_event():
    """Server startup: start the in-process processing worker in "thread" mode."""
    if PROCESSING_WORKER_MODE != "thread":
        logger.info(f"Processing worker disabled for API (mode={PROCESSING_WORKER_MODE})")
        return
    try:
        from processing_queue import start_processing_worker
        from processing_pipeline import process_session_async

        start_processing_worker(process_session_async)
        logger.info("Worker de processing démarré (thread mode)")
    except Exception as e:
        logger.warning(f"Impossible de démarrer le worker: {e}")


@app.on_event("shutdown")
async def shutdown_event():
    """Server shutdown: stop the in-process processing worker ("thread" mode only)."""
    if PROCESSING_WORKER_MODE != "thread":
        return
    try:
        from processing_queue import stop_processing_worker
        stop_processing_worker()
        logger.info("Worker de processing arrêté")
    except Exception as e:
        logger.warning(f"Erreur arrêt worker: {e}")


@app.get("/api/traces/queue")
async def get_queue_status():
    """Return processing-queue counters per status plus the last 20 items.

    Any error (e.g. the queue module is missing) is reported as
    ``{"error": ...}``.
    """
    try:
        from processing_queue import get_queue
        queue = get_queue()
        all_items = queue.get_all()

        return {
            "total": len(all_items),
            "pending": sum(1 for i in all_items if i["status"] == "pending"),
            "processing": sum(1 for i in all_items if i["status"] == "processing"),
            "completed": sum(1 for i in all_items if i["status"] == "completed"),
            "failed": sum(1 for i in all_items if i["status"] == "failed"),
            "items": all_items[-20:]  # 20 most recent
        }
    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    # Validate the security configuration before serving (fail fast).
    from core.security import validate_production_security, get_security_config
    try:
        config = get_security_config()
        validate_production_security(config)
    except Exception as e:
        logger.error(f"Security validation failed: {e}")
        sys.exit(1)

    # Initialise the system cleanup machinery; shutdown_system() is paired with it below.
    from core.system import initialize_system_cleanup, shutdown_system
    initialize_system_cleanup()

    host = os.getenv("RPA_API_HOST", "0.0.0.0")
    port = int(os.getenv("RPA_API_PORT", "8001"))

    logger.info("Démarrage du serveur API...")
    logger.info(f"Upload dir: {UPLOAD_DIR.absolute()}")
    logger.info(f"Sessions dir: {SESSIONS_DIR.absolute()}")
    logger.info(f"Encryption password: {'DEFAULT (changer!)' if USING_DEFAULT_ENCRYPTION_PASSWORD else '***'}")

    try:
        uvicorn.run(
            app,
            host=host,
            port=port,
            log_level="info"
        )
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down...")
    except Exception as e:
        logger.error(f"Server error: {e}")
        raise
    finally:
        # uvicorn.run() may return normally after a graceful shutdown
        # (depending on the uvicorn version), so cleanup must not depend on
        # an exception being raised.
        shutdown_system()