Files
Geniusia_v2/geniusia2/main.py
2026-03-05 00:20:25 +01:00

437 lines
16 KiB
Python
Raw Permalink Blame History

"""
Point d'entrée principal pour RPA Vision V2.
Initialise tous les composants et lance l'application.
"""
import sys
import signal
import asyncio
from pathlib import Path
from typing import Optional
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QThread, pyqtSignal
from core.config import CONFIG
from core.logger import Logger
from core.embeddings_manager import EmbeddingsManager
from core.llm_manager import LLMManager
from core.learning_manager import LearningManager
from core.orchestrator import Orchestrator
from core.whitelist_manager import WhitelistManager
from core.ui_change_detector import UIChangeDetector
from core.metrics_collector import MetricsCollector
from core.utils.vision_utils import VisionUtils
from core.utils.input_utils import InputUtils
from core.replay_async import ReplayEngine
from gui.minimal_gui import MinimalGUI
from gui import setup_gui_for_orchestrator
class OrchestratorThread(QThread):
"""Thread pour exécuter l'orchestrateur de manière asynchrone."""
error_signal = pyqtSignal(str)
status_signal = pyqtSignal(str)
def __init__(self, orchestrator: Orchestrator):
super().__init__()
self.orchestrator = orchestrator
self.running = False
def run(self):
"""Exécute la boucle cognitive."""
self.running = True
self.status_signal.emit("started")
try:
# Exécuter l'orchestrateur (méthode synchrone)
self.orchestrator.run()
except Exception as e:
self.error_signal.emit(str(e))
finally:
self.running = False
self.status_signal.emit("stopped")
def stop(self):
"""Arrête l'orchestrateur."""
self.running = False
self.orchestrator.stop()
class RPAVisionApp:
"""Application principale RPA Vision V2."""
def __init__(self):
"""Initialise l'application."""
self.config = None
self.logger = None
self.embeddings_manager = None
self.llm_manager = None
self.vision_utils = None
self.input_utils = None
self.replay_engine = None
self.whitelist_manager = None
self.ui_change_detector = None
self.metrics_collector = None
self.learning_manager = None
self.orchestrator = None
self.gui = None
self.gui_bridge = None
self.orchestrator_thread = None
self.qt_app = None
def initialize(self):
"""Initialise tous les composants."""
print("🚀 Initialisation de RPA Vision V2...")
# 1. Charger la configuration
print("📋 Chargement de la configuration...")
self.config = CONFIG
config_dict = CONFIG
# 2. Initialiser le logger
print("📝 Initialisation du logger...")
self.logger = Logger()
self.logger.log_action({
"action": "application_startup",
"version": "2.0"
})
# 3. Initialiser le gestionnaire d'embeddings
print("🧠 Chargement du modèle OpenCLIP...")
self.embeddings_manager = EmbeddingsManager(
model_name=CONFIG["models"]["clip"],
logger=self.logger
)
# 4. Initialiser le gestionnaire LLM
print("💬 Connexion à Ollama...")
self.llm_manager = LLMManager(
model_name=CONFIG["models"]["llm"],
logger=self.logger
)
# 5. Initialiser les utilitaires de vision
print("👁️ Chargement des modèles de vision...")
self.vision_utils = VisionUtils(config=CONFIG)
# 6. Initialiser les utilitaires d'entrée
print("⌨️ Initialisation des utilitaires d'entrée...")
self.input_utils = InputUtils(logger=self.logger, config=CONFIG)
# 7. Initialiser le moteur de rejeu
print("🔄 Initialisation du moteur de rejeu...")
self.replay_engine = ReplayEngine(
input_utils=self.input_utils,
logger=self.logger,
config=CONFIG
)
# 8. Initialiser le gestionnaire de liste blanche
print("🛡️ Chargement de la liste blanche...")
self.whitelist_manager = WhitelistManager(logger=self.logger)
# 9. Initialiser le détecteur de changements UI
print("🔍 Initialisation du détecteur de changements UI...")
self.ui_change_detector = UIChangeDetector(
embeddings_manager=self.embeddings_manager,
logger=self.logger,
config=CONFIG
)
# 10. Initialiser le collecteur de métriques
print("📊 Initialisation du collecteur de métriques...")
self.metrics_collector = MetricsCollector(logger=self.logger, config=CONFIG)
# 11. Initialiser le gestionnaire d'apprentissage
print("🎓 Initialisation du gestionnaire d'apprentissage...")
self.learning_manager = LearningManager(
embeddings_manager=self.embeddings_manager,
logger=self.logger,
config=CONFIG
)
# 12. Initialiser l'orchestrateur
print("🎯 Initialisation de l'orchestrateur...")
self.orchestrator = Orchestrator(
learning_manager=self.learning_manager,
vision_utils=self.vision_utils,
llm_manager=self.llm_manager,
logger=self.logger,
config=CONFIG,
whitelist_manager=self.whitelist_manager,
input_utils=self.input_utils
)
print("✅ Tous les composants initialisés avec succès!")
def setup_gui(self):
"""Configure l'interface graphique."""
print("🖥️ Initialisation de l'interface graphique améliorée...")
# Créer l'application Qt
self.qt_app = QApplication(sys.argv)
# Utiliser la nouvelle GUI améliorée avec intégration automatique
self.gui_bridge = setup_gui_for_orchestrator(self.orchestrator)
self.gui = self.gui_bridge.gui
# Connecter les signaux (déjà fait par le bridge, mais on garde pour compatibilité)
self.gui.start_requested.connect(self.start_orchestrator)
self.gui.stop_requested.connect(self.stop_orchestrator)
self.gui.pause_requested.connect(self.pause_orchestrator)
# Envoyer un message de bienvenue
self.orchestrator.log_to_gui("", "Système initialisé et prêt", "success")
self.orchestrator.update_gui_stats(
actions_count=0,
patterns_count=0,
workflows_count=0
)
# Activer les boutons Pause et Arrêter (car démarrage automatique)
self.gui.pause_button.setEnabled(True)
self.gui.stop_button.setEnabled(True)
print("✅ Interface graphique améliorée prête!")
def start_orchestrator(self):
"""Démarre l'orchestrateur dans un thread séparé."""
if self.orchestrator_thread is None or not self.orchestrator_thread.running:
self.orchestrator_thread = OrchestratorThread(self.orchestrator)
# Connecter les signaux
self.orchestrator_thread.error_signal.connect(self.on_orchestrator_error)
self.orchestrator_thread.status_signal.connect(self.on_orchestrator_status)
# Démarrer le thread
self.orchestrator_thread.start()
self.logger.log_action({
"action": "orchestrator_started"
})
def stop_orchestrator(self):
"""Arrête l'orchestrateur."""
# Arrêter la capture d'événements
if hasattr(self, 'event_capture') and self.event_capture:
self.event_capture.stop()
# Arrêter le thread de l'orchestrateur
if self.orchestrator_thread and self.orchestrator_thread.running:
self.orchestrator_thread.stop()
self.orchestrator_thread.wait() # Attendre la fin du thread
self.logger.log_action({
"action": "orchestrator_stopped"
})
def pause_orchestrator(self):
"""Met en pause l'orchestrateur."""
if self.orchestrator:
self.orchestrator.pause()
self.logger.log_action({
"action": "orchestrator_paused"
})
def on_orchestrator_error(self, error: str):
"""Gère les erreurs de l'orchestrateur."""
self.logger.log_action({
"action": "orchestrator_error",
"error": error
})
# Logger l'erreur dans la GUI
if self.orchestrator and hasattr(self.orchestrator, 'log_to_gui'):
self.orchestrator.log_to_gui("", f"Erreur: {error}", "error")
def on_orchestrator_status(self, status: str):
"""Gère les changements de statut de l'orchestrateur."""
self.logger.log_action({
"action": "orchestrator_status_changed",
"status": status
})
def run(self, mode='shadow', headless=False):
"""
Lance l'application.
Args:
mode: Mode opérationnel (shadow, assist, auto)
headless: Si True, lance sans GUI
"""
try:
# Initialiser les composants
self.initialize()
# Définir le mode
if self.learning_manager:
# Mode progressif: démarre en shadow, propose assist après patterns
if mode == 'progressive':
self.orchestrator.enable_progressive_mode()
actual_mode = 'shadow' # Démarre en shadow
else:
actual_mode = mode
self.learning_manager.mode = actual_mode
self.logger.log_action({
"action": "mode_set",
"mode": mode,
"actual_mode": actual_mode if mode == 'progressive' else mode,
"headless": headless
})
if headless:
# Mode headless: juste l'orchestrateur
print(f"🔇 Mode headless activé - Mode: {mode.upper()}")
self.orchestrator.run()
else:
# Mode normal avec GUI
self.setup_gui()
self.gui.show()
# Configurer le gestionnaire de signaux pour arrêt gracieux
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
print("\n" + "="*60)
print("🎉 RPA Vision V2 est prêt!")
print("="*60)
print(f"\n🎯 Mode: {mode.upper()}")
print("\n📌 Raccourcis clavier:")
print(" - Ctrl+Pause : Arrêt d'urgence")
print(" - Entrée : Valider une suggestion")
print(" - Échap : Rejeter une suggestion")
print(" - Alt+C : Corriger une action")
print("\n<EFBFBD> CDémarrage automatique de l'observation...")
print("="*60 + "\n")
# Envoyer un log de démarrage
self.orchestrator.log_to_gui("🚀", "Démarrage de l'observation...", "info")
# Démarrer automatiquement l'orchestrator
self.start_orchestrator()
# Confirmer le démarrage
self.orchestrator.log_to_gui("👀", "Observation active - En attente d'actions...", "success")
self.gui.signals.emit_status_change(True)
# Lancer la boucle d'événements Qt
sys.exit(self.qt_app.exec_())
except KeyboardInterrupt:
print("\n🛑 Arrêt demandé...")
self.shutdown()
except Exception as e:
print(f"\n❌ Erreur fatale: {e}")
import traceback
traceback.print_exc()
self.logger.log_action({
"action": "application_fatal_error",
"error": str(e)
})
sys.exit(1)
def signal_handler(self, signum, frame):
"""Gère les signaux d'arrêt (Ctrl+C, etc.)."""
print("\n\n🛑 Arrêt demandé...")
self.shutdown()
def shutdown(self):
"""Arrête proprement l'application avec timeout de 5 secondes."""
import time
from threading import Thread
print("🔄 Arrêt en cours...")
start_time = time.time()
# Arrêter la capture d'événements (IMPORTANT pour libérer les listeners pynput)
if self.orchestrator and hasattr(self.orchestrator, 'event_capture'):
print(" - Arrêt de la capture d'événements...")
try:
self.orchestrator.event_capture.stop()
except Exception as e:
print(f" ⚠️ Erreur lors de l'arrêt de la capture: {e}")
# Arrêter l'orchestrateur avec timeout
if self.orchestrator_thread:
print(" - Arrêt de l'orchestrateur...")
try:
self.stop_orchestrator()
# Attendre max 3 secondes que le thread se termine
if self.orchestrator_thread.isRunning():
self.orchestrator_thread.wait(3000) # 3 secondes en ms
if self.orchestrator_thread.isRunning():
print(" ⚠️ Timeout: forçage de l'arrêt")
self.orchestrator_thread.terminate()
except Exception as e:
print(f" ⚠️ Erreur lors de l'arrêt de l'orchestrateur: {e}")
# Sauvegarder l'état d'apprentissage
if self.learning_manager:
print(" - Sauvegarde de l'état d'apprentissage...")
try:
# Le learning_manager sauvegarde automatiquement via le logger
pass
except Exception as e:
print(f" ⚠️ Erreur lors de la sauvegarde: {e}")
# Sauvegarder la liste blanche
if self.whitelist_manager:
print(" - Sauvegarde de la liste blanche...")
try:
self.whitelist_manager.save_whitelist()
except Exception as e:
print(f" ⚠️ Erreur lors de la sauvegarde: {e}")
# Finaliser les logs
if self.logger:
print(" - Finalisation des logs...")
try:
self.logger.log_action({
"action": "application_shutdown",
"graceful": True,
"duration_seconds": time.time() - start_time
})
except Exception as e:
print(f" ⚠️ Erreur lors de la finalisation des logs: {e}")
elapsed = time.time() - start_time
print(f"✅ Arrêt terminé en {elapsed:.1f}s. Au revoir!")
# Quitter l'application Qt
if self.qt_app:
self.qt_app.quit()
def main():
"""Point d'entrée principal."""
import argparse
# Parser les arguments
parser = argparse.ArgumentParser(description='RPA Vision V2 - Automatisation intelligente')
parser.add_argument(
'--mode',
type=str,
choices=['shadow', 'assist', 'auto', 'progressive'],
default='progressive',
help='Mode opérationnel: progressive (adaptatif), shadow (observation), assist (suggestions), auto (automatique)'
)
parser.add_argument(
'--headless',
action='store_true',
help='Lancer sans interface graphique (pour tests)'
)
args = parser.parse_args()
# Créer et lancer l'application
app = RPAVisionApp()
app.run(mode=args.mode, headless=args.headless)
if __name__ == "__main__":
main()