""" Système de rejeu intelligent de tâches apprises avec reconnaissance visuelle. Permet de rejouer des tâches en s'adaptant aux variations d'interface. """ import asyncio import time from typing import Dict, List, Optional, Any, Tuple from pathlib import Path import numpy as np from PIL import Image from .models import TaskProfile, Action from .learning_manager import LearningManager from .embeddings_manager import EmbeddingsManager from .utils.vision_utils import VisionUtils from .utils.input_utils import InputUtils from .logger import Logger class TaskReplayEngine: """ Moteur de rejeu intelligent qui utilise la vision pour localiser les éléments et rejouer les tâches apprises. """ def __init__( self, learning_manager: LearningManager, embeddings_manager: EmbeddingsManager, vision_utils: VisionUtils, input_utils: InputUtils, logger: Logger, config: Dict[str, Any] ): """ Initialise le moteur de rejeu. Args: learning_manager: Gestionnaire d'apprentissage embeddings_manager: Gestionnaire d'embeddings vision_utils: Utilitaires de vision input_utils: Utilitaires d'entrée logger: Logger config: Configuration """ self.learning_manager = learning_manager self.embeddings_manager = embeddings_manager self.vision_utils = vision_utils self.input_utils = input_utils self.logger = logger self.config = config # Configuration self.similarity_threshold = config.get("replay", {}).get( "similarity_threshold", 0.75 ) self.max_search_attempts = config.get("replay", {}).get( "max_search_attempts", 3 ) self.delay_between_actions = config.get("replay", {}).get( "delay_between_actions", 0.5 ) self.logger.log_action({ "action": "task_replay_engine_initialized", "similarity_threshold": self.similarity_threshold }) async def replay_task( self, task_id: str, interactive: bool = False ) -> Dict[str, Any]: """ Rejoue une tâche apprise. Args: task_id: ID de la tâche à rejouer interactive: Si True, demande confirmation avant chaque action Returns: Résultats du rejeu """ # Charger la tâche task = self.learning_manager.load_task(task_id) if not task: return { "success": False, "error": "task_not_found", "task_id": task_id } self.logger.log_action({ "action": "task_replay_started", "task_id": task_id, "interactive": interactive }) # Récupérer les signatures signatures = task.metadata.get("signatures", []) if not signatures: return { "success": False, "error": "no_signatures", "task_id": task_id } results = { "task_id": task_id, "total_actions": len(signatures), "executed_actions": 0, "failed_actions": 0, "actions": [] } # Rejouer chaque action for i, signature in enumerate(signatures): self.logger.log_action({ "action": "replaying_step", "step": i + 1, "total": len(signatures) }) # Localiser l'élément visuellement location = await self._find_element_visually(signature) if not location: self.logger.log_action({ "action": "element_not_found", "step": i + 1, "signature": signature.get("description", "Unknown") }) results["failed_actions"] += 1 results["actions"].append({ "step": i + 1, "success": False, "error": "element_not_found" }) continue # Exécuter l'action success = await self._execute_action_at_location( signature, location, interactive ) results["actions"].append({ "step": i + 1, "success": success, "location": location, "action_type": signature.get("action_type") }) if success: results["executed_actions"] += 1 else: results["failed_actions"] += 1 # Attendre entre les actions if i < len(signatures) - 1: await asyncio.sleep(self.delay_between_actions) results["success"] = results["failed_actions"] == 0 self.logger.log_action({ "action": "task_replay_completed", "task_id": task_id, "success": results["success"], "executed": results["executed_actions"], "failed": results["failed_actions"] }) return results async def _find_element_visually( self, signature: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Localise un élément visuellement en utilisant son embedding. Args: signature: Signature visuelle de l'élément Returns: Localisation (x, y, confidence) ou None """ embedding = signature.get("embedding") if embedding is None: return None # Capturer l'écran actuel import pyautogui screenshot = pyautogui.screenshot() screenshot_np = np.array(screenshot) # Rechercher l'élément avec vision for attempt in range(self.max_search_attempts): # Générer l'embedding de l'écran actuel current_embedding = self.vision_utils.generate_clip_embedding( screenshot_np ) # Rechercher les zones similaires similar_regions = await self._search_similar_regions( screenshot_np, embedding ) if similar_regions: best_match = similar_regions[0] if best_match["similarity"] >= self.similarity_threshold: self.logger.log_action({ "action": "element_found", "similarity": best_match["similarity"], "attempt": attempt + 1 }) return { "x": best_match["x"], "y": best_match["y"], "confidence": best_match["similarity"], "bbox": best_match.get("bbox") } # Attendre un peu avant de réessayer if attempt < self.max_search_attempts - 1: await asyncio.sleep(0.5) return None async def _search_similar_regions( self, screenshot: np.ndarray, target_embedding: np.ndarray, grid_size: int = 4 ) -> List[Dict[str, Any]]: """ Recherche les régions similaires dans une capture d'écran. Args: screenshot: Capture d'écran target_embedding: Embedding cible grid_size: Taille de la grille de recherche Returns: Liste de régions similaires triées par similarité """ height, width = screenshot.shape[:2] cell_height = height // grid_size cell_width = width // grid_size regions = [] # Parcourir la grille for row in range(grid_size): for col in range(grid_size): y1 = row * cell_height x1 = col * cell_width y2 = min((row + 1) * cell_height, height) x2 = min((col + 1) * cell_width, width) # Extraire la région region = screenshot[y1:y2, x1:x2] # Générer l'embedding region_embedding = self.vision_utils.generate_clip_embedding(region) # Calculer la similarité similarity = self._cosine_similarity( target_embedding, region_embedding ) # Centre de la région center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 regions.append({ "x": center_x, "y": center_y, "bbox": (x1, y1, x2, y2), "similarity": similarity }) # Trier par similarité décroissante regions.sort(key=lambda r: r["similarity"], reverse=True) return regions def _cosine_similarity( self, emb1: np.ndarray, emb2: np.ndarray ) -> float: """Calcule la similarité cosinus entre deux embeddings.""" dot_product = np.dot(emb1, emb2) norm1 = np.linalg.norm(emb1) norm2 = np.linalg.norm(emb2) if norm1 == 0 or norm2 == 0: return 0.0 return float(dot_product / (norm1 * norm2)) async def _execute_action_at_location( self, signature: Dict[str, Any], location: Dict[str, Any], interactive: bool ) -> bool: """ Exécute une action à une localisation donnée. Args: signature: Signature de l'action location: Localisation de l'élément interactive: Si True, demande confirmation Returns: True si succès """ action_type = signature.get("action_type", "click") x = location["x"] y = location["y"] # Demander confirmation si mode interactif if interactive: confirmed = await self._ask_confirmation(signature, location) if not confirmed: return False try: if action_type == "click": self.input_utils.click(x, y) return True elif action_type == "type": text = signature.get("text", "") self.input_utils.type_text(text) return True elif action_type == "scroll": direction = signature.get("direction", "down") amount = signature.get("amount", 3) self.input_utils.scroll(direction, amount, x, y) return True elif action_type == "drag": # Pour le drag, on a besoin de la destination end_x = signature.get("end_x", x + 100) end_y = signature.get("end_y", y) self.input_utils.drag(x, y, end_x, end_y) return True else: self.logger.log_action({ "action": "unknown_action_type", "type": action_type }) return False except Exception as e: self.logger.log_action({ "action": "execute_action_error", "error": str(e), "action_type": action_type }) return False async def _ask_confirmation( self, signature: Dict[str, Any], location: Dict[str, Any] ) -> bool: """ Demande confirmation à l'utilisateur (mode interactif). Args: signature: Signature de l'action location: Localisation Returns: True si confirmé """ # TODO: Implémenter une vraie interface de confirmation # Pour l'instant, on accepte automatiquement return True def list_available_tasks(self) -> List[Dict[str, Any]]: """ Liste toutes les tâches disponibles pour le rejeu. Returns: Liste des tâches avec leurs métadonnées """ tasks = [] profiles_path = Path(self.learning_manager.profiles_path) for task_dir in profiles_path.iterdir(): if not task_dir.is_dir(): continue metadata_file = task_dir / "metadata.json" if not metadata_file.exists(): continue try: import json with open(metadata_file, "r") as f: metadata = json.load(f) tasks.append({ "task_id": metadata.get("task_id"), "task_name": metadata.get("task_name", metadata.get("description")), "observation_count": metadata.get("observation_count", metadata.get("observations")), "confidence": metadata.get("confidence_score", 0.0) }) except Exception as e: self.logger.log_action({ "action": "task_list_error", "task_dir": str(task_dir), "error": str(e) }) return tasks async def replay_task_with_monitoring( self, task_id: str, on_step_completed: Optional[callable] = None ) -> Dict[str, Any]: """ Rejoue une tâche avec monitoring en temps réel. Args: task_id: ID de la tâche on_step_completed: Callback appelé après chaque étape Returns: Résultats du rejeu """ task = self.learning_manager.load_task(task_id) if not task: return { "success": False, "error": "task_not_found" } signatures = task.metadata.get("signatures", []) results = { "task_id": task_id, "steps": [] } for i, signature in enumerate(signatures): step_result = { "step": i + 1, "description": signature.get("description", "Unknown"), "status": "pending" } # Localiser et exécuter location = await self._find_element_visually(signature) if location: success = await self._execute_action_at_location( signature, location, False ) step_result["status"] = "success" if success else "failed" step_result["location"] = location else: step_result["status"] = "not_found" results["steps"].append(step_result) # Callback if on_step_completed: on_step_completed(step_result) await asyncio.sleep(self.delay_between_actions) results["success"] = all( s["status"] == "success" for s in results["steps"] ) return results