489 lines
16 KiB
Python
489 lines
16 KiB
Python
"""
|
|
Capture des événements utilisateur (clavier et souris) pour l'apprentissage.
|
|
Utilise pynput pour capturer les actions en temps réel.
|
|
"""
|
|
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Callable, Any
|
|
from threading import Thread, Lock
|
|
from collections import deque
|
|
|
|
try:
|
|
from pynput import mouse, keyboard
|
|
PYNPUT_AVAILABLE = True
|
|
except ImportError:
|
|
PYNPUT_AVAILABLE = False
|
|
print("⚠️ pynput n'est pas installé. Capture d'événements désactivée.")
|
|
|
|
from .logger import Logger
|
|
from .utils.image_utils import get_active_window
|
|
from .session_manager import SessionManager
|
|
from .workflow_detector import WorkflowDetector
|
|
|
|
|
|
class EventCapture:
    """Capture keyboard and mouse events and detect repetitive patterns.

    Events are recorded by pynput listeners, kept in a bounded in-memory
    history, forwarded to a ``SessionManager`` for segmentation, and scanned
    for short repeating sequences. Registered pattern callbacks are notified
    both for repetitive sequences found in the history and for workflows
    reported by the ``WorkflowDetector``.
    """

    # Hard cap on the number of events kept in ``self.events``. The effective
    # history size is therefore min(max_history, MAX_STORED_EVENTS).
    MAX_STORED_EVENTS = 100
    # Number of most recent events scanned for a repetitive sequence.
    PATTERN_WINDOW = 20
    # Clicks falling in the same square zone of this size (pixels) are
    # considered identical when comparing sequences.
    CLICK_ZONE_SIZE = 100
    # Maximum time (seconds) to wait for a listener thread to terminate.
    LISTENER_JOIN_TIMEOUT = 2.0
    # Key names flagged as "combo" keys, and characters triggering a screenshot.
    MODIFIER_KEY_NAMES = ('ctrl_l', 'ctrl_r', 'alt_l', 'alt_r')
    SCREENSHOT_KEYS = ('c', 'v', 'a', 'x', 'z')

    def __init__(
        self,
        logger: Optional["Logger"] = None,
        max_history: int = 1000,
        pattern_threshold: int = 3,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialise the event capturer.

        Args:
            logger: Logger used for journaling (optional).
            max_history: Maximum number of events kept in memory (used as the
                ``maxlen`` of the history deque).
            pattern_threshold: Number of repetitions required to report a
                pattern.
            config: Configuration passed to the session manager and the
                workflow detector.
        """
        self.logger = logger
        self.max_history = max_history
        self.pattern_threshold = pattern_threshold
        self.config = config or {}

        # Event history, shared between the listener threads.
        self.events: deque = deque(maxlen=max_history)
        self.events_lock = Lock()

        # Capture state.
        self.capturing = False
        self.mouse_listener = None
        self.keyboard_listener = None

        # Callbacks invoked when a pattern is detected.
        self.pattern_callbacks: List[Callable] = []

        # Last active window.
        self.last_window = None

        # Components used for workflow detection.
        self.session_manager = SessionManager(logger, self.config)
        self.workflow_detector = WorkflowDetector(logger, self.config)

        # Wire the callbacks.
        self.session_manager.on_session_completed = self._on_session_completed
        self.workflow_detector.on_workflow_detected = self._on_workflow_detected

        if not PYNPUT_AVAILABLE and logger:
            logger.log_action({
                "action": "event_capture_unavailable",
                "reason": "pynput not installed"
            })

    def start(self):
        """Start capturing events.

        Returns:
            False when pynput is unavailable, True when capture is running
            (including when it was already running).
        """
        if not PYNPUT_AVAILABLE:
            print("⚠️ Impossible de démarrer la capture : pynput non disponible")
            return False

        if self.capturing:
            return True

        mouse_listener = mouse.Listener(
            on_click=self._on_mouse_click,
            on_move=self._on_mouse_move,
            on_scroll=self._on_mouse_scroll
        )
        keyboard_listener = keyboard.Listener(
            on_press=self._on_key_press
        )

        mouse_listener.start()
        try:
            keyboard_listener.start()
        except Exception:
            # Do not leave a half-started capture behind.
            mouse_listener.stop()
            raise

        # Only publish the state once both listeners are running, so that a
        # failure above does not leave ``capturing`` stuck at True.
        self.mouse_listener = mouse_listener
        self.keyboard_listener = keyboard_listener
        self.capturing = True

        if self.logger:
            self.logger.log_action({
                "action": "event_capture_started"
            })

        print("✅ Capture d'événements démarrée")
        return True

    def stop(self):
        """Stop capturing events synchronously (bounded wait per listener)."""
        if not self.capturing:
            return

        self.capturing = False

        # Stop each listener independently so that a failure on one of them
        # does not prevent the other from being stopped.
        self._shutdown_listener(self.mouse_listener, "mouse_listener_stop_error")
        self._shutdown_listener(self.keyboard_listener, "keyboard_listener_stop_error")

        if self.logger:
            self.logger.log_action({
                "action": "event_capture_stopped",
                "total_events": len(self.events)
            })

        print("⏹️ Capture d'événements arrêtée")

    def _shutdown_listener(self, listener, error_action: str):
        """Stop ``listener`` and wait for it, logging any error as ``error_action``."""
        if not listener:
            return
        try:
            listener.stop()
            listener.join(timeout=self.LISTENER_JOIN_TIMEOUT)
        except Exception as exc:
            if self.logger:
                self.logger.log_action({
                    "action": error_action,
                    "error": str(exc)
                })

    def _take_screenshot(self):
        """Capture the screen, returning None (and logging) if capture fails.

        Per the pynput documentation, an exception raised inside a listener
        callback stops that listener, so a failing screenshot must not
        propagate out of the callbacks.
        """
        from .utils.image_utils import capture_screen

        try:
            return capture_screen()
        except Exception as exc:
            if self.logger:
                self.logger.log_action({
                    "action": "screenshot_capture_failed",
                    "error": str(exc)
                })
            return None

    @staticmethod
    def _describe_key(key):
        """Return ``(key_char, is_combo)`` for a pynput key object.

        ``key_char`` is ``key.char`` when available, otherwise ``str(key)``.
        ``is_combo`` is True when the key exposes a ``name`` and is a Ctrl or
        Alt key.

        NOTE(review): this flags the modifier key itself, not a real
        combination such as Ctrl+C; detecting combinations would require
        tracking pressed/released modifiers.
        """
        try:
            key_char = key.char
        except AttributeError:
            key_char = str(key)

        is_combo = hasattr(key, 'name') and (
            'ctrl' in str(key).lower()
            or key.name in EventCapture.MODIFIER_KEY_NAMES
        )
        return key_char, is_combo

    def _on_mouse_click(self, x: int, y: int, button, pressed: bool):
        """Listener callback for mouse clicks (button releases are ignored)."""
        if not pressed:
            return

        window = get_active_window()
        # Capture the screen right away so it reflects the click context.
        screenshot = self._take_screenshot()

        self._add_event({
            "type": "mouse_click",
            "x": x,
            "y": y,
            "button": str(button),
            "window": window,
            "timestamp": datetime.now(),
            "screenshot": screenshot
        })

    def _on_mouse_move(self, x: int, y: int):
        """Listener callback for mouse moves; intentionally records nothing."""
        # Movements are not recorded to avoid flooding the history with noise.
        pass

    def _on_mouse_scroll(self, x: int, y: int, dx: int, dy: int):
        """Listener callback for scroll events."""
        window = get_active_window()
        screenshot = self._take_screenshot()

        self._add_event({
            "type": "scroll",
            "x": x,
            "y": y,
            "dx": dx,
            "dy": dy,
            "window": window,
            "timestamp": datetime.now(),
            "screenshot": screenshot
        })

    def _on_key_press(self, key):
        """Listener callback for key presses."""
        window = get_active_window()
        key_char, is_combo = self._describe_key(key)

        # Only take a screenshot for modifier keys and a few selected letters.
        screenshot = None
        if is_combo or key_char in self.SCREENSHOT_KEYS:
            screenshot = self._take_screenshot()

        self._add_event({
            "type": "key_press",
            "key": key_char,
            "window": window,
            "timestamp": datetime.now(),
            "screenshot": screenshot,
            "is_combo": is_combo
        })

    def _store_event(self, event: Dict[str, Any]):
        """Append ``event`` to the history and enforce the memory cap."""
        with self.events_lock:
            self.events.append(event)

            if len(self.events) > self.MAX_STORED_EVENTS:
                evicted = self.events.popleft()
                # Drop the screenshot reference of the evicted event.
                # NOTE(review): the same dict was handed to the session
                # manager, so it loses the screenshot too - confirm intended.
                evicted.pop('screenshot', None)

    def _add_event(self, event: Dict[str, Any]):
        """Record an event, forward it to the session manager, check patterns.

        Errors raised by the session manager are logged (when a logger is
        set) and do not interrupt the capture.
        """
        self._store_event(event)

        # Hand the event to the SessionManager for segmentation.
        try:
            self.session_manager.add_action(event)
        except Exception as exc:
            if self.logger:
                self.logger.log_action({
                    "action": "session_add_action_failed",
                    "error": str(exc)
                })

        self._check_for_patterns()

    def _check_for_patterns(self):
        """Check whether the latest events form a repetitive pattern."""
        with self.events_lock:
            if len(self.events) < self.pattern_threshold * 2:
                return

            recent_events = list(self.events)[-self.PATTERN_WINDOW:]
            pattern = self._detect_repetitive_sequence(recent_events)

        if not pattern:
            return

        # Callbacks run OUTSIDE the lock to avoid deadlocks if they call back
        # into this object.
        print("🎯 Pattern détecté dans event_capture !")
        print(f" Répétitions: {pattern['repetitions']}")
        print(f" Longueur: {pattern['length']}")

        # Iterate over a snapshot so a callback may register another callback.
        for callback in list(self.pattern_callbacks):
            try:
                callback(pattern)
            except Exception as e:
                print(f"❌ Erreur dans callback: {e}")
                import traceback
                traceback.print_exc()

    def _detect_repetitive_sequence(
        self,
        events: List[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """
        Detect a sequence repeated back-to-back at the end of ``events``.

        Events are first reduced to a comparable form (click zone, key, or
        scroll, each with its window); events of any other type are ignored.
        The shortest trailing sequence repeated at least
        ``pattern_threshold`` times is reported.

        Returns:
            A dict with ``sequence``, ``repetitions``, ``length`` and
            ``window`` keys, or None when no pattern is found.
        """
        threshold = self.pattern_threshold
        # A non-positive threshold cannot describe a repetition (and 0 would
        # divide by zero below).
        if threshold < 1 or len(events) < threshold:
            return None

        zone = self.CLICK_ZONE_SIZE
        simplified = []
        for event in events:
            kind = event.get("type")
            if kind == "mouse_click":
                simplified.append({
                    "type": "click",
                    "x_zone": event["x"] // zone,
                    "y_zone": event["y"] // zone,
                    "window": event.get("window")
                })
            elif kind == "key_press":
                simplified.append({
                    "type": "key",
                    "key": event.get("key"),
                    "window": event.get("window")
                })
            elif kind == "scroll":
                simplified.append({
                    "type": "scroll",
                    "window": event.get("window")
                })

        total = len(simplified)
        for seq_len in range(1, total // threshold + 1):
            sequence = simplified[-seq_len:]

            # Walk backwards in steps of seq_len while chunks keep matching.
            repetitions = 0
            for start in range(total - seq_len, -1, -seq_len):
                if simplified[start:start + seq_len] != sequence:
                    break
                repetitions += 1

            if repetitions >= threshold:
                return {
                    "sequence": sequence,
                    "repetitions": repetitions,
                    "length": seq_len,
                    "window": sequence[0]["window"] if sequence else None
                }

        return None

    def register_pattern_callback(self, callback: Callable):
        """Register a callback invoked whenever a pattern is detected."""
        self.pattern_callbacks.append(callback)

    def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return the last ``count`` events (empty list if ``count`` <= 0)."""
        # Guard explicitly: a slice such as [-0:] would return everything.
        if count <= 0:
            return []
        with self.events_lock:
            return list(self.events)[-count:]

    def get_events_for_window(self, window_title: str) -> List[Dict[str, Any]]:
        """Return every stored event whose ``window`` equals ``window_title``."""
        with self.events_lock:
            return [e for e in self.events if e.get("window") == window_title]

    def clear_history(self):
        """Clear the event history."""
        with self.events_lock:
            self.events.clear()

        if self.logger:
            self.logger.log_action({
                "action": "event_history_cleared"
            })

    def get_last_screenshots(self, count: int = 3) -> List[Dict[str, Any]]:
        """
        Return the last ``count`` events that carry a screenshot.

        Returns:
            List of events with a non-None ``screenshot`` (empty list if
            ``count`` <= 0 or no such event exists).
        """
        if count <= 0:
            return []
        with self.events_lock:
            with_screenshots = [
                e for e in self.events
                if e.get('screenshot') is not None
            ]
        return with_screenshots[-count:]

    def _on_session_completed(self, session):
        """
        Callback invoked when a session is completed.

        Logs the completion and asks the workflow detector to analyse the
        10 most recent sessions.

        Args:
            session: The completed session (must expose ``session_id`` and
                ``action_count`` when a logger is set).
        """
        if self.logger:
            self.logger.log_action({
                "action": "session_completed_callback",
                "session_id": session.session_id,
                "action_count": session.action_count
            })

        recent_sessions = self.session_manager.get_recent_sessions(10)
        self.workflow_detector.analyze_sessions(recent_sessions)

    def _on_workflow_detected(self, workflow):
        """
        Callback invoked when a workflow is detected.

        Logs the workflow and notifies the pattern callbacks with a summary
        dict (kept for backward compatibility with pattern consumers).

        Args:
            workflow: Detected workflow (dictionary).
        """
        # Tolerate a missing or None "steps" entry.
        step_count = len(workflow.get("steps") or [])

        if self.logger:
            self.logger.log_action({
                "action": "workflow_detected_callback",
                "workflow_id": workflow.get("workflow_id"),
                "name": workflow.get("name"),
                "steps": step_count,
                "repetitions": workflow.get("repetitions")
            })

        pattern_data = {
            "type": "workflow",
            "workflow_id": workflow.get("workflow_id"),
            "name": workflow.get("name"),
            "steps": step_count,
            "repetitions": workflow.get("repetitions"),
            "confidence": workflow.get("confidence")
        }

        for callback in list(self.pattern_callbacks):
            try:
                callback(pattern_data)
            except Exception as e:
                if self.logger:
                    self.logger.log_action({
                        "action": "workflow_callback_error",
                        "error": str(e)
                    })
                else:
                    # Without a logger the failure would otherwise be lost.
                    print(f"❌ Erreur dans callback: {e}")

    def get_workflows(self):
        """
        Return the detected workflows.

        Returns:
            Whatever ``workflow_detector.get_workflows()`` returns.
        """
        return self.workflow_detector.get_workflows()

    def get_sessions(self, count: int = 10):
        """
        Return the most recent sessions.

        Args:
            count: Number of sessions to return.

        Returns:
            Whatever ``session_manager.get_recent_sessions(count)`` returns.
        """
        return self.session_manager.get_recent_sessions(count)

    def get_workflow_stats(self):
        """
        Return session and workflow statistics.

        Returns:
            Dict with ``sessions`` and ``workflows`` entries.
        """
        return {
            "sessions": self.session_manager.get_stats(),
            "workflows": self.workflow_detector.get_stats()
        }

    def force_finalize_session(self):
        """Force the current session to be finalised."""
        self.session_manager.force_finalize_session()

    def capture_event(self, action: Dict[str, Any]):
        """
        Capture an event manually (for tests).

        The action is added to the session exactly once (errors from the
        session manager propagate to the caller), stored in the history, and
        pattern detection is run.

        Args:
            action: Action to capture.
        """
        # Add to the session here; do NOT go through _add_event, which would
        # forward the same action to the session manager a second time.
        self.session_manager.add_action(action)

        self._store_event(action)
        self._check_for_patterns()
|