# Files
# Geniusia_v2/geniusia2/core/replay_async.py
# 2026-03-05 00:20:25 +01:00
#
# 461 lines
# 15 KiB
# Python

"""
Moteur de rejeu d'actions pour rollback asynchrone.
Permet de rejouer des séquences d'actions et d'annuler les dernières actions.
"""
import asyncio
import time
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
from .utils.input_utils import InputUtils, ActionType
from .logger import Logger
class ReplayStatus(Enum):
    """Lifecycle states of the replay engine; each value is its string label."""

    # No replay or rollback is in progress; new requests are accepted.
    IDLE = "idle"
    # A sequence of actions is currently being replayed.
    REPLAYING = "replaying"
    # Inverse actions are currently being executed to undo earlier ones.
    ROLLING_BACK = "rolling_back"
    # A replay has been suspended and is waiting to be resumed.
    PAUSED = "paused"
    # The last replay ran through all of its actions.
    COMPLETED = "completed"
    # The last replay aborted on a failed action or an exception.
    FAILED = "failed"
class ReplayEngine:
    """
    Asynchronous engine that replays recorded input actions and rolls them back.

    The engine is a small state machine driven by ``self.status``: a replay or
    rollback is only accepted while the status is ``ReplayStatus.IDLE``.
    Actions are plain dictionaries whose ``"type"`` key selects which
    ``InputUtils`` call is performed.

    Optional callbacks (all default to ``None``):
        on_action_executed(index, action): after each successful action.
        on_sequence_completed(): after a sequence ran through all its actions.
        on_rollback_completed(all_success, success_count, total): at the end
            of ``rollback_last_n``.
        on_error(index, action): on a failed action (``action`` is the failing
            dict) or on an unexpected exception (``action`` is ``None``;
            ``index`` is ``-1`` for rollback errors).
    """

    def __init__(
        self,
        input_utils: InputUtils,
        logger: Logger,
        config: Dict[str, Any]
    ):
        """
        Initialise the replay engine.

        Args:
            input_utils: Input helper used to execute and invert actions.
            logger: Logger receiving structured ``log_action`` events.
            config: Global configuration; the optional ``"replay"`` section
                may define ``delay_between_actions`` (default 0.5) and
                ``max_rollback_attempts`` (default 3).
        """
        self.input_utils = input_utils
        self.logger = logger
        self.config = config

        # Engine state
        self.status = ReplayStatus.IDLE
        self.current_sequence: List[Dict[str, Any]] = []
        self.current_index = 0

        # Notification callbacks
        self.on_action_executed: Optional[Callable] = None
        self.on_sequence_completed: Optional[Callable] = None
        self.on_rollback_completed: Optional[Callable] = None
        self.on_error: Optional[Callable] = None

        # Configuration; `or {}` also covers a "replay" key set to None.
        replay_config = config.get("replay") or {}
        self.delay_between_actions = replay_config.get(
            "delay_between_actions", 0.5
        )
        self.max_rollback_attempts = replay_config.get(
            "max_rollback_attempts", 3
        )

        self.logger.log_action({
            "action": "replay_engine_initialized",
            "delay_between_actions": self.delay_between_actions,
            "max_rollback_attempts": self.max_rollback_attempts
        })

    async def replay_sequence(
        self,
        action_sequence: List[Dict[str, Any]],
        start_index: int = 0
    ) -> bool:
        """
        Replay a sequence of actions asynchronously.

        Args:
            action_sequence: Actions to replay, in order.
            start_index: Index of the first action to execute.

        Returns:
            True only if every action from ``start_index`` on was executed
            successfully. False if the engine was busy, ``start_index`` is
            negative, an action failed, an exception occurred, or the replay
            was stopped before the end.
        """
        if self.status != ReplayStatus.IDLE:
            self.logger.log_action({
                "action": "replay_rejected",
                "reason": "engine_busy",
                "current_status": self.status.value
            })
            return False

        if start_index < 0:
            # A negative index would silently address actions from the end.
            self.logger.log_action({
                "action": "replay_rejected",
                "reason": "invalid_start_index",
                "start_index": start_index
            })
            return False

        return await self._run_sequence(
            action_sequence, start_index, ReplayStatus.REPLAYING
        )

    async def _run_sequence(
        self,
        action_sequence: List[Dict[str, Any]],
        start_index: int,
        active_status: ReplayStatus
    ) -> bool:
        """
        Execute ``action_sequence`` while the engine holds ``active_status``.

        Shared by ``replay_sequence`` and ``execute_inverse_actions``. The
        caller must already have checked that the engine is idle; this method
        claims the engine, runs the loop and always releases the engine.

        Args:
            action_sequence: Actions to execute, in order.
            start_index: Index of the first action to execute.
            active_status: Status held while running; any other status seen
                at the top of an iteration (apart from PAUSED, which is waited
                out) is treated as a stop request.

        Returns:
            True if all actions were executed successfully, False otherwise.
        """
        self.status = active_status
        self.current_sequence = action_sequence
        self.current_index = start_index

        self.logger.log_action({
            "action": "replay_sequence_started",
            "num_actions": len(action_sequence),
            "start_index": start_index
        })

        executed = 0
        try:
            for i in range(start_index, len(action_sequence)):
                # Wait here until the replay is resumed (or stopped).
                while self.status == ReplayStatus.PAUSED:
                    await asyncio.sleep(0.1)

                if self.status != active_status:
                    # Stop requested: this is not a successful completion, so
                    # neither the completion log nor the callback is emitted.
                    self.logger.log_action({
                        "action": "replay_sequence_interrupted",
                        "index": i,
                        "num_actions_executed": executed
                    })
                    return False

                action = action_sequence[i]
                self.current_index = i

                success = await self._execute_action(action)
                if not success:
                    self.logger.log_action({
                        "action": "replay_action_failed",
                        "index": i,
                        "action_type": action.get("type")
                    })
                    if self.on_error:
                        self.on_error(i, action)
                    self.status = ReplayStatus.FAILED
                    return False

                executed += 1
                if self.on_action_executed:
                    self.on_action_executed(i, action)

                # No trailing delay after the last action.
                if i < len(action_sequence) - 1:
                    await asyncio.sleep(self.delay_between_actions)

            # Every action ran successfully.
            self.status = ReplayStatus.COMPLETED
            self.logger.log_action({
                "action": "replay_sequence_completed",
                "num_actions_executed": executed
            })
            if self.on_sequence_completed:
                self.on_sequence_completed()
            return True

        except Exception as e:
            self.logger.log_action({
                "action": "replay_sequence_error",
                "error": str(e),
                "index": self.current_index
            })
            self.status = ReplayStatus.FAILED
            if self.on_error:
                self.on_error(self.current_index, None)
            return False

        finally:
            # Always release the engine, including when the task is cancelled
            # (CancelledError is not caught by `except Exception` on
            # Python 3.8+), so it can never remain stuck in a busy state.
            self.status = ReplayStatus.IDLE

    async def rollback_last_n(self, n: int) -> bool:
        """
        Undo the last ``n`` actions by executing their inverse actions.

        Actions are undone from the most recent to the oldest. Actions for
        which ``InputUtils.get_inverse_action`` returns ``None`` are skipped
        and logged; they still count against ``all_success``.

        Args:
            n: Number of actions to undo. If fewer are available in the
                history, only those are undone. Values <= 0 undo nothing.

        Returns:
            True if every selected action was successfully inverted (or if
            there was nothing to undo), False otherwise.
        """
        if self.status != ReplayStatus.IDLE:
            self.logger.log_action({
                "action": "rollback_rejected",
                "reason": "engine_busy",
                "current_status": self.status.value
            })
            return False

        if n <= 0:
            # Nothing to undo; a negative n would also corrupt the slice below.
            return True

        # Fetch the most recent actions from the history.
        action_history = self.input_utils.get_action_history(limit=n)
        if len(action_history) < n:
            self.logger.log_action({
                "action": "rollback_partial",
                "requested": n,
                "available": len(action_history)
            })
            n = len(action_history)

        if n == 0:
            return True

        self.status = ReplayStatus.ROLLING_BACK
        self.logger.log_action({
            "action": "rollback_started",
            "num_actions": n
        })

        try:
            # Most recent action first.
            actions_to_rollback = list(reversed(action_history[-n:]))
            success_count = 0

            for i, action in enumerate(actions_to_rollback):
                inverse_action = self.input_utils.get_inverse_action(action)
                if inverse_action is None:
                    self.logger.log_action({
                        "action": "rollback_action_not_invertible",
                        "index": i,
                        "action_type": action.get("type")
                    })
                    continue

                # Execute the inverse action, retrying on failure.
                success = await self._execute_action_with_retry(
                    inverse_action,
                    max_attempts=self.max_rollback_attempts
                )
                if success:
                    success_count += 1
                else:
                    self.logger.log_action({
                        "action": "rollback_action_failed",
                        "index": i,
                        "action_type": action.get("type"),
                        "inverse_action": inverse_action
                    })

                # No trailing delay after the last action.
                if i < len(actions_to_rollback) - 1:
                    await asyncio.sleep(self.delay_between_actions)

            all_success = success_count == len(actions_to_rollback)
            self.logger.log_action({
                "action": "rollback_completed",
                "total_actions": len(actions_to_rollback),
                "successful": success_count,
                "failed": len(actions_to_rollback) - success_count,
                "all_success": all_success
            })
            if self.on_rollback_completed:
                self.on_rollback_completed(
                    all_success, success_count, len(actions_to_rollback)
                )
            return all_success

        except Exception as e:
            self.logger.log_action({
                "action": "rollback_error",
                "error": str(e)
            })
            if self.on_error:
                self.on_error(-1, None)
            return False

        finally:
            self.status = ReplayStatus.IDLE

    async def execute_inverse_actions(
        self,
        actions: List[Dict[str, Any]]
    ) -> bool:
        """
        Execute the inverse of each given action, in reverse order.

        Actions without a usable inverse are skipped and logged. The inverse
        actions run through the same loop as ``replay_sequence`` (same log
        events and callbacks) while the engine status is ``ROLLING_BACK``.

        Args:
            actions: Actions to invert and execute.

        Returns:
            True if all generated inverse actions were executed successfully,
            False if the engine was busy or any inverse action failed.
        """
        if self.status != ReplayStatus.IDLE:
            self.logger.log_action({
                "action": "rollback_rejected",
                "reason": "engine_busy",
                "current_status": self.status.value
            })
            return False

        # Build the inverse actions, most recent original action first.
        inverse_actions = []
        for index, action in enumerate(reversed(actions)):
            inverse = self.input_utils.get_inverse_action(action)
            if inverse:
                inverse_actions.append(inverse)
            else:
                self.logger.log_action({
                    "action": "rollback_action_not_invertible",
                    "index": index,
                    "action_type": action.get("type")
                })

        # Run the shared loop directly: going through replay_sequence() after
        # claiming ROLLING_BACK would be rejected by its own busy check.
        return await self._run_sequence(
            inverse_actions, 0, ReplayStatus.ROLLING_BACK
        )

    async def _execute_action(self, action: Dict[str, Any]) -> bool:
        """
        Execute a single action.

        Dispatches on ``action["type"]`` to the matching ``InputUtils`` call.
        The special type ``"press_key"`` presses ``action["key"]``
        ``action["presses"]`` times (default 1) through ``pyautogui``.

        Args:
            action: Action dictionary to execute.

        Returns:
            The result of the underlying call; False for an unknown action
            type or when any exception is raised (e.g. a missing key).
        """
        action_type = action.get("type")
        try:
            if action_type == ActionType.CLICK.value:
                return self.input_utils.click(
                    action["x"],
                    action["y"],
                    button=action.get("button", "left"),
                    clicks=action.get("clicks", 1)
                )
            elif action_type == ActionType.TYPE.value:
                return self.input_utils.type_text(
                    action["text"],
                    interval=action.get("interval", 0.0)
                )
            elif action_type == ActionType.SCROLL.value:
                return self.input_utils.scroll(
                    action["direction"],
                    amount=action.get("amount", 3),
                    x=action.get("x"),
                    y=action.get("y")
                )
            elif action_type == ActionType.WAIT.value:
                return self.input_utils.wait(action["duration"])
            elif action_type == ActionType.MOVE.value:
                return self.input_utils.move(
                    action["x"],
                    action["y"],
                    duration=action.get("duration", 0.2)
                )
            elif action_type == ActionType.DRAG.value:
                return self.input_utils.drag(
                    action["start_x"],
                    action["start_y"],
                    action["end_x"],
                    action["end_y"],
                    duration=action.get("duration", 0.5),
                    button=action.get("button", "left")
                )
            elif action_type == "press_key":
                # Special action used to roll back typed text.
                # Imported lazily so pyautogui is only needed for this path.
                import pyautogui
                key = action.get("key")
                presses = action.get("presses", 1)
                for _ in range(presses):
                    pyautogui.press(key)
                    await asyncio.sleep(0.05)
                return True
            else:
                self.logger.log_action({
                    "action": "unknown_action_type",
                    "type": action_type
                })
                return False
        except Exception as e:
            self.logger.log_action({
                "action": "execute_action_error",
                "action_type": action_type,
                "error": str(e)
            })
            return False

    async def _execute_action_with_retry(
        self,
        action: Dict[str, Any],
        max_attempts: int = 3
    ) -> bool:
        """
        Execute an action, retrying when it fails.

        Args:
            action: Action dictionary to execute.
            max_attempts: Maximum number of attempts; with a value <= 0 the
                action is never attempted.

        Returns:
            True as soon as one attempt succeeds, False if all attempts fail.
        """
        for attempt in range(max_attempts):
            success = await self._execute_action(action)
            if success:
                return True
            if attempt < max_attempts - 1:
                # Short back-off before the next attempt.
                await asyncio.sleep(0.5)
                self.logger.log_action({
                    "action": "action_retry",
                    "attempt": attempt + 1,
                    "max_attempts": max_attempts
                })
        return False

    def pause(self):
        """Pause the replay in progress; no effect unless currently replaying."""
        if self.status == ReplayStatus.REPLAYING:
            self.status = ReplayStatus.PAUSED
            self.logger.log_action({"action": "replay_paused"})

    def resume(self):
        """Resume a paused replay; no effect unless currently paused."""
        if self.status == ReplayStatus.PAUSED:
            self.status = ReplayStatus.REPLAYING
            self.logger.log_action({"action": "replay_resumed"})

    def stop(self):
        """
        Stop the replay in progress (running or paused).

        Only the status is changed here; the replay loop notices it the next
        time it checks the status, before executing its next action.
        """
        if self.status in [ReplayStatus.REPLAYING, ReplayStatus.PAUSED]:
            self.status = ReplayStatus.IDLE
            self.logger.log_action({"action": "replay_stopped"})

    def get_status(self) -> Dict[str, Any]:
        """
        Return a snapshot of the engine state.

        Returns:
            Dictionary with ``status`` (string value of the current status),
            ``current_index``, ``total_actions`` (length of the current
            sequence) and ``progress`` (``current_index / total_actions``,
            or 0 when the sequence is empty).
        """
        total_actions = len(self.current_sequence)
        return {
            "status": self.status.value,
            "current_index": self.current_index,
            "total_actions": total_actions,
            "progress": (
                self.current_index / total_actions
                if total_actions > 0 else 0
            )
        }