Files
rpa_vision_v3/agent_chat/autonomous_planner.py
Dom a2b82d3e76
Some checks failed
security-audit / Bandit (scan statique) (push) Successful in 14s
security-audit / pip-audit (CVE dépendances) (push) Successful in 12s
security-audit / Scan secrets (grep) (push) Successful in 9s
tests / Lint (ruff + black) (push) Successful in 15s
tests / Tests sécurité (critique) (push) Has been cancelled
tests / Tests unitaires (sans GPU) (push) Has been cancelled
fix(lint): ruff passe propre — 2 vrais bugs + suppression fichier corrompu
Vrais bugs corrigés :
- core/execution/target_resolver.py : suppression de 5 lignes de dead code
  après return (vestige de refacto incomplète référençant des params
  jamais assignés à self : similarity_threshold, use_spatial_fallback)
- agent_v0/agent_v1/core/executor.py:2180 : variable `prefill` référencée
  mais jamais définie. Initialisation explicite ajoutée en amont
  (conditionnée sur _is_thinking_popup, cohérent avec l'append du message)

Fichier supprimé :
- core/security/input_validator_new.py : contenu corrompu (texte inversé,
  artefact de copier-coller), jamais importé nulle part, 550 erreurs ruff
  à lui seul

Workflow CI :
- Exclusions ajoutées pour dossiers legacy connus cassés :
    - agent_v0/deploy/windows_client/ (clone obsolète)
    - tests/property/ (cf. MEMORY.md — imports cassés)
    - tests/integration/test_visual_rpa_checkpoint.py (VisualMetadata
      inexistant, déjà documenté)

Résultat : "ruff All checks passed!" sur core/ agent_v0/ tests/
(avec E9,F63,F7,F82 — syntax + undefined critiques).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 19:00:16 +02:00

1017 lines
36 KiB
Python

#!/usr/bin/env python3
"""
Autonomous Planner - Agent Libre pour RPA Vision V3
Ce module permet d'exécuter des tâches sans workflow pré-enregistré.
Il utilise un LLM (Qwen via Ollama) pour :
1. Comprendre l'intention utilisateur
2. Décomposer en étapes d'actions
3. Adapter dynamiquement selon le contexte visuel
Auteur: Dom - Janvier 2026
"""
import json
import logging
import time
import re
import sys
import os
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable, Tuple
from enum import Enum
from datetime import datetime
import requests
# Ajouter le chemin du projet pour les imports core
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
logger = logging.getLogger(__name__)
# Essayer d'importer les composants de détection visuelle
try:
from core.detection.owl_detector import OwlDetector, OWL_AVAILABLE
VISUAL_DETECTION_AVAILABLE = OWL_AVAILABLE
except ImportError:
VISUAL_DETECTION_AVAILABLE = False
OwlDetector = None
# Essayer d'importer le client VLM pour analyse intelligente
try:
from core.detection.ollama_client import OllamaClient
VLM_AVAILABLE = True
except ImportError:
VLM_AVAILABLE = False
OllamaClient = None
try:
from PIL import Image as PILImage
import pyautogui
PYAUTOGUI_AVAILABLE = True
except ImportError:
PYAUTOGUI_AVAILABLE = False
PILImage = None
pyautogui = None
class ActionType(Enum):
"""Types d'actions supportées par l'agent autonome."""
CLICK = "click"
TYPE_TEXT = "type_text"
HOTKEY = "hotkey"
SCROLL = "scroll"
WAIT = "wait"
OPEN_APP = "open_app"
OPEN_URL = "open_url"
SCREENSHOT = "screenshot"
FIND_ELEMENT = "find_element"
@dataclass
class PlannedAction:
"""Une action planifiée par le LLM."""
step_number: int
action_type: ActionType
description: str
target: Optional[str] = None # Description textuelle de la cible
parameters: Dict[str, Any] = field(default_factory=dict)
expected_result: Optional[str] = None
fallback: Optional[str] = None
@dataclass
class ExecutionPlan:
"""Plan d'exécution complet généré par le LLM."""
task_description: str
steps: List[PlannedAction]
estimated_duration_seconds: int = 30
requires_confirmation: bool = True
risk_level: str = "low" # low, medium, high
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class ActionResult:
"""Résultat d'une action exécutée."""
success: bool
action: PlannedAction
message: str
screenshot_path: Optional[str] = None
duration_ms: int = 0
error: Optional[str] = None
class AutonomousPlanner:
"""
Planificateur autonome utilisant LLM pour générer des plans d'action.
"""
def __init__(
self,
llm_endpoint: str = "http://localhost:11434/api/generate",
llm_model: str = "qwen2.5:7b",
timeout: int = 60
):
self.llm_endpoint = llm_endpoint
self.llm_model = llm_model
self.timeout = timeout
self.llm_available = self._check_llm()
# Callbacks pour l'exécution
self._action_executor: Optional[Callable] = None
self._screen_capturer: Optional[Callable] = None
self._progress_callback: Optional[Callable] = None
# Détecteur visuel (OWL-v2)
self._owl_detector: Optional[OwlDetector] = None
self._init_visual_detection()
# Client VLM pour analyse intelligente d'écran
self._vlm_client = None
self._init_vlm_client()
logger.info(f"AutonomousPlanner initialized (LLM: {self.llm_model}, available: {self.llm_available}, visual: {self._owl_detector is not None}, vlm: {self._vlm_client is not None})")
def _init_visual_detection(self):
"""Initialise le détecteur visuel OWL-v2."""
if VISUAL_DETECTION_AVAILABLE and OwlDetector:
try:
self._owl_detector = OwlDetector(confidence_threshold=0.1)
logger.info("OWL-v2 visual detector initialized")
except Exception as e:
logger.warning(f"Could not initialize OWL detector: {e}")
self._owl_detector = None
def _init_vlm_client(self):
"""Initialise le client VLM pour analyse intelligente."""
if VLM_AVAILABLE and OllamaClient:
try:
from core.detection.vlm_config import get_vlm_model
_planner_vlm = get_vlm_model()
self._vlm_client = OllamaClient(model=_planner_vlm)
logger.info("VLM client initialized (%s)", _planner_vlm)
except Exception as e:
logger.warning(f"Could not initialize VLM client: {e}")
self._vlm_client = None
def _analyze_screen_for_element(
self,
description: str,
screenshot=None
) -> Optional[Tuple[int, int]]:
"""
Utilise le VLM pour analyser l'écran et trouver un élément.
Plus intelligent que OWL car peut comprendre le contexte.
Args:
description: Description de l'élément à trouver
screenshot: Capture d'écran (capturée si non fournie)
Returns:
Tuple (x, y) des coordonnées ou None
"""
if not self._vlm_client:
return None
if screenshot is None:
screenshot = self._capture_screen()
if screenshot is None:
return None
# Prompt pour le VLM
prompt = f"""Analyse cette capture d'écran et trouve l'élément suivant: "{description}"
IMPORTANT:
- Ignore les publicités et les éléments sponsorisés
- Cherche un vrai résultat de recherche, pas une annonce
- Si c'est une vidéo, cherche une miniature de vidéo avec un titre pertinent
Réponds UNIQUEMENT avec les coordonnées X,Y du CENTRE de l'élément au format:
COORDINATES: X, Y
Si tu ne trouves pas l'élément, réponds:
NOT_FOUND"""
try:
result = self._vlm_client.generate(
prompt=prompt,
image=screenshot,
temperature=0.1,
max_tokens=100,
assistant_prefill="COORDINATES:",
)
if result.get('success'):
response = result.get('response', '')
logger.info(f"VLM response: {response}")
# Parser les coordonnées
coord_match = re.search(r'COORDINATES:\s*(\d+)\s*,\s*(\d+)', response)
if coord_match:
x = int(coord_match.group(1))
y = int(coord_match.group(2))
logger.info(f"VLM found element at ({x}, {y})")
return (x, y)
except Exception as e:
logger.warning(f"VLM analysis failed: {e}")
return None
def _capture_screen(self):
"""Capture l'écran actuel. Retourne PIL.Image ou None."""
if not PYAUTOGUI_AVAILABLE:
return None
try:
screenshot = pyautogui.screenshot()
return screenshot
except Exception as e:
logger.warning(f"Screen capture failed: {e}")
return None
def _find_element_by_description(
self,
description: str,
screenshot=None
) -> Optional[Tuple[int, int]]:
"""
Trouve un élément à l'écran par sa description textuelle.
Args:
description: Description de l'élément (ex: "search bar", "play button")
screenshot: Capture d'écran (capturée si non fournie)
Returns:
Tuple (x, y) des coordonnées du centre de l'élément, ou None si non trouvé
"""
if screenshot is None:
screenshot = self._capture_screen()
if screenshot is None:
return None
# Essayer avec OWL-v2 si disponible
if self._owl_detector:
try:
# Créer des variantes de la requête pour OWL
queries = self._generate_owl_queries(description)
logger.info(f"OWL searching for: {queries}")
detections = self._owl_detector.detect(screenshot, queries, confidence_threshold=0.05)
if detections:
# Prendre la détection avec le meilleur score
best = max(detections, key=lambda d: d['confidence'])
center = best.get('center')
if center:
logger.info(f"OWL found '{best['label']}' at {center} (conf: {best['confidence']:.2f})")
return (int(center[0]), int(center[1]))
except Exception as e:
logger.warning(f"OWL detection failed: {e}")
# Fallback 2: Utiliser le VLM pour analyse intelligente
# Particulièrement utile pour distinguer les pubs des vrais résultats
if self._vlm_client and ("vidéo" in description.lower() or "video" in description.lower()):
logger.info("Trying VLM analysis for intelligent element detection...")
vlm_result = self._analyze_screen_for_element(description, screenshot)
if vlm_result:
return vlm_result
# Fallback 3: positions heuristiques basées sur la description
return self._heuristic_position(description, screenshot.size if screenshot else (1920, 1080))
def _generate_owl_queries(self, description: str) -> List[str]:
"""Génère des requêtes OWL à partir d'une description."""
desc_lower = description.lower()
queries = []
# Mapping description -> termes OWL
if "recherche" in desc_lower or "search" in desc_lower:
queries.extend(["search bar", "search box", "text input", "search field"])
elif "vidéo" in desc_lower or "video" in desc_lower or "miniature" in desc_lower:
queries.extend(["video thumbnail", "video preview", "video card"])
elif "bouton" in desc_lower or "button" in desc_lower:
queries.extend(["button", "click button", "submit button"])
elif "play" in desc_lower or "lecture" in desc_lower:
queries.extend(["play button", "video player"])
else:
# Utiliser la description directement
queries.append(description)
return queries
def _heuristic_position(
self,
description: str,
screen_size: Tuple[int, int]
) -> Optional[Tuple[int, int]]:
"""Position heuristique basée sur la description (fallback)."""
width, height = screen_size
desc_lower = description.lower()
# YouTube specific heuristics (après scroll, les vidéos sont plus visibles)
if "recherche" in desc_lower or "search" in desc_lower:
# Barre de recherche YouTube: centre-haut
return (width // 2, 60)
elif ("vidéo" in desc_lower or "video" in desc_lower or "miniature" in desc_lower) and ("musique" in desc_lower or "music" in desc_lower or "première" in desc_lower or "visible" in desc_lower):
# Première vidéo YouTube visible (après scroll, au milieu de l'écran)
# Position adaptée pour différentes résolutions
x = width // 4 # Colonne gauche où sont les miniatures
y = height // 3 # Tiers supérieur après scroll
logger.info(f"Heuristic video position: ({x}, {y}) for screen {width}x{height}")
return (x, y)
elif "bouton" in desc_lower and "recherche" in desc_lower:
# Bouton de recherche YouTube (loupe)
return (width // 2 + 280, 60)
# Position par défaut: centre
return (width // 2, height // 2)
def _check_llm(self) -> bool:
"""Vérifie si le LLM est disponible."""
try:
response = requests.get(
"http://localhost:11434/api/tags",
timeout=5
)
if response.status_code == 200:
models = response.json().get('models', [])
model_names = [m['name'] for m in models]
# Vérifier si notre modèle est disponible
if any(self.llm_model in name for name in model_names):
return True
logger.warning(f"Model {self.llm_model} not found. Available: {model_names}")
return False
except Exception as e:
logger.warning(f"LLM check failed: {e}")
return False
def set_action_executor(self, executor: Callable):
"""Configure l'exécuteur d'actions."""
self._action_executor = executor
def set_screen_capturer(self, capturer: Callable):
"""Configure le capteur d'écran."""
self._screen_capturer = capturer
def set_progress_callback(self, callback: Callable):
"""Configure le callback de progression."""
self._progress_callback = callback
def _notify_progress(self, step: int, total: int, message: str, status: str = "running"):
"""Notifie la progression."""
if self._progress_callback:
self._progress_callback({
"step": step,
"total": total,
"message": message,
"status": status,
"percent": int((step / total) * 100) if total > 0 else 0
})
def plan(self, user_request: str, context: Optional[Dict] = None) -> ExecutionPlan:
"""
Génère un plan d'exécution à partir d'une requête utilisateur.
Args:
user_request: La demande en langage naturel
context: Contexte optionnel (écran actuel, apps ouvertes, etc.)
Returns:
ExecutionPlan avec les étapes à exécuter
"""
if not self.llm_available:
return self._fallback_plan(user_request)
# Construire le prompt
prompt = self._build_planning_prompt(user_request, context)
try:
# Appeler le LLM
response = requests.post(
self.llm_endpoint,
json={
"model": self.llm_model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.3, # Déterministe
"num_predict": 1500
}
},
timeout=self.timeout
)
if response.status_code == 200:
result = response.json()
llm_response = result.get('response', '')
return self._parse_plan(user_request, llm_response)
else:
logger.error(f"LLM request failed: {response.status_code}")
return self._fallback_plan(user_request)
except Exception as e:
logger.error(f"Planning error: {e}")
return self._fallback_plan(user_request)
def _build_planning_prompt(self, user_request: str, context: Optional[Dict] = None) -> str:
"""Construit le prompt pour le LLM."""
context_info = ""
if context:
context_info = f"""
Contexte actuel:
- Application active: {context.get('active_app', 'inconnue')}
- Fenêtre: {context.get('window_title', 'inconnue')}
- Écran: {context.get('screen_info', 'standard')}
"""
prompt = f"""Tu es un assistant RPA intelligent qui planifie des actions sur ordinateur.
TÂCHE DEMANDÉE: {user_request}
{context_info}
ACTIONS DISPONIBLES:
- open_app: Ouvrir une application (params: app_name - ex: "firefox", "terminal", "code")
- open_url: Ouvrir une URL dans le navigateur (params: url)
- click: Cliquer sur un élément visuel (target: description précise de l'élément à cliquer)
- type_text: Taper du texte (target: description du champ, params: text)
- hotkey: Raccourci clavier (params: keys - ex: "ctrl+t", "ctrl+l", "Return", "Escape", "Tab")
- scroll: Défiler (params: direction - "up"/"down", amount - nombre de scrolls)
- wait: Attendre (params: seconds)
PRINCIPES DE PLANIFICATION:
1. Décompose la tâche en étapes simples, une action par étape
2. Ajoute wait (2-3s) après open_url/open_app pour laisser le temps au chargement
3. Pour taper dans un champ web, utilise d'abord click sur le champ OU hotkey pour focus (ex: ctrl+l pour barre d'adresse)
4. Après avoir tapé dans un champ de recherche, ajoute hotkey "Return" pour valider
5. Pour click, décris précisément l'élément visuel (ex: "bouton bleu Envoyer", "première image de la liste")
6. Si la page peut avoir des publicités, ajoute un scroll pour les passer avant de cliquer
RACCOURCIS UTILES:
- ctrl+l : Focus sur la barre d'adresse du navigateur
- ctrl+t : Nouvel onglet
- ctrl+f : Rechercher dans la page
- Return : Valider/Entrée
- Escape : Annuler/Fermer
RÉPONDS EN JSON UNIQUEMENT:
{{
"steps": [
{{
"step": 1,
"action": "open_url|click|type_text|hotkey|wait|scroll",
"target": "description visuelle de la cible",
"params": {{"url": "...", "text": "...", "keys": "...", "seconds": 2}},
"description": "Ce que fait cette étape",
"expected_result": "Résultat attendu"
}}
],
"risk_level": "low|medium|high",
"estimated_seconds": 30
}}
JSON:"""
return prompt
def _parse_plan(self, user_request: str, llm_response: str) -> ExecutionPlan:
"""Parse la réponse du LLM en ExecutionPlan."""
try:
# Extraire le JSON de la réponse
json_match = re.search(r'\{[\s\S]*\}', llm_response)
if not json_match:
logger.warning("No JSON found in LLM response")
return self._fallback_plan(user_request)
data = json.loads(json_match.group())
steps = []
for step_data in data.get('steps', []):
action_type_str = step_data.get('action', 'click')
# Mapper vers ActionType
action_type_map = {
'open_app': ActionType.OPEN_APP,
'open_url': ActionType.OPEN_URL,
'click': ActionType.CLICK,
'type_text': ActionType.TYPE_TEXT,
'hotkey': ActionType.HOTKEY,
'scroll': ActionType.SCROLL,
'wait': ActionType.WAIT,
'screenshot': ActionType.SCREENSHOT,
'find_element': ActionType.FIND_ELEMENT
}
action_type = action_type_map.get(action_type_str, ActionType.CLICK)
steps.append(PlannedAction(
step_number=step_data.get('step', len(steps) + 1),
action_type=action_type,
description=step_data.get('description', ''),
target=step_data.get('target'),
parameters=step_data.get('params', {}),
expected_result=step_data.get('expected_result')
))
return ExecutionPlan(
task_description=user_request,
steps=steps,
estimated_duration_seconds=data.get('estimated_seconds', 30),
risk_level=data.get('risk_level', 'low'),
requires_confirmation=data.get('risk_level', 'low') != 'low'
)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error: {e}")
return self._fallback_plan(user_request)
def _fallback_plan(self, user_request: str) -> ExecutionPlan:
"""Plan de fallback si le LLM n'est pas disponible."""
# Détection simple de patterns
request_lower = user_request.lower()
steps = []
# Pattern: ouvrir navigateur/browser
if any(word in request_lower for word in ['navigateur', 'browser', 'chrome', 'firefox']):
steps.append(PlannedAction(
step_number=1,
action_type=ActionType.OPEN_APP,
description="Ouvrir le navigateur",
parameters={"app_name": "firefox"}
))
# Pattern: aller sur / ouvrir site
url_patterns = [
(r'youtube', 'https://www.youtube.com'),
(r'google', 'https://www.google.com'),
(r'github', 'https://www.github.com'),
]
for pattern, url in url_patterns:
if pattern in request_lower:
steps.append(PlannedAction(
step_number=len(steps) + 1,
action_type=ActionType.OPEN_URL,
description=f"Ouvrir {pattern}",
parameters={"url": url}
))
break
# Pattern: chercher / rechercher
if any(word in request_lower for word in ['chercher', 'rechercher', 'search']):
# Extraire le terme de recherche
search_term = request_lower
for remove in ['cherche', 'recherche', 'search', 'sur youtube', 'sur google', 'une vidéo', 'video']:
search_term = search_term.replace(remove, '')
search_term = search_term.strip()
if search_term:
steps.append(PlannedAction(
step_number=len(steps) + 1,
action_type=ActionType.CLICK,
description="Cliquer sur la barre de recherche",
target="barre de recherche"
))
steps.append(PlannedAction(
step_number=len(steps) + 1,
action_type=ActionType.TYPE_TEXT,
description=f"Taper '{search_term}'",
parameters={"text": search_term}
))
steps.append(PlannedAction(
step_number=len(steps) + 1,
action_type=ActionType.HOTKEY,
description="Appuyer sur Entrée",
parameters={"keys": "Return"}
))
# Pattern: lancer / jouer vidéo
if any(word in request_lower for word in ['lancer', 'jouer', 'play', 'regarder']):
steps.append(PlannedAction(
step_number=len(steps) + 1,
action_type=ActionType.WAIT,
description="Attendre le chargement",
parameters={"seconds": 2}
))
steps.append(PlannedAction(
step_number=len(steps) + 1,
action_type=ActionType.CLICK,
description="Cliquer sur la première vidéo",
target="première miniature de vidéo"
))
if not steps:
# Plan générique
steps.append(PlannedAction(
step_number=1,
action_type=ActionType.SCREENSHOT,
description="Capturer l'écran actuel",
parameters={}
))
return ExecutionPlan(
task_description=user_request,
steps=steps,
estimated_duration_seconds=len(steps) * 5,
risk_level="low"
)
async def execute_plan(self, plan: ExecutionPlan) -> List[ActionResult]:
"""
Exécute un plan d'action.
Args:
plan: Le plan à exécuter
Returns:
Liste des résultats d'actions
"""
results = []
total_steps = len(plan.steps)
self._notify_progress(0, total_steps, "Démarrage de l'exécution...", "starting")
for i, action in enumerate(plan.steps):
step_num = i + 1
self._notify_progress(
step_num,
total_steps,
f"Étape {step_num}/{total_steps}: {action.description}",
"running"
)
try:
result = await self._execute_action(action)
results.append(result)
if not result.success:
logger.warning(f"Step {step_num} failed: {result.message}")
# Continuer malgré l'échec (best effort)
except Exception as e:
logger.error(f"Step {step_num} error: {e}")
results.append(ActionResult(
success=False,
action=action,
message=f"Erreur: {str(e)}",
error=str(e)
))
# Petit délai entre les actions
time.sleep(0.3)
success_count = sum(1 for r in results if r.success)
self._notify_progress(
total_steps,
total_steps,
f"Terminé: {success_count}/{total_steps} étapes réussies",
"completed" if success_count == total_steps else "partial"
)
return results
async def _execute_action(self, action: PlannedAction) -> ActionResult:
"""Exécute une action individuelle."""
start_time = time.time()
try:
if action.action_type == ActionType.OPEN_APP:
return await self._exec_open_app(action)
elif action.action_type == ActionType.OPEN_URL:
return await self._exec_open_url(action)
elif action.action_type == ActionType.CLICK:
return await self._exec_click(action)
elif action.action_type == ActionType.TYPE_TEXT:
return await self._exec_type_text(action)
elif action.action_type == ActionType.HOTKEY:
return await self._exec_hotkey(action)
elif action.action_type == ActionType.SCROLL:
return await self._exec_scroll(action)
elif action.action_type == ActionType.WAIT:
return await self._exec_wait(action)
elif action.action_type == ActionType.SCREENSHOT:
return await self._exec_screenshot(action)
else:
return ActionResult(
success=False,
action=action,
message=f"Type d'action non supporté: {action.action_type}",
duration_ms=int((time.time() - start_time) * 1000)
)
except Exception as e:
return ActionResult(
success=False,
action=action,
message=f"Erreur: {str(e)}",
error=str(e),
duration_ms=int((time.time() - start_time) * 1000)
)
async def _exec_open_app(self, action: PlannedAction) -> ActionResult:
"""Ouvre une application."""
import subprocess
app_name = action.parameters.get('app_name', 'firefox')
# Mapping des noms courants
app_commands = {
'firefox': 'firefox',
'chrome': 'google-chrome',
'navigateur': 'firefox',
'browser': 'firefox',
'terminal': 'gnome-terminal',
'files': 'nautilus',
'code': 'code',
'vscode': 'code'
}
cmd = app_commands.get(app_name.lower(), app_name)
try:
subprocess.Popen([cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(1) # Attendre que l'app démarre
return ActionResult(
success=True,
action=action,
message=f"Application '{app_name}' ouverte"
)
except FileNotFoundError:
return ActionResult(
success=False,
action=action,
message=f"Application '{app_name}' non trouvée"
)
async def _exec_open_url(self, action: PlannedAction) -> ActionResult:
"""Ouvre une URL dans le navigateur."""
import subprocess
url = action.parameters.get('url', 'https://www.google.com')
try:
subprocess.Popen(['xdg-open', url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(2) # Attendre le chargement
return ActionResult(
success=True,
action=action,
message=f"URL '{url}' ouverte"
)
except Exception as e:
return ActionResult(
success=False,
action=action,
message=f"Erreur ouverture URL: {e}"
)
async def _exec_click(self, action: PlannedAction) -> ActionResult:
"""Exécute un clic sur un élément avec détection visuelle."""
target = action.target or action.parameters.get('target', '')
# Si on a un ActionExecutor configuré, l'utiliser en priorité
if self._action_executor:
try:
screenshot = None
if self._screen_capturer:
screenshot = self._screen_capturer()
result = self._action_executor(
action_type="click",
target=target,
screenshot=screenshot
)
return ActionResult(
success=result.get('success', False),
action=action,
message=result.get('message', 'Clic effectué')
)
except Exception as e:
logger.warning(f"ActionExecutor failed: {e}, using visual detection")
# Utiliser la détection visuelle
try:
if not PYAUTOGUI_AVAILABLE:
return ActionResult(
success=False,
action=action,
message="pyautogui non disponible"
)
# Capturer l'écran
screenshot = self._capture_screen()
# Trouver l'élément par sa description
position = self._find_element_by_description(target, screenshot)
if position:
x, y = position
logger.info(f"Clicking at ({x}, {y}) for target: '{target}'")
pyautogui.click(x, y)
return ActionResult(
success=True,
action=action,
message=f"Clic à ({x}, {y}) pour '{target}'"
)
else:
return ActionResult(
success=False,
action=action,
message=f"Élément non trouvé: '{target}'"
)
except Exception as e:
logger.error(f"Click error: {e}")
return ActionResult(
success=False,
action=action,
message=f"Erreur clic: {e}"
)
async def _exec_type_text(self, action: PlannedAction) -> ActionResult:
"""Tape du texte dans un champ (clique d'abord sur le champ si spécifié)."""
try:
if not PYAUTOGUI_AVAILABLE:
return ActionResult(
success=False,
action=action,
message="pyautogui non disponible"
)
text = action.parameters.get('text', '')
target = action.target or action.parameters.get('target', '')
# Si une cible est spécifiée, cliquer dessus d'abord
if target:
screenshot = self._capture_screen()
position = self._find_element_by_description(target, screenshot)
if position:
x, y = position
logger.info(f"Clicking on input field at ({x}, {y}) before typing")
pyautogui.click(x, y)
time.sleep(0.5) # Attendre que le focus soit acquis
else:
# Champ non trouvé - on continue car le focus est peut-être déjà au bon endroit
# (ex: après un hotkey ctrl+l ou ctrl+k)
logger.info(f"Target '{target}' not found visually, assuming focus is already correct")
# Taper le texte
time.sleep(0.2)
# Utiliser write() pour le texte Unicode (français, etc.)
if text.isascii():
pyautogui.typewrite(text, interval=0.03)
else:
# Pour les caractères non-ASCII, utiliser le presse-papier
import subprocess
# Copier dans le presse-papier
process = subprocess.Popen(['xclip', '-selection', 'clipboard'], stdin=subprocess.PIPE)
process.communicate(text.encode('utf-8'))
# Coller avec Ctrl+V
pyautogui.hotkey('ctrl', 'v')
return ActionResult(
success=True,
action=action,
message=f"Texte tapé: '{text[:30]}...'" if len(text) > 30 else f"Texte tapé: '{text}'"
)
except Exception as e:
logger.error(f"Type text error: {e}")
return ActionResult(
success=False,
action=action,
message=f"Erreur frappe: {e}"
)
async def _exec_hotkey(self, action: PlannedAction) -> ActionResult:
"""Exécute un raccourci clavier."""
try:
import pyautogui
keys = action.parameters.get('keys', 'Return')
# Parser les combinaisons de touches
if '+' in keys:
key_list = [k.strip().lower() for k in keys.split('+')]
pyautogui.hotkey(*key_list)
else:
pyautogui.press(keys.lower())
return ActionResult(
success=True,
action=action,
message=f"Touche(s) '{keys}' pressée(s)"
)
except Exception as e:
return ActionResult(
success=False,
action=action,
message=f"Erreur hotkey: {e}"
)
async def _exec_scroll(self, action: PlannedAction) -> ActionResult:
"""Effectue un scroll."""
try:
import pyautogui
direction = action.parameters.get('direction', 'down')
amount = action.parameters.get('amount', 3)
scroll_amount = amount if direction == 'up' else -amount
pyautogui.scroll(scroll_amount)
return ActionResult(
success=True,
action=action,
message=f"Scroll {direction} de {amount}"
)
except Exception as e:
return ActionResult(
success=False,
action=action,
message=f"Erreur scroll: {e}"
)
async def _exec_wait(self, action: PlannedAction) -> ActionResult:
"""Attend un certain temps."""
seconds = action.parameters.get('seconds', 1)
time.sleep(seconds)
return ActionResult(
success=True,
action=action,
message=f"Attente de {seconds}s terminée"
)
async def _exec_screenshot(self, action: PlannedAction) -> ActionResult:
"""Capture l'écran."""
try:
if self._screen_capturer:
screenshot = self._screen_capturer()
# Sauvegarder si nécessaire
path = f"/tmp/screenshot_{datetime.now().strftime('%H%M%S')}.png"
return ActionResult(
success=True,
action=action,
message="Screenshot capturé",
screenshot_path=path
)
else:
import pyautogui
screenshot = pyautogui.screenshot()
path = f"/tmp/screenshot_{datetime.now().strftime('%H%M%S')}.png"
screenshot.save(path)
return ActionResult(
success=True,
action=action,
message="Screenshot capturé",
screenshot_path=path
)
except Exception as e:
return ActionResult(
success=False,
action=action,
message=f"Erreur screenshot: {e}"
)
# Singleton
_planner_instance: Optional[AutonomousPlanner] = None
def get_autonomous_planner(
llm_model: str = "qwen2.5:7b"
) -> AutonomousPlanner:
"""Retourne l'instance singleton du planner."""
global _planner_instance
if _planner_instance is None:
_planner_instance = AutonomousPlanner(llm_model=llm_model)
return _planner_instance