diff --git a/agent_chat/__init__.py b/agent_chat/__init__.py index 33ed4717f..f81b22526 100644 --- a/agent_chat/__init__.py +++ b/agent_chat/__init__.py @@ -43,6 +43,15 @@ from .conversation_manager import ( get_conversation_manager ) +from .autonomous_planner import ( + AutonomousPlanner, + ExecutionPlan, + PlannedAction, + ActionResult, + ActionType, + get_autonomous_planner +) + __all__ = [ # Intent Parser "IntentParser", @@ -66,4 +75,11 @@ __all__ = [ "ConversationTurn", "ConversationContext", "get_conversation_manager", + # Autonomous Planner (Agent Libre) + "AutonomousPlanner", + "ExecutionPlan", + "PlannedAction", + "ActionResult", + "ActionType", + "get_autonomous_planner", ] diff --git a/agent_chat/app.py b/agent_chat/app.py index 0ef82abf6..eb599a0cf 100644 --- a/agent_chat/app.py +++ b/agent_chat/app.py @@ -41,6 +41,7 @@ from .intent_parser import IntentParser, IntentType, get_intent_parser from .confirmation import ConfirmationLoop, ConfirmationStatus, RiskLevel, get_confirmation_loop from .response_generator import ResponseGenerator, get_response_generator from .conversation_manager import ConversationManager, get_conversation_manager +from .autonomous_planner import AutonomousPlanner, get_autonomous_planner, ExecutionPlan # GPU Resource Manager (optional) try: @@ -74,6 +75,7 @@ intent_parser: Optional[IntentParser] = None confirmation_loop: Optional[ConfirmationLoop] = None response_generator: Optional[ResponseGenerator] = None conversation_manager: Optional[ConversationManager] = None +autonomous_planner: Optional[AutonomousPlanner] = None # Execution components workflow_pipeline = None @@ -95,6 +97,7 @@ def init_system(): """Initialiser tous les composants du système.""" global matcher, gpu_manager global intent_parser, confirmation_loop, response_generator, conversation_manager + global autonomous_planner # 1. SemanticMatcher try: @@ -178,6 +181,24 @@ def init_system(): else: logger.info("ℹ Mode simulation (composants d'exécution non disponibles)") + # 5. Autonomous Planner (Agent Libre) + try: + autonomous_planner = get_autonomous_planner(llm_model="qwen2.5:7b") + + # Configurer les callbacks pour l'exécution + if screen_capturer: + autonomous_planner.set_screen_capturer(screen_capturer.capture) + + def progress_callback(data): + socketio.emit('agent_progress', data) + + autonomous_planner.set_progress_callback(progress_callback) + + logger.info(f"✓ AutonomousPlanner initialisé (LLM: {autonomous_planner.llm_available})") + except Exception as e: + logger.warning(f"⚠ AutonomousPlanner: {e}") + autonomous_planner = None + # ============================================================================= # Routes Web @@ -185,7 +206,13 @@ def init_system(): @app.route('/') def index(): - """Page principale.""" + """Page principale - nouvelle interface chat.""" + return render_template('chat.html') + + +@app.route('/classic') +def classic(): + """Ancienne interface (fallback).""" return render_template('command.html') @@ -614,6 +641,174 @@ def api_llm_set_model(): return jsonify({"success": False, "error": "IntentParser non initialisé"}) +# ============================================================================= +# API Agent Libre (Autonomous Mode) +# ============================================================================= + +@app.route('/api/agent/plan', methods=['POST']) +def api_agent_plan(): + """ + Génère un plan d'exécution pour une tâche en langage naturel. + + Le mode "Agent Libre" permet d'exécuter des tâches sans workflow pré-enregistré. + Le LLM (Qwen) décompose la demande en étapes d'actions. + """ + if not autonomous_planner: + return jsonify({"error": "Agent autonome non disponible"}), 503 + + data = request.json + user_request = data.get('request', '').strip() + + if not user_request: + return jsonify({"error": "Requête vide"}), 400 + + try: + # Contexte optionnel (écran actuel, etc.) + context = data.get('context', {}) + + # Générer le plan + plan = autonomous_planner.plan(user_request, context) + + return jsonify({ + "success": True, + "plan": { + "task": plan.task_description, + "steps": [ + { + "step": s.step_number, + "action": s.action_type.value, + "description": s.description, + "target": s.target, + "params": s.parameters, + "expected_result": s.expected_result + } + for s in plan.steps + ], + "estimated_seconds": plan.estimated_duration_seconds, + "risk_level": plan.risk_level, + "requires_confirmation": plan.requires_confirmation + }, + "llm_available": autonomous_planner.llm_available + }) + + except Exception as e: + logger.error(f"Agent plan error: {e}") + return jsonify({"error": str(e)}), 500 + + +@app.route('/api/agent/execute', methods=['POST']) +def api_agent_execute(): + """ + Exécute un plan d'agent autonome. + + Attend un objet plan (généré par /api/agent/plan) et l'exécute étape par étape. + """ + if not autonomous_planner: + return jsonify({"error": "Agent autonome non disponible"}), 503 + + data = request.json + plan_data = data.get('plan') + + if not plan_data: + return jsonify({"error": "Plan manquant"}), 400 + + try: + # Reconstruire le plan depuis les données + from .autonomous_planner import PlannedAction, ActionType + + steps = [] + for step_data in plan_data.get('steps', []): + action_type_str = step_data.get('action', 'click') + action_type_map = { + 'open_app': ActionType.OPEN_APP, + 'open_url': ActionType.OPEN_URL, + 'click': ActionType.CLICK, + 'type_text': ActionType.TYPE_TEXT, + 'hotkey': ActionType.HOTKEY, + 'scroll': ActionType.SCROLL, + 'wait': ActionType.WAIT, + 'screenshot': ActionType.SCREENSHOT + } + + steps.append(PlannedAction( + step_number=step_data.get('step', len(steps) + 1), + action_type=action_type_map.get(action_type_str, ActionType.CLICK), + description=step_data.get('description', ''), + target=step_data.get('target'), + parameters=step_data.get('params', {}), + expected_result=step_data.get('expected_result') + )) + + plan = ExecutionPlan( + task_description=plan_data.get('task', ''), + steps=steps, + estimated_duration_seconds=plan_data.get('estimated_seconds', 30), + risk_level=plan_data.get('risk_level', 'low') + ) + + # Exécuter en arrière-plan + socketio.start_background_task(execute_agent_plan, plan) + + return jsonify({ + "success": True, + "message": "Exécution démarrée", + "steps_count": len(steps) + }) + + except Exception as e: + logger.error(f"Agent execute error: {e}") + return jsonify({"error": str(e)}), 500 + + +@app.route('/api/agent/status') +def api_agent_status(): + """Statut de l'agent autonome.""" + return jsonify({ + "available": autonomous_planner is not None, + "llm_available": autonomous_planner.llm_available if autonomous_planner else False, + "llm_model": autonomous_planner.llm_model if autonomous_planner else None + }) + + +def execute_agent_plan(plan: ExecutionPlan): + """Exécute un plan d'agent en arrière-plan.""" + import asyncio + + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + results = loop.run_until_complete(autonomous_planner.execute_plan(plan)) + + loop.close() + + # Envoyer le résultat final + success_count = sum(1 for r in results if r.success) + total = len(results) + + socketio.emit('execution_completed', { + "success": success_count == total, + "workflow": plan.task_description, + "message": f"{success_count}/{total} étapes réussies", + "results": [ + { + "step": r.action.step_number, + "success": r.success, + "message": r.message + } + for r in results + ] + }) + + except Exception as e: + logger.error(f"Agent execution error: {e}") + socketio.emit('execution_completed', { + "success": False, + "workflow": plan.task_description, + "message": f"Erreur: {str(e)}" + }) + + @app.route('/api/help') def api_help(): """Aide et mode d'emploi.""" diff --git a/agent_chat/autonomous_planner.py b/agent_chat/autonomous_planner.py new file mode 100644 index 000000000..bb38297cf --- /dev/null +++ b/agent_chat/autonomous_planner.py @@ -0,0 +1,1013 @@ +#!/usr/bin/env python3 +""" +Autonomous Planner - Agent Libre pour RPA Vision V3 + +Ce module permet d'exécuter des tâches sans workflow pré-enregistré. +Il utilise un LLM (Qwen via Ollama) pour : +1. Comprendre l'intention utilisateur +2. Décomposer en étapes d'actions +3. Adapter dynamiquement selon le contexte visuel + +Auteur: Dom - Janvier 2026 +""" + +import json +import logging +import time +import re +import sys +import os +from dataclasses import dataclass, field +from typing import List, Dict, Any, Optional, Callable, Tuple +from enum import Enum +from datetime import datetime + +import requests + +# Ajouter le chemin du projet pour les imports core +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +logger = logging.getLogger(__name__) + +# Essayer d'importer les composants de détection visuelle +try: + from core.detection.owl_detector import OwlDetector, OWL_AVAILABLE + VISUAL_DETECTION_AVAILABLE = OWL_AVAILABLE +except ImportError: + VISUAL_DETECTION_AVAILABLE = False + OwlDetector = None + +# Essayer d'importer le client VLM pour analyse intelligente +try: + from core.detection.ollama_client import OllamaClient + VLM_AVAILABLE = True +except ImportError: + VLM_AVAILABLE = False + OllamaClient = None + +try: + from PIL import Image as PILImage + import pyautogui + PYAUTOGUI_AVAILABLE = True +except ImportError: + PYAUTOGUI_AVAILABLE = False + PILImage = None + pyautogui = None + + +class ActionType(Enum): + """Types d'actions supportées par l'agent autonome.""" + CLICK = "click" + TYPE_TEXT = "type_text" + HOTKEY = "hotkey" + SCROLL = "scroll" + WAIT = "wait" + OPEN_APP = "open_app" + OPEN_URL = "open_url" + SCREENSHOT = "screenshot" + FIND_ELEMENT = "find_element" + + +@dataclass +class PlannedAction: + """Une action planifiée par le LLM.""" + step_number: int + action_type: ActionType + description: str + target: Optional[str] = None # Description textuelle de la cible + parameters: Dict[str, Any] = field(default_factory=dict) + expected_result: Optional[str] = None + fallback: Optional[str] = None + + +@dataclass +class ExecutionPlan: + """Plan d'exécution complet généré par le LLM.""" + task_description: str + steps: List[PlannedAction] + estimated_duration_seconds: int = 30 + requires_confirmation: bool = True + risk_level: str = "low" # low, medium, high + created_at: datetime = field(default_factory=datetime.now) + + +@dataclass +class ActionResult: + """Résultat d'une action exécutée.""" + success: bool + action: PlannedAction + message: str + screenshot_path: Optional[str] = None + duration_ms: int = 0 + error: Optional[str] = None + + +class AutonomousPlanner: + """ + Planificateur autonome utilisant LLM pour générer des plans d'action. + """ + + def __init__( + self, + llm_endpoint: str = "http://localhost:11434/api/generate", + llm_model: str = "qwen2.5:7b", + timeout: int = 60 + ): + self.llm_endpoint = llm_endpoint + self.llm_model = llm_model + self.timeout = timeout + self.llm_available = self._check_llm() + + # Callbacks pour l'exécution + self._action_executor: Optional[Callable] = None + self._screen_capturer: Optional[Callable] = None + self._progress_callback: Optional[Callable] = None + + # Détecteur visuel (OWL-v2) + self._owl_detector: Optional[OwlDetector] = None + self._init_visual_detection() + + # Client VLM pour analyse intelligente d'écran + self._vlm_client = None + self._init_vlm_client() + + logger.info(f"AutonomousPlanner initialized (LLM: {self.llm_model}, available: {self.llm_available}, visual: {self._owl_detector is not None}, vlm: {self._vlm_client is not None})") + + def _init_visual_detection(self): + """Initialise le détecteur visuel OWL-v2.""" + if VISUAL_DETECTION_AVAILABLE and OwlDetector: + try: + self._owl_detector = OwlDetector(confidence_threshold=0.1) + logger.info("OWL-v2 visual detector initialized") + except Exception as e: + logger.warning(f"Could not initialize OWL detector: {e}") + self._owl_detector = None + + def _init_vlm_client(self): + """Initialise le client VLM pour analyse intelligente.""" + if VLM_AVAILABLE and OllamaClient: + try: + self._vlm_client = OllamaClient(model="qwen2.5vl:7b") + logger.info("VLM client initialized (qwen2.5vl:7b)") + except Exception as e: + logger.warning(f"Could not initialize VLM client: {e}") + self._vlm_client = None + + def _analyze_screen_for_element( + self, + description: str, + screenshot=None + ) -> Optional[Tuple[int, int]]: + """ + Utilise le VLM pour analyser l'écran et trouver un élément. + Plus intelligent que OWL car peut comprendre le contexte. + + Args: + description: Description de l'élément à trouver + screenshot: Capture d'écran (capturée si non fournie) + + Returns: + Tuple (x, y) des coordonnées ou None + """ + if not self._vlm_client: + return None + + if screenshot is None: + screenshot = self._capture_screen() + + if screenshot is None: + return None + + # Prompt pour le VLM + prompt = f"""Analyse cette capture d'écran et trouve l'élément suivant: "{description}" + +IMPORTANT: +- Ignore les publicités et les éléments sponsorisés +- Cherche un vrai résultat de recherche, pas une annonce +- Si c'est une vidéo, cherche une miniature de vidéo avec un titre pertinent + +Réponds UNIQUEMENT avec les coordonnées X,Y du CENTRE de l'élément au format: +COORDINATES: X, Y + +Si tu ne trouves pas l'élément, réponds: +NOT_FOUND""" + + try: + result = self._vlm_client.generate( + prompt=prompt, + image=screenshot, + temperature=0.1, + max_tokens=100 + ) + + if result.get('success'): + response = result.get('response', '') + logger.info(f"VLM response: {response}") + + # Parser les coordonnées + coord_match = re.search(r'COORDINATES:\s*(\d+)\s*,\s*(\d+)', response) + if coord_match: + x = int(coord_match.group(1)) + y = int(coord_match.group(2)) + logger.info(f"VLM found element at ({x}, {y})") + return (x, y) + + except Exception as e: + logger.warning(f"VLM analysis failed: {e}") + + return None + + def _capture_screen(self): + """Capture l'écran actuel. Retourne PIL.Image ou None.""" + if not PYAUTOGUI_AVAILABLE: + return None + try: + screenshot = pyautogui.screenshot() + return screenshot + except Exception as e: + logger.warning(f"Screen capture failed: {e}") + return None + + def _find_element_by_description( + self, + description: str, + screenshot=None + ) -> Optional[Tuple[int, int]]: + """ + Trouve un élément à l'écran par sa description textuelle. + + Args: + description: Description de l'élément (ex: "search bar", "play button") + screenshot: Capture d'écran (capturée si non fournie) + + Returns: + Tuple (x, y) des coordonnées du centre de l'élément, ou None si non trouvé + """ + if screenshot is None: + screenshot = self._capture_screen() + + if screenshot is None: + return None + + # Essayer avec OWL-v2 si disponible + if self._owl_detector: + try: + # Créer des variantes de la requête pour OWL + queries = self._generate_owl_queries(description) + logger.info(f"OWL searching for: {queries}") + + detections = self._owl_detector.detect(screenshot, queries, confidence_threshold=0.05) + + if detections: + # Prendre la détection avec le meilleur score + best = max(detections, key=lambda d: d['confidence']) + center = best.get('center') + if center: + logger.info(f"OWL found '{best['label']}' at {center} (conf: {best['confidence']:.2f})") + return (int(center[0]), int(center[1])) + except Exception as e: + logger.warning(f"OWL detection failed: {e}") + + # Fallback 2: Utiliser le VLM pour analyse intelligente + # Particulièrement utile pour distinguer les pubs des vrais résultats + if self._vlm_client and ("vidéo" in description.lower() or "video" in description.lower()): + logger.info("Trying VLM analysis for intelligent element detection...") + vlm_result = self._analyze_screen_for_element(description, screenshot) + if vlm_result: + return vlm_result + + # Fallback 3: positions heuristiques basées sur la description + return self._heuristic_position(description, screenshot.size if screenshot else (1920, 1080)) + + def _generate_owl_queries(self, description: str) -> List[str]: + """Génère des requêtes OWL à partir d'une description.""" + desc_lower = description.lower() + queries = [] + + # Mapping description -> termes OWL + if "recherche" in desc_lower or "search" in desc_lower: + queries.extend(["search bar", "search box", "text input", "search field"]) + elif "vidéo" in desc_lower or "video" in desc_lower or "miniature" in desc_lower: + queries.extend(["video thumbnail", "video preview", "video card"]) + elif "bouton" in desc_lower or "button" in desc_lower: + queries.extend(["button", "click button", "submit button"]) + elif "play" in desc_lower or "lecture" in desc_lower: + queries.extend(["play button", "video player"]) + else: + # Utiliser la description directement + queries.append(description) + + return queries + + def _heuristic_position( + self, + description: str, + screen_size: Tuple[int, int] + ) -> Optional[Tuple[int, int]]: + """Position heuristique basée sur la description (fallback).""" + width, height = screen_size + desc_lower = description.lower() + + # YouTube specific heuristics (après scroll, les vidéos sont plus visibles) + if "recherche" in desc_lower or "search" in desc_lower: + # Barre de recherche YouTube: centre-haut + return (width // 2, 60) + elif ("vidéo" in desc_lower or "video" in desc_lower or "miniature" in desc_lower) and ("musique" in desc_lower or "music" in desc_lower or "première" in desc_lower or "visible" in desc_lower): + # Première vidéo YouTube visible (après scroll, au milieu de l'écran) + # Position adaptée pour différentes résolutions + x = width // 4 # Colonne gauche où sont les miniatures + y = height // 3 # Tiers supérieur après scroll + logger.info(f"Heuristic video position: ({x}, {y}) for screen {width}x{height}") + return (x, y) + elif "bouton" in desc_lower and "recherche" in desc_lower: + # Bouton de recherche YouTube (loupe) + return (width // 2 + 280, 60) + + # Position par défaut: centre + return (width // 2, height // 2) + + def _check_llm(self) -> bool: + """Vérifie si le LLM est disponible.""" + try: + response = requests.get( + "http://localhost:11434/api/tags", + timeout=5 + ) + if response.status_code == 200: + models = response.json().get('models', []) + model_names = [m['name'] for m in models] + # Vérifier si notre modèle est disponible + if any(self.llm_model in name for name in model_names): + return True + logger.warning(f"Model {self.llm_model} not found. Available: {model_names}") + return False + except Exception as e: + logger.warning(f"LLM check failed: {e}") + return False + + def set_action_executor(self, executor: Callable): + """Configure l'exécuteur d'actions.""" + self._action_executor = executor + + def set_screen_capturer(self, capturer: Callable): + """Configure le capteur d'écran.""" + self._screen_capturer = capturer + + def set_progress_callback(self, callback: Callable): + """Configure le callback de progression.""" + self._progress_callback = callback + + def _notify_progress(self, step: int, total: int, message: str, status: str = "running"): + """Notifie la progression.""" + if self._progress_callback: + self._progress_callback({ + "step": step, + "total": total, + "message": message, + "status": status, + "percent": int((step / total) * 100) if total > 0 else 0 + }) + + def plan(self, user_request: str, context: Optional[Dict] = None) -> ExecutionPlan: + """ + Génère un plan d'exécution à partir d'une requête utilisateur. + + Args: + user_request: La demande en langage naturel + context: Contexte optionnel (écran actuel, apps ouvertes, etc.) + + Returns: + ExecutionPlan avec les étapes à exécuter + """ + if not self.llm_available: + return self._fallback_plan(user_request) + + # Construire le prompt + prompt = self._build_planning_prompt(user_request, context) + + try: + # Appeler le LLM + response = requests.post( + self.llm_endpoint, + json={ + "model": self.llm_model, + "prompt": prompt, + "stream": False, + "options": { + "temperature": 0.3, # Déterministe + "num_predict": 1500 + } + }, + timeout=self.timeout + ) + + if response.status_code == 200: + result = response.json() + llm_response = result.get('response', '') + return self._parse_plan(user_request, llm_response) + else: + logger.error(f"LLM request failed: {response.status_code}") + return self._fallback_plan(user_request) + + except Exception as e: + logger.error(f"Planning error: {e}") + return self._fallback_plan(user_request) + + def _build_planning_prompt(self, user_request: str, context: Optional[Dict] = None) -> str: + """Construit le prompt pour le LLM.""" + + context_info = "" + if context: + context_info = f""" +Contexte actuel: +- Application active: {context.get('active_app', 'inconnue')} +- Fenêtre: {context.get('window_title', 'inconnue')} +- Écran: {context.get('screen_info', 'standard')} +""" + + prompt = f"""Tu es un assistant RPA intelligent qui planifie des actions sur ordinateur. + +TÂCHE DEMANDÉE: {user_request} +{context_info} + +ACTIONS DISPONIBLES: +- open_app: Ouvrir une application (params: app_name - ex: "firefox", "terminal", "code") +- open_url: Ouvrir une URL dans le navigateur (params: url) +- click: Cliquer sur un élément visuel (target: description précise de l'élément à cliquer) +- type_text: Taper du texte (target: description du champ, params: text) +- hotkey: Raccourci clavier (params: keys - ex: "ctrl+t", "ctrl+l", "Return", "Escape", "Tab") +- scroll: Défiler (params: direction - "up"/"down", amount - nombre de scrolls) +- wait: Attendre (params: seconds) + +PRINCIPES DE PLANIFICATION: +1. Décompose la tâche en étapes simples, une action par étape +2. Ajoute wait (2-3s) après open_url/open_app pour laisser le temps au chargement +3. Pour taper dans un champ web, utilise d'abord click sur le champ OU hotkey pour focus (ex: ctrl+l pour barre d'adresse) +4. Après avoir tapé dans un champ de recherche, ajoute hotkey "Return" pour valider +5. Pour click, décris précisément l'élément visuel (ex: "bouton bleu Envoyer", "première image de la liste") +6. Si la page peut avoir des publicités, ajoute un scroll pour les passer avant de cliquer + +RACCOURCIS UTILES: +- ctrl+l : Focus sur la barre d'adresse du navigateur +- ctrl+t : Nouvel onglet +- ctrl+f : Rechercher dans la page +- Return : Valider/Entrée +- Escape : Annuler/Fermer + +RÉPONDS EN JSON UNIQUEMENT: +{{ + "steps": [ + {{ + "step": 1, + "action": "open_url|click|type_text|hotkey|wait|scroll", + "target": "description visuelle de la cible", + "params": {{"url": "...", "text": "...", "keys": "...", "seconds": 2}}, + "description": "Ce que fait cette étape", + "expected_result": "Résultat attendu" + }} + ], + "risk_level": "low|medium|high", + "estimated_seconds": 30 +}} + +JSON:""" + + return prompt + + def _parse_plan(self, user_request: str, llm_response: str) -> ExecutionPlan: + """Parse la réponse du LLM en ExecutionPlan.""" + + try: + # Extraire le JSON de la réponse + json_match = re.search(r'\{[\s\S]*\}', llm_response) + if not json_match: + logger.warning("No JSON found in LLM response") + return self._fallback_plan(user_request) + + data = json.loads(json_match.group()) + + steps = [] + for step_data in data.get('steps', []): + action_type_str = step_data.get('action', 'click') + + # Mapper vers ActionType + action_type_map = { + 'open_app': ActionType.OPEN_APP, + 'open_url': ActionType.OPEN_URL, + 'click': ActionType.CLICK, + 'type_text': ActionType.TYPE_TEXT, + 'hotkey': ActionType.HOTKEY, + 'scroll': ActionType.SCROLL, + 'wait': ActionType.WAIT, + 'screenshot': ActionType.SCREENSHOT, + 'find_element': ActionType.FIND_ELEMENT + } + + action_type = action_type_map.get(action_type_str, ActionType.CLICK) + + steps.append(PlannedAction( + step_number=step_data.get('step', len(steps) + 1), + action_type=action_type, + description=step_data.get('description', ''), + target=step_data.get('target'), + parameters=step_data.get('params', {}), + expected_result=step_data.get('expected_result') + )) + + return ExecutionPlan( + task_description=user_request, + steps=steps, + estimated_duration_seconds=data.get('estimated_seconds', 30), + risk_level=data.get('risk_level', 'low'), + requires_confirmation=data.get('risk_level', 'low') != 'low' + ) + + except json.JSONDecodeError as e: + logger.error(f"JSON parse error: {e}") + return self._fallback_plan(user_request) + + def _fallback_plan(self, user_request: str) -> ExecutionPlan: + """Plan de fallback si le LLM n'est pas disponible.""" + + # Détection simple de patterns + request_lower = user_request.lower() + steps = [] + + # Pattern: ouvrir navigateur/browser + if any(word in request_lower for word in ['navigateur', 'browser', 'chrome', 'firefox']): + steps.append(PlannedAction( + step_number=1, + action_type=ActionType.OPEN_APP, + description="Ouvrir le navigateur", + parameters={"app_name": "firefox"} + )) + + # Pattern: aller sur / ouvrir site + url_patterns = [ + (r'youtube', 'https://www.youtube.com'), + (r'google', 'https://www.google.com'), + (r'github', 'https://www.github.com'), + ] + + for pattern, url in url_patterns: + if pattern in request_lower: + steps.append(PlannedAction( + step_number=len(steps) + 1, + action_type=ActionType.OPEN_URL, + description=f"Ouvrir {pattern}", + parameters={"url": url} + )) + break + + # Pattern: chercher / rechercher + if any(word in request_lower for word in ['chercher', 'rechercher', 'search']): + # Extraire le terme de recherche + search_term = request_lower + for remove in ['cherche', 'recherche', 'search', 'sur youtube', 'sur google', 'une vidéo', 'video']: + search_term = search_term.replace(remove, '') + search_term = search_term.strip() + + if search_term: + steps.append(PlannedAction( + step_number=len(steps) + 1, + action_type=ActionType.CLICK, + description="Cliquer sur la barre de recherche", + target="barre de recherche" + )) + steps.append(PlannedAction( + step_number=len(steps) + 1, + action_type=ActionType.TYPE_TEXT, + description=f"Taper '{search_term}'", + parameters={"text": search_term} + )) + steps.append(PlannedAction( + step_number=len(steps) + 1, + action_type=ActionType.HOTKEY, + description="Appuyer sur Entrée", + parameters={"keys": "Return"} + )) + + # Pattern: lancer / jouer vidéo + if any(word in request_lower for word in ['lancer', 'jouer', 'play', 'regarder']): + steps.append(PlannedAction( + step_number=len(steps) + 1, + action_type=ActionType.WAIT, + description="Attendre le chargement", + parameters={"seconds": 2} + )) + steps.append(PlannedAction( + step_number=len(steps) + 1, + action_type=ActionType.CLICK, + description="Cliquer sur la première vidéo", + target="première miniature de vidéo" + )) + + if not steps: + # Plan générique + steps.append(PlannedAction( + step_number=1, + action_type=ActionType.SCREENSHOT, + description="Capturer l'écran actuel", + parameters={} + )) + + return ExecutionPlan( + task_description=user_request, + steps=steps, + estimated_duration_seconds=len(steps) * 5, + risk_level="low" + ) + + async def execute_plan(self, plan: ExecutionPlan) -> List[ActionResult]: + """ + Exécute un plan d'action. + + Args: + plan: Le plan à exécuter + + Returns: + Liste des résultats d'actions + """ + results = [] + total_steps = len(plan.steps) + + self._notify_progress(0, total_steps, "Démarrage de l'exécution...", "starting") + + for i, action in enumerate(plan.steps): + step_num = i + 1 + self._notify_progress( + step_num, + total_steps, + f"Étape {step_num}/{total_steps}: {action.description}", + "running" + ) + + try: + result = await self._execute_action(action) + results.append(result) + + if not result.success: + logger.warning(f"Step {step_num} failed: {result.message}") + # Continuer malgré l'échec (best effort) + + except Exception as e: + logger.error(f"Step {step_num} error: {e}") + results.append(ActionResult( + success=False, + action=action, + message=f"Erreur: {str(e)}", + error=str(e) + )) + + # Petit délai entre les actions + time.sleep(0.3) + + success_count = sum(1 for r in results if r.success) + self._notify_progress( + total_steps, + total_steps, + f"Terminé: {success_count}/{total_steps} étapes réussies", + "completed" if success_count == total_steps else "partial" + ) + + return results + + async def _execute_action(self, action: PlannedAction) -> ActionResult: + """Exécute une action individuelle.""" + + start_time = time.time() + + try: + if action.action_type == ActionType.OPEN_APP: + return await self._exec_open_app(action) + + elif action.action_type == ActionType.OPEN_URL: + return await self._exec_open_url(action) + + elif action.action_type == ActionType.CLICK: + return await self._exec_click(action) + + elif action.action_type == ActionType.TYPE_TEXT: + return await self._exec_type_text(action) + + elif action.action_type == ActionType.HOTKEY: + return await self._exec_hotkey(action) + + elif action.action_type == ActionType.SCROLL: + return await self._exec_scroll(action) + + elif action.action_type == ActionType.WAIT: + return await self._exec_wait(action) + + elif action.action_type == ActionType.SCREENSHOT: + return await self._exec_screenshot(action) + + else: + return ActionResult( + success=False, + action=action, + message=f"Type d'action non supporté: {action.action_type}", + duration_ms=int((time.time() - start_time) * 1000) + ) + + except Exception as e: + return ActionResult( + success=False, + action=action, + message=f"Erreur: {str(e)}", + error=str(e), + duration_ms=int((time.time() - start_time) * 1000) + ) + + async def _exec_open_app(self, action: PlannedAction) -> ActionResult: + """Ouvre une application.""" + import subprocess + + app_name = action.parameters.get('app_name', 'firefox') + + # Mapping des noms courants + app_commands = { + 'firefox': 'firefox', + 'chrome': 'google-chrome', + 'navigateur': 'firefox', + 'browser': 'firefox', + 'terminal': 'gnome-terminal', + 'files': 'nautilus', + 'code': 'code', + 'vscode': 'code' + } + + cmd = app_commands.get(app_name.lower(), app_name) + + try: + subprocess.Popen([cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + time.sleep(1) # Attendre que l'app démarre + + return ActionResult( + success=True, + action=action, + message=f"Application '{app_name}' ouverte" + ) + except FileNotFoundError: + return ActionResult( + success=False, + action=action, + message=f"Application '{app_name}' non trouvée" + ) + + async def _exec_open_url(self, action: PlannedAction) -> ActionResult: + """Ouvre une URL dans le navigateur.""" + import subprocess + + url = action.parameters.get('url', 'https://www.google.com') + + try: + subprocess.Popen(['xdg-open', url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + time.sleep(2) # Attendre le chargement + + return ActionResult( + success=True, + action=action, + message=f"URL '{url}' ouverte" + ) + except Exception as e: + return ActionResult( + success=False, + action=action, + message=f"Erreur ouverture URL: {e}" + ) + + async def _exec_click(self, action: PlannedAction) -> ActionResult: + """Exécute un clic sur un élément avec détection visuelle.""" + + target = action.target or action.parameters.get('target', '') + + # Si on a un ActionExecutor configuré, l'utiliser en priorité + if self._action_executor: + try: + screenshot = None + if self._screen_capturer: + screenshot = self._screen_capturer() + + result = self._action_executor( + action_type="click", + target=target, + screenshot=screenshot + ) + + return ActionResult( + success=result.get('success', False), + action=action, + message=result.get('message', 'Clic effectué') + ) + except Exception as e: + logger.warning(f"ActionExecutor failed: {e}, using visual detection") + + # Utiliser la détection visuelle + try: + if not PYAUTOGUI_AVAILABLE: + return ActionResult( + success=False, + action=action, + message="pyautogui non disponible" + ) + + # Capturer l'écran + screenshot = self._capture_screen() + + # Trouver l'élément par sa description + position = self._find_element_by_description(target, screenshot) + + if position: + x, y = position + logger.info(f"Clicking at ({x}, {y}) for target: '{target}'") + pyautogui.click(x, y) + + return ActionResult( + success=True, + action=action, + message=f"Clic à ({x}, {y}) pour '{target}'" + ) + else: + return ActionResult( + success=False, + action=action, + message=f"Élément non trouvé: '{target}'" + ) + + except Exception as e: + logger.error(f"Click error: {e}") + return ActionResult( + success=False, + action=action, + message=f"Erreur clic: {e}" + ) + + async def _exec_type_text(self, action: PlannedAction) -> ActionResult: + """Tape du texte dans un champ (clique d'abord sur le champ si spécifié).""" + try: + if not PYAUTOGUI_AVAILABLE: + return ActionResult( + success=False, + action=action, + message="pyautogui non disponible" + ) + + text = action.parameters.get('text', '') + target = action.target or action.parameters.get('target', '') + + # Si une cible est spécifiée, cliquer dessus d'abord + if target: + screenshot = self._capture_screen() + position = self._find_element_by_description(target, screenshot) + + if position: + x, y = position + logger.info(f"Clicking on input field at ({x}, {y}) before typing") + pyautogui.click(x, y) + time.sleep(0.5) # Attendre que le focus soit acquis + else: + # Champ non trouvé - on continue car le focus est peut-être déjà au bon endroit + # (ex: après un hotkey ctrl+l ou ctrl+k) + logger.info(f"Target '{target}' not found visually, assuming focus is already correct") + + # Taper le texte + time.sleep(0.2) + + # Utiliser write() pour le texte Unicode (français, etc.) + if text.isascii(): + pyautogui.typewrite(text, interval=0.03) + else: + # Pour les caractères non-ASCII, utiliser le presse-papier + import subprocess + # Copier dans le presse-papier + process = subprocess.Popen(['xclip', '-selection', 'clipboard'], stdin=subprocess.PIPE) + process.communicate(text.encode('utf-8')) + # Coller avec Ctrl+V + pyautogui.hotkey('ctrl', 'v') + + return ActionResult( + success=True, + action=action, + message=f"Texte tapé: '{text[:30]}...'" if len(text) > 30 else f"Texte tapé: '{text}'" + ) + + except Exception as e: + logger.error(f"Type text error: {e}") + return ActionResult( + success=False, + action=action, + message=f"Erreur frappe: {e}" + ) + + async def _exec_hotkey(self, action: PlannedAction) -> ActionResult: + """Exécute un raccourci clavier.""" + try: + import pyautogui + + keys = action.parameters.get('keys', 'Return') + + # Parser les combinaisons de touches + if '+' in keys: + key_list = [k.strip().lower() for k in keys.split('+')] + pyautogui.hotkey(*key_list) + else: + pyautogui.press(keys.lower()) + + return ActionResult( + success=True, + action=action, + message=f"Touche(s) '{keys}' pressée(s)" + ) + + except Exception as e: + return ActionResult( + success=False, + action=action, + message=f"Erreur hotkey: {e}" + ) + + async def _exec_scroll(self, action: PlannedAction) -> ActionResult: + """Effectue un scroll.""" + try: + import pyautogui + + direction = action.parameters.get('direction', 'down') + amount = action.parameters.get('amount', 3) + + scroll_amount = amount if direction == 'up' else -amount + pyautogui.scroll(scroll_amount) + + return ActionResult( + success=True, + action=action, + message=f"Scroll {direction} de {amount}" + ) + + except Exception as e: + return ActionResult( + success=False, + action=action, + message=f"Erreur scroll: {e}" + ) + + async def _exec_wait(self, action: PlannedAction) -> ActionResult: + """Attend un certain temps.""" + seconds = action.parameters.get('seconds', 1) + time.sleep(seconds) + + return ActionResult( + success=True, + action=action, + message=f"Attente de {seconds}s terminée" + ) + + async def _exec_screenshot(self, action: PlannedAction) -> ActionResult: + """Capture l'écran.""" + try: + if self._screen_capturer: + screenshot = self._screen_capturer() + # Sauvegarder si nécessaire + path = f"/tmp/screenshot_{datetime.now().strftime('%H%M%S')}.png" + + return ActionResult( + success=True, + action=action, + message="Screenshot capturé", + screenshot_path=path + ) + else: + import pyautogui + screenshot = pyautogui.screenshot() + path = f"/tmp/screenshot_{datetime.now().strftime('%H%M%S')}.png" + screenshot.save(path) + + return ActionResult( + success=True, + action=action, + message="Screenshot capturé", + screenshot_path=path + ) + + except Exception as e: + return ActionResult( + success=False, + action=action, + message=f"Erreur screenshot: {e}" + ) + + +# Singleton +_planner_instance: Optional[AutonomousPlanner] = None + + +def get_autonomous_planner( + llm_model: str = "qwen2.5:7b" +) -> AutonomousPlanner: + """Retourne l'instance singleton du planner.""" + global _planner_instance + + if _planner_instance is None: + _planner_instance = AutonomousPlanner(llm_model=llm_model) + + return _planner_instance diff --git a/agent_chat/templates/chat.html b/agent_chat/templates/chat.html new file mode 100644 index 000000000..504f8c441 --- /dev/null +++ b/agent_chat/templates/chat.html @@ -0,0 +1,1187 @@ + + +
+ + ++ Je suis votre assistant RPA. Décrivez ce que vous voulez faire en langage naturel. +
+