""" API REST Complète - Expose toutes les fonctionnalités du core RPA Vision V3 Endpoints: - /api/core/capture - Capture d'écran - /api/core/detect - Détection UI - /api/core/embed - Génération d'embeddings - /api/core/match - Matching d'écran - /api/core/faiss - Gestion index FAISS - /api/core/pipeline - Pipeline complet """ import os import sys import json import base64 import tempfile from pathlib import Path from datetime import datetime from flask import Blueprint, jsonify, request, send_file from io import BytesIO # Add parent to path sys.path.insert(0, str(Path(__file__).parent.parent)) api_core = Blueprint('api_core', __name__, url_prefix='/api/core') # Lazy-loaded components _pipeline = None _capturer = None _detector = None def get_pipeline(): global _pipeline if _pipeline is None: from core.pipeline.workflow_pipeline import WorkflowPipeline _pipeline = WorkflowPipeline() return _pipeline def get_capturer(): global _capturer if _capturer is None: from core.capture.screen_capturer import ScreenCapturer _capturer = ScreenCapturer() return _capturer # ============================================================================= # Capture API # ============================================================================= @api_core.route('/capture', methods=['GET']) def capture_screen(): """Capture l'écran actuel et retourne l'image.""" try: capturer = get_capturer() frame = capturer.capture_frame() if frame is None: return jsonify({'error': 'Capture failed'}), 500 # Convert to base64 from PIL import Image img = Image.fromarray(frame.image) buffer = BytesIO() img.save(buffer, format='PNG') img_base64 = base64.b64encode(buffer.getvalue()).decode() return jsonify({ 'success': True, 'frame_id': frame.frame_id, 'timestamp': frame.timestamp.isoformat(), 'resolution': [frame.image.shape[1], frame.image.shape[0]], 'hash': frame.hash, 'changed': frame.changed_from_previous, 'window_info': frame.window_info, 'image_base64': img_base64 }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/capture/stats', methods=['GET']) def capture_stats(): """Statistiques de capture.""" try: capturer = get_capturer() stats = capturer.get_stats() return jsonify({ 'total_captures': stats.total_captures, 'captures_per_second': stats.captures_per_second, 'unchanged_skipped': stats.unchanged_frames_skipped, 'avg_capture_time_ms': stats.average_capture_time_ms, 'buffer_size': stats.buffer_size, 'memory_mb': stats.memory_usage_mb }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/capture/window', methods=['GET']) def get_active_window(): """Info sur la fenêtre active.""" try: capturer = get_capturer() window = capturer.get_active_window() return jsonify({'window': window}) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # Detection API # ============================================================================= @api_core.route('/detect', methods=['POST']) def detect_ui(): """Détecte les éléments UI dans une image.""" try: data = request.get_json() # Accept base64 image or file path if 'image_base64' in data: import base64 from PIL import Image img_data = base64.b64decode(data['image_base64']) img = Image.open(BytesIO(img_data)) # Save to temp file with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: img.save(f.name) image_path = f.name elif 'image_path' in data: image_path = data['image_path'] else: return jsonify({'error': 'image_base64 or image_path required'}), 400 pipeline = get_pipeline() if pipeline.ui_detector is None: return jsonify({'error': 'UI detector not available'}), 503 elements = pipeline.ui_detector.detect(image_path) return jsonify({ 'success': True, 'elements_count': len(elements), 'elements': [ { 'element_id': el.element_id, 'type': el.type, 'role': el.role, 'label': el.label, 'bbox': list(el.bbox), 'confidence': el.confidence } for el in elements ] }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # Embedding API # ============================================================================= @api_core.route('/embed/image', methods=['POST']) def embed_image(): """Génère un embedding pour une image.""" try: data = request.get_json() if 'image_base64' not in data: return jsonify({'error': 'image_base64 required'}), 400 import base64 from PIL import Image import numpy as np img_data = base64.b64decode(data['image_base64']) img = Image.open(BytesIO(img_data)) pipeline = get_pipeline() embedding = pipeline.clip_embedder.embed_image(img) return jsonify({ 'success': True, 'dimensions': len(embedding), 'embedding': embedding.tolist() }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/embed/text', methods=['POST']) def embed_text(): """Génère un embedding pour du texte.""" try: data = request.get_json() text = data.get('text') if not text: return jsonify({'error': 'text required'}), 400 pipeline = get_pipeline() embedding = pipeline.clip_embedder.embed_text(text) return jsonify({ 'success': True, 'dimensions': len(embedding), 'embedding': embedding.tolist() }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/embed/similarity', methods=['POST']) def compute_similarity(): """Calcule la similarité entre deux embeddings.""" try: data = request.get_json() emb1 = data.get('embedding1') emb2 = data.get('embedding2') if not emb1 or not emb2: return jsonify({'error': 'embedding1 and embedding2 required'}), 400 import numpy as np v1 = np.array(emb1) v2 = np.array(emb2) # Cosine similarity similarity = float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))) return jsonify({ 'success': True, 'similarity': similarity }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # FAISS API # ============================================================================= @api_core.route('/faiss/stats', methods=['GET']) def faiss_stats(): """Statistiques de l'index FAISS.""" try: pipeline = get_pipeline() stats = pipeline.faiss_manager.get_stats() return jsonify(stats) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/faiss/add', methods=['POST']) def faiss_add(): """Ajoute un embedding à l'index FAISS.""" try: data = request.get_json() embedding_id = data.get('embedding_id') embedding = data.get('embedding') metadata = data.get('metadata', {}) if not embedding_id or not embedding: return jsonify({'error': 'embedding_id and embedding required'}), 400 import numpy as np vector = np.array(embedding, dtype=np.float32) pipeline = get_pipeline() pipeline.faiss_manager.add_embedding(embedding_id, vector, metadata) return jsonify({ 'success': True, 'embedding_id': embedding_id, 'total_embeddings': pipeline.faiss_manager.get_stats().get('total_embeddings', 0) }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/faiss/search', methods=['POST']) def faiss_search(): """Recherche les embeddings similaires.""" try: data = request.get_json() query = data.get('query') k = data.get('k', 5) if not query: return jsonify({'error': 'query embedding required'}), 400 import numpy as np query_vector = np.array(query, dtype=np.float32) pipeline = get_pipeline() results = pipeline.faiss_manager.search(query_vector, k=k) return jsonify({ 'success': True, 'results': results }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # Pipeline API # ============================================================================= @api_core.route('/pipeline/match', methods=['POST']) def pipeline_match(): """Match l'écran actuel avec les workflows connus.""" try: data = request.get_json() or {} workflow_id = data.get('workflow_id') # Capture current screen capturer = get_capturer() frame = capturer.capture_frame() if frame is None: return jsonify({'error': 'Capture failed'}), 500 # Save to temp file from PIL import Image with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f: img = Image.fromarray(frame.image) img.save(f.name) screenshot_path = f.name pipeline = get_pipeline() match = pipeline.match_current_state( screenshot_path, workflow_id=workflow_id, window_title=frame.window_info.get('title') if frame.window_info else None ) return jsonify({ 'success': True, 'match': match, 'window_info': frame.window_info }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/pipeline/process', methods=['POST']) def pipeline_process(): """Traite une session complète.""" try: data = request.get_json() session_id = data.get('session_id') if not session_id: return jsonify({'error': 'session_id required'}), 400 # This would trigger async processing return jsonify({ 'success': True, 'message': f'Processing started for {session_id}', 'status': 'queued' }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # Workflow Export/Import API # ============================================================================= @api_core.route('/workflows/export/', methods=['GET']) def export_workflow(workflow_id): """Exporte un workflow en JSON.""" try: pipeline = get_pipeline() workflow = pipeline.load_workflow(workflow_id) if not workflow: return jsonify({'error': 'Workflow not found'}), 404 # Return as downloadable JSON workflow_json = workflow.to_json() return jsonify({ 'success': True, 'workflow_id': workflow_id, 'workflow': json.loads(workflow_json) }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/workflows/export//download', methods=['GET']) def download_workflow(workflow_id): """Télécharge un workflow en fichier JSON.""" try: pipeline = get_pipeline() workflow = pipeline.load_workflow(workflow_id) if not workflow: return jsonify({'error': 'Workflow not found'}), 404 workflow_json = workflow.to_json() buffer = BytesIO(workflow_json.encode('utf-8')) buffer.seek(0) return send_file( buffer, mimetype='application/json', as_attachment=True, download_name=f'{workflow_id}.json' ) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/workflows/import', methods=['POST']) def import_workflow(): """Importe un workflow depuis JSON.""" try: data = request.get_json() workflow_data = data.get('workflow') overwrite = data.get('overwrite', False) if not workflow_data: return jsonify({'error': 'workflow data required'}), 400 from core.models.workflow_graph import Workflow workflow = Workflow.from_dict(workflow_data) workflow_id = workflow.workflow_id # Check if exists workflows_dir = Path('data/training/workflows') workflows_dir.mkdir(parents=True, exist_ok=True) workflow_path = workflows_dir / f'{workflow_id}.json' if workflow_path.exists() and not overwrite: return jsonify({ 'error': f'Workflow {workflow_id} already exists. Set overwrite=true to replace.' }), 409 # Save workflow.save_to_file(workflow_path) return jsonify({ 'success': True, 'workflow_id': workflow_id, 'message': 'Workflow imported successfully' }) except Exception as e: return jsonify({'error': str(e)}), 500 @api_core.route('/workflows/clone/', methods=['POST']) def clone_workflow(workflow_id): """Clone un workflow avec un nouveau nom.""" try: data = request.get_json() or {} new_id = data.get('new_id', f'{workflow_id}_copy') new_name = data.get('new_name') pipeline = get_pipeline() workflow = pipeline.load_workflow(workflow_id) if not workflow: return jsonify({'error': 'Workflow not found'}), 404 # Clone workflow_dict = workflow.to_dict() workflow_dict['workflow_id'] = new_id if new_name: workflow_dict['name'] = new_name workflow_dict['created_at'] = datetime.now().isoformat() workflow_dict['updated_at'] = datetime.now().isoformat() from core.models.workflow_graph import Workflow new_workflow = Workflow.from_dict(workflow_dict) # Save workflows_dir = Path('data/training/workflows') new_workflow.save_to_file(workflows_dir / f'{new_id}.json') return jsonify({ 'success': True, 'original_id': workflow_id, 'new_id': new_id }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================= # Health & Info # ============================================================================= @api_core.route('/health', methods=['GET']) def health(): """Health check.""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat() }) @api_core.route('/info', methods=['GET']) def info(): """Informations sur l'API.""" return jsonify({ 'name': 'RPA Vision V3 Core API', 'version': '1.0.0', 'endpoints': [ '/api/core/capture', '/api/core/capture/stats', '/api/core/capture/window', '/api/core/detect', '/api/core/embed/image', '/api/core/embed/text', '/api/core/embed/similarity', '/api/core/faiss/stats', '/api/core/faiss/add', '/api/core/faiss/search', '/api/core/pipeline/match', '/api/core/pipeline/process', '/api/core/workflows/export/', '/api/core/workflows/export//download', '/api/core/workflows/import', '/api/core/workflows/clone/', '/api/core/health', '/api/core/info' ] })