"""
Learning manager for RPA Vision V2.

Handles mode transitions (shadow -> assist -> auto), confidence scoring and
the persisted learning state of each task.
"""

import hashlib
import json
import pickle
import traceback
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from dataclasses import asdict

import numpy as np

from .models import TaskProfile, Action
from .embeddings_manager import EmbeddingsManager
from .logger import Logger
from .ui_change_detector import UIChangeDetector


class LearningManager:
    """
    Learning manager that tracks per-task progress, drives mode transitions
    and computes confidence scores.
    """

    # Number of most recent accept/reject outcomes used for the concordance
    # moving average.
    CONCORDANCE_WINDOW = 10

    def __init__(
        self,
        embeddings_manager: EmbeddingsManager,
        logger: Logger,
        config: Dict[str, Any],
        profiles_path: str = "data/user_profiles"
    ):
        """
        Initialize the learning manager.

        Args:
            embeddings_manager: Embeddings manager (owns the FAISS index).
            logger: Logger used for structured action logs.
            config: Global configuration; thresholds are read from
                ``config["thresholds"]``.
            profiles_path: Directory holding the user/task profiles.
        """
        self.embeddings_manager = embeddings_manager
        self.logger = logger
        self.config = config
        self.profiles_path = Path(profiles_path)

        # Make sure the profiles directory exists before any save/load.
        self.profiles_path.mkdir(parents=True, exist_ok=True)

        # Current state
        self.mode = "shadow"  # Initial (manager-level) mode
        self.tasks: Dict[str, TaskProfile] = {}
        self.current_task_id: Optional[str] = None
        self.current_context: Dict[str, Any] = {}

        # Configuration thresholds (tolerate a missing or null "thresholds" key).
        thresholds = config.get("thresholds") or {}
        self.autopilot_observations = thresholds.get("autopilot_observations", 20)
        self.autopilot_concordance = thresholds.get("autopilot_concordance", 0.95)
        self.confidence_min = thresholds.get("confidence_min", 0.90)
        self.rollback_confidence = thresholds.get("rollback_confidence", 0.85)
        # Observations required before a task leaves shadow mode.
        self.assist_observations = thresholds.get("assist_observations", 5)

        # UI change detector
        self.ui_change_detector = UIChangeDetector(
            embeddings_manager,
            logger,
            config
        )

        # Load existing JSON profiles, then make sure the FAISS index is
        # populated with the tasks stored on disk.
        self._load_profiles()
        self._load_existing_tasks_to_index()

        self.logger.log_action({
            "action": "learning_manager_initialized",
            "mode": self.mode,
            "num_tasks": len(self.tasks)
        })

    def _save_profile(self, task_id: str):
        """
        Persist a task profile as ``<profiles_path>/<task_id>.json``.

        Errors are logged (``profile_save_error``) rather than raised.
        """
        if task_id not in self.tasks:
            return

        try:
            profile = self.tasks[task_id]
            profile_file = self.profiles_path / f"{task_id}.json"
            payload = profile.to_json()

            with open(profile_file, 'w', encoding='utf-8') as f:
                if isinstance(payload, str):
                    # to_json() already produced JSON text; json.dump would
                    # wrap it a second time as a JSON string literal.
                    f.write(payload)
                else:
                    json.dump(payload, f, indent=2, ensure_ascii=False)

            self.logger.log_action({
                "action": "profile_saved",
                "task_id": task_id
            })
        except Exception as e:
            self.logger.log_action({
                "action": "profile_save_error",
                "task_id": task_id,
                "error": str(e)
            })

    def observe(self, action: Action):
        """
        Record an observation while in Shadow mode.

        Creates the task profile on first sight, appends the action and its
        embedding (also added to the FAISS index), saves the profile and
        promotes the task to assist mode once enough observations exist.

        Args:
            action: The observed action.
        """
        task_id = self.current_task_id or self._generate_task_id(action)

        if task_id not in self.tasks:
            self.tasks[task_id] = TaskProfile(
                task_id=task_id,
                task_name=f"Tâche {task_id}",
                mode="shadow",
                observation_count=0,
                concordance_rate=0.0,
                confidence_score=0.0,
                correction_count=0,
                last_execution=datetime.now(),
                window_whitelist=[action.window_title],
                action_sequence=[],
                embeddings=[],
                metadata={}
            )

        task = self.tasks[task_id]
        task.observation_count += 1
        task.action_sequence.append(action)
        task.last_execution = datetime.now()

        if action.embedding is not None:
            task.embeddings.append(action.embedding)
            # The timestamp is stored so suggest_action can map a FAISS hit
            # back to the concrete Action.
            self.embeddings_manager.add_to_index(
                action.embedding,
                {
                    "task_id": task_id,
                    "action_type": action.action_type,
                    "target_element": action.target_element,
                    "timestamp": action.timestamp.isoformat()
                }
            )

        self.logger.log_action({
            "action": "observation_recorded",
            "task_id": task_id,
            "observation_count": task.observation_count,
            "action_type": action.action_type
        })

        self._save_profile(task_id)

        if task.observation_count >= self.assist_observations and task.mode == "shadow":
            self._transition_mode(task_id, "assist")

    def _generate_task_id(self, action: Action) -> str:
        """
        Build a task ID from the window title, target element and action type.

        The ID is used as a profile filename, so path separators are replaced
        to keep the file inside ``profiles_path``.
        """
        def _clean(text: str) -> str:
            text = text.replace(" ", "_").lower()
            for sep in ("/", "\\"):
                text = text.replace(sep, "_")
            return text

        window_clean = _clean(action.window_title)
        element_clean = _clean(action.target_element)
        return f"{window_clean}_{element_clean}_{action.action_type}"

    def suggest_action(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Produce an action suggestion in Assist (or Auto) mode.

        Args:
            context: Current context. Must contain ``current_embedding``; may
                contain ``llm_score`` (defaults to 0.5).

        Returns:
            A dict with ``action``, ``confidence``, ``similarity`` and
            ``task_id``, or None when no suggestion can be made.
        """
        if not self.current_task_id or self.current_task_id not in self.tasks:
            return None

        task = self.tasks[self.current_task_id]

        if task.mode not in ("assist", "auto"):
            return None

        if not task.embeddings or 'current_embedding' not in context:
            return None

        current_emb = context['current_embedding']

        similar = self.embeddings_manager.search_similar(current_emb, k=3)
        if not similar:
            return None

        # Index this task's actions by timestamp (first occurrence wins).
        actions_by_ts: Dict[str, Action] = {}
        for past_action in task.action_sequence:
            actions_by_ts.setdefault(past_action.timestamp.isoformat(), past_action)

        # The FAISS index is shared by all tasks: walk the hits in rank order
        # and keep the first one that maps to an action of the current task,
        # instead of giving up when the top hit belongs to another task.
        matching_action = None
        similarity = None
        for match in similar:
            metadata = match.get('metadata') or {}
            owner = metadata.get('task_id')
            if owner is not None and owner != self.current_task_id:
                continue
            candidate = actions_by_ts.get(metadata.get('timestamp'))
            if candidate is not None:
                matching_action = candidate
                similarity = match['similarity']
                break

        if matching_action is None:
            return None

        vision_conf = similarity
        llm_score = context.get('llm_score', 0.5)
        confidence = self.calculate_confidence(vision_conf, llm_score, self.current_task_id)

        suggestion = {
            "action": matching_action,
            "confidence": confidence,
            "similarity": similarity,
            "task_id": self.current_task_id
        }

        self.logger.log_action({
            "action": "suggestion_generated",
            "task_id": self.current_task_id,
            "confidence": confidence,
            "action_type": matching_action.action_type
        })

        return suggestion

    def confirm_action(self, feedback: Dict[str, Any]):
        """
        Process user validation or correction.

        Args:
            feedback: Dict with ``type`` ("accept" / "reject" / "correct"),
                optional ``task_id`` (defaults to the current task) and, for
                corrections, ``corrected_action``.
        """
        feedback_type = feedback.get("type")
        task_id = feedback.get("task_id", self.current_task_id)

        if not task_id or task_id not in self.tasks:
            return

        task = self.tasks[task_id]

        if feedback_type == "accept":
            self._update_concordance(task_id, success=True)
            self.logger.log_action({
                "action": "action_accepted",
                "task_id": task_id,
                "concordance_rate": task.concordance_rate
            })

        elif feedback_type == "reject":
            self._update_concordance(task_id, success=False)
            self.logger.log_action({
                "action": "action_rejected",
                "task_id": task_id,
                "concordance_rate": task.concordance_rate
            })

        elif feedback_type == "correct":
            task.correction_count += 1
            corrected_action = feedback.get("corrected_action")

            if corrected_action:
                task.action_sequence.append(corrected_action)

                if corrected_action.embedding is not None:
                    task.embeddings.append(corrected_action.embedding)
                    self.embeddings_manager.add_to_index(
                        corrected_action.embedding,
                        {
                            "task_id": task_id,
                            "action_type": corrected_action.action_type,
                            "target_element": corrected_action.target_element,
                            "timestamp": corrected_action.timestamp.isoformat(),
                            "is_correction": True
                        }
                    )

            # A correction counts as a miss for concordance purposes.
            self._update_concordance(task_id, success=False)

            self.logger.log_correction({
                "task_id": task_id,
                "correction_count": task.correction_count,
                "corrected_action": corrected_action.to_dict() if corrected_action else None
            })

        self._save_profile(task_id)
        self._check_mode_transitions(task_id)

    def _update_concordance(self, task_id: str, success: bool):
        """
        Update the concordance rate as a moving average over the last
        ``CONCORDANCE_WINDOW`` outcomes, stored in ``metadata['recent_results']``.
        """
        if task_id not in self.tasks:
            return

        task = self.tasks[task_id]

        # Previously the history was reset on every call (the hasattr guard on
        # a non-existent attribute was always False), so the rate only ever
        # reflected the latest outcome. Keep the persisted history instead.
        recent_results = list(task.metadata.get('recent_results') or [])
        recent_results.append(1 if success else 0)
        recent_results = recent_results[-self.CONCORDANCE_WINDOW:]

        task.metadata['recent_results'] = recent_results
        task.concordance_rate = sum(recent_results) / len(recent_results)

    def calculate_confidence(
        self,
        vision_conf: float,
        llm_score: float,
        task_id: str
    ) -> float:
        """
        Compute the weighted confidence score.

        Formula: 0.6 * vision + 0.3 * llm + 0.1 * historical performance,
        clamped to [0, 1].

        Args:
            vision_conf: Vision detection confidence (0-1).
            llm_score: LLM score (0-1).
            task_id: Task ID (used for historical performance).

        Returns:
            Confidence score in [0, 1].
        """
        history_score = self._get_historical_performance(task_id)

        confidence = (
            0.6 * vision_conf +
            0.3 * llm_score +
            0.1 * history_score
        )

        return max(0.0, min(1.0, confidence))

    def _get_historical_performance(self, task_id: str) -> float:
        """Return the task's concordance rate, or 0.5 for an unknown task."""
        if task_id not in self.tasks:
            return 0.5

        return self.tasks[task_id].concordance_rate

    def evaluate_task(self, task_id: str) -> Dict[str, Any]:
        """
        Evaluate a task and return its metrics.

        Args:
            task_id: Task ID.

        Returns:
            Metrics dict, or an empty dict for an unknown task.
        """
        if task_id not in self.tasks:
            return {}

        task = self.tasks[task_id]

        return {
            "task_id": task_id,
            "task_name": task.task_name,
            "mode": task.mode,
            "observation_count": task.observation_count,
            "concordance_rate": task.concordance_rate,
            "confidence_score": task.confidence_score,
            "correction_count": task.correction_count,
            "correction_rate": (
                task.correction_count / max(1, task.observation_count)
            ),
            "last_execution": task.last_execution.isoformat() if task.last_execution else None
        }

    def should_transition_to_auto(self, task_id: str) -> bool:
        """
        Check whether a task meets the Autopilot criteria.

        Args:
            task_id: Task ID.

        Returns:
            True if ``observation_count >= autopilot_observations`` and
            ``concordance_rate >= autopilot_concordance``.
        """
        if task_id not in self.tasks:
            return False

        task = self.tasks[task_id]

        return (
            task.observation_count >= self.autopilot_observations and
            task.concordance_rate >= self.autopilot_concordance
        )

    def rollback_if_low_confidence(self, task_id: str):
        """
        Demote an auto task back to assist mode when its confidence score is
        below ``confidence_min``.

        Args:
            task_id: Task ID.
        """
        if task_id not in self.tasks:
            return

        task = self.tasks[task_id]

        if task.mode == "auto" and task.confidence_score < self.confidence_min:
            # Single transition log carrying the rollback reason (the previous
            # code logged the same transition twice).
            self._transition_mode(
                task_id,
                "assist",
                reason=f"Confiance faible: {task.confidence_score:.2f}"
            )

    def _check_mode_transitions(self, task_id: str):
        """Evaluate and apply a mode transition if one is due."""
        if task_id not in self.tasks:
            return

        task = self.tasks[task_id]

        # Shadow -> Assist (after enough observations)
        if task.mode == "shadow" and task.observation_count >= self.assist_observations:
            self._transition_mode(task_id, "assist")

        # Assist -> Auto (when criteria are met)
        elif task.mode == "assist" and self.should_transition_to_auto(task_id):
            self._transition_mode(task_id, "auto")

        # Auto -> Assist (when concordance drops)
        elif task.mode == "auto" and task.concordance_rate < self.rollback_confidence:
            self._transition_mode(task_id, "assist")

    def _transition_mode(self, task_id: str, new_mode: str, reason: Optional[str] = None):
        """
        Switch a task to ``new_mode``, log the transition and save the profile.

        Args:
            task_id: Task ID.
            new_mode: Target mode.
            reason: Optional log reason; defaults to observation/concordance stats.
        """
        if task_id not in self.tasks:
            return

        task = self.tasks[task_id]
        old_mode = task.mode

        if old_mode == new_mode:
            return

        task.mode = new_mode

        if reason is None:
            reason = (
                f"Observations: {task.observation_count}, "
                f"Concordance: {task.concordance_rate:.2%}"
            )

        self.logger.log_mode_transition(task_id, old_mode, new_mode, reason)

        self._save_profile(task_id)

    def get_mode(self) -> str:
        """Return the current task's mode, or the manager-level mode if none."""
        if self.current_task_id and self.current_task_id in self.tasks:
            return self.tasks[self.current_task_id].mode
        return self.mode

    def get_current_intent(self) -> str:
        """Return the current intent (empty string if unset)."""
        return self.current_context.get("intent", "")

    def set_current_intent(self, intent: str):
        """
        Set the current intent.

        Args:
            intent: Intent to set (e.g. "button", "text field", "form").
        """
        self.current_context["intent"] = intent
        self.logger.log_action({
            "action": "intent_set",
            "intent": intent,
            # self.mode is never updated; report the effective mode instead.
            "mode": self.get_mode()
        })

    def create_task_from_signatures(
        self,
        signatures: List[Dict[str, Any]],
        description: str = "Tâche automatique"
    ) -> TaskProfile:
        """
        Create a task from visual signatures.

        Args:
            signatures: Non-empty list of action signatures; ``window`` and
                ``embedding`` keys are read when present.
            description: Task description (used as the task name).

        Returns:
            The created TaskProfile.

        Raises:
            IndexError: if ``signatures`` is empty.
        """
        # Derive a short ID from the first signature (kept identical to the
        # historical scheme so existing task IDs remain stable).
        task_id = hashlib.md5(
            json.dumps(str(signatures[0])).encode()
        ).hexdigest()[:8]

        task = TaskProfile(
            task_id=f"task_{task_id}",
            task_name=description,
            window_whitelist=[signatures[0].get("window", "Unknown")],
            observation_count=len(signatures),
            embeddings=[sig.get("embedding") for sig in signatures if sig.get("embedding") is not None],
            metadata={"signatures": signatures}  # Signatures are kept in metadata
        )

        self.tasks[task.task_id] = task
        self._save_task(task)

        self.logger.log_action({
            "action": "task_created",
            "task_id": task.task_id,
            "signatures_count": len(signatures)
        })

        return task

    def _save_task(self, task: TaskProfile):
        """
        Save a task under ``<profiles_path>/<task_id>/`` (``metadata.json`` plus
        ``signatures.pkl``), then persist the FAISS index.
        """
        task_dir = self.profiles_path / task.task_id
        task_dir.mkdir(parents=True, exist_ok=True)

        metadata = {
            "task_id": task.task_id,
            "task_name": task.task_name,
            "window_whitelist": task.window_whitelist,
            "observation_count": task.observation_count,
            "mode": task.mode,
            "confidence_score": task.confidence_score
        }

        with open(task_dir / "metadata.json", "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

        if "signatures" in task.metadata:
            with open(task_dir / "signatures.pkl", "wb") as f:
                pickle.dump(task.metadata["signatures"], f)

        # Persist the FAISS index automatically (MVP); failures are logged only.
        try:
            self.embeddings_manager.save_index()
            self.logger.log_action({
                "action": "faiss_index_saved",
                "task_id": task.task_id
            })
        except Exception as e:
            self.logger.log_action({
                "action": "faiss_index_save_error",
                "task_id": task.task_id,
                "error": str(e)
            })

    def load_task(self, task_id: str) -> Optional[TaskProfile]:
        """
        Load a task saved by ``_save_task``.

        Returns:
            The TaskProfile, or None if the directory is missing or loading
            fails (the failure is logged as ``task_load_failed``).
        """
        task_dir = self.profiles_path / task_id

        if not task_dir.exists():
            return None

        try:
            with open(task_dir / "metadata.json", "r", encoding="utf-8") as f:
                metadata = json.load(f)

            # Fallback keys accept older metadata layouts.
            task = TaskProfile(
                task_id=metadata["task_id"],
                task_name=metadata.get("task_name", metadata.get("description", "Unknown")),
                window_whitelist=metadata.get("window_whitelist", [metadata.get("window_title", "Unknown")]),
                observation_count=metadata.get("observation_count", metadata.get("observations", 0)),
                mode=metadata.get("mode", "shadow"),
                confidence_score=metadata.get("confidence_score", 0.0)
            )

            sig_file = task_dir / "signatures.pkl"
            if sig_file.exists():
                # SECURITY NOTE(review): pickle.load executes arbitrary code from
                # the file; only safe while the profiles directory is trusted.
                with open(sig_file, "rb") as f:
                    task.metadata["signatures"] = pickle.load(f)

            return task

        except Exception as e:
            self.logger.log_action({
                "action": "task_load_failed",
                "task_id": task_id,
                "error": str(e)
            })
            return None

    def set_current_task(self, task_id: str):
        """Set the current task."""
        self.current_task_id = task_id

    def set_current_context(self, context: Dict[str, Any]):
        """Set the current context."""
        self.current_context = context

    def record_execution(self, decision: Dict[str, Any]):
        """
        Record the execution of an action.

        Args:
            decision: Decision dict; ``task_id`` (defaults to the current task)
                and ``confidence`` are read when present.
        """
        task_id = decision.get("task_id", self.current_task_id)

        if not task_id or task_id not in self.tasks:
            return

        task = self.tasks[task_id]
        task.last_execution = datetime.now()

        if "confidence" in decision:
            task.confidence_score = decision["confidence"]

        self.logger.log_action({
            "action": "execution_recorded",
            "task_id": task_id,
            "confidence": task.confidence_score,
            "mode": task.mode
        })

        self._save_profile(task_id)

    def get_all_tasks(self) -> List[Dict[str, Any]]:
        """Return the metrics of every task."""
        return [self.evaluate_task(task_id) for task_id in self.tasks]

    def get_task_stats(self) -> Dict[str, Any]:
        """Return global statistics: task counts per mode and the current mode."""
        mode_counts = Counter(t.mode for t in self.tasks.values())

        return {
            "total_tasks": len(self.tasks),
            "shadow_tasks": mode_counts["shadow"],
            "assist_tasks": mode_counts["assist"],
            "auto_tasks": mode_counts["auto"],
            "current_mode": self.get_mode()
        }

    def check_ui_changes(
        self,
        task_id: str,
        current_embedding: np.ndarray,
        predicted_bbox: Optional[Tuple[int, int, int, int]] = None,
        actual_bbox: Optional[Tuple[int, int, int, int]] = None
    ) -> Dict[str, Any]:
        """
        Check a task for UI changes and trigger retraining when needed.

        When the detector reports ``retraining_triggered``, the current
        embedding is appended to the task, added to the FAISS index and the
        profile is saved.

        Args:
            task_id: Task ID.
            current_embedding: Current visual embedding.
            predicted_bbox: Predicted bbox (optional).
            actual_bbox: Actual bbox (optional).

        Returns:
            The detector's result dict, or an error dict for an unknown task.
        """
        if task_id not in self.tasks:
            return {
                "error": "task_not_found",
                "task_id": task_id
            }

        task = self.tasks[task_id]

        result = self.ui_change_detector.check_and_trigger_retraining(
            task_id,
            current_embedding,
            task.embeddings,
            predicted_bbox,
            actual_bbox
        )

        if result.get("retraining_triggered"):
            task.embeddings.append(current_embedding)

            self.embeddings_manager.add_to_index(
                current_embedding,
                {
                    "task_id": task_id,
                    "timestamp": datetime.now().isoformat(),
                    "is_retraining": True
                }
            )

            self._save_profile(task_id)

            self.logger.log_action({
                "action": "ui_change_retraining",
                "task_id": task_id,
                "similarity": result.get("similarity"),
                "deltas": result.get("deltas"),
                "reason": "ui_drift_detected"
            })

        return result

    def monitor_execution_drift(
        self,
        task_id: str,
        predicted_action: Action,
        actual_action: Action
    ) -> bool:
        """
        Monitor drift between the predicted and the actual action.

        Args:
            task_id: Task ID.
            predicted_action: Action predicted by the system.
            actual_action: Action actually performed/validated.

        Returns:
            True if drift was detected and retraining was triggered.
        """
        if task_id not in self.tasks:
            return False

        if actual_action.embedding is None:
            # Nothing to compare; avoid pushing a None embedding into the
            # task and the FAISS index if retraining were triggered.
            return False

        result = self.check_ui_changes(
            task_id,
            actual_action.embedding,
            predicted_action.bbox,
            actual_action.bbox
        )

        return result.get("retraining_triggered", False)

    def get_ui_change_stats(self) -> Dict[str, Any]:
        """
        Return the UI change detector statistics.

        Returns:
            The detector's stats dict.
        """
        return self.ui_change_detector.get_stats()

    @staticmethod
    def _unwrap_double_encoded(raw: str) -> str:
        """
        Return the inner JSON text if ``raw`` is a JSON string literal
        (profiles written by the old double-encoding save), else ``raw``.
        """
        try:
            decoded = json.loads(raw)
        except ValueError:
            return raw
        return decoded if isinstance(decoded, str) else raw

    def _load_profiles(self):
        """Load the ``*.json`` task profiles found directly in ``profiles_path``."""
        if not self.profiles_path.exists():
            return

        for profile_file in self.profiles_path.glob("*.json"):
            try:
                with open(profile_file, 'r', encoding='utf-8') as f:
                    json_str = self._unwrap_double_encoded(f.read())

                # Load the profile (without embeddings for now)
                task = TaskProfile.from_json(json_str)
                self.tasks[task.task_id] = task

                self.logger.log_action({
                    "action": "profile_loaded",
                    "task_id": task.task_id,
                    "mode": task.mode
                })
            except Exception as e:
                self.logger.log_action({
                    "action": "profile_load_error",
                    "file": str(profile_file),
                    "error": str(e)
                })

    def _load_existing_tasks_to_index(self):
        """
        Load existing on-disk tasks into the FAISS index at startup.

        Verifies index integrity, rebuilds it when the builder reports
        ``needs_rebuild``, then loads any task directories not yet in
        ``self.tasks``. Errors are logged and never block startup.
        """
        # Local import kept as in the original (avoids importing the builder
        # at module load time).
        from .faiss_index_builder import FAISSIndexBuilder

        self.logger.log_action({
            "action": "load_existing_tasks_started"
        })

        try:
            builder = FAISSIndexBuilder(
                self.embeddings_manager,
                self.logger,
                str(self.profiles_path)
            )

            report = builder.verify_index_integrity()

            self.logger.log_action({
                "action": "index_integrity_check",
                "report": report
            })

            if report['needs_rebuild']:
                self.logger.log_action({
                    "action": "rebuilding_index_automatically",
                    "reason": "index_empty_or_inconsistent"
                })

                stats = builder.rebuild_index(force=True)

                self.logger.log_action({
                    "action": "index_rebuilt_automatically",
                    "stats": stats
                })

                if stats['success']:
                    self.logger.log_action({
                        "action": "load_existing_tasks_success",
                        "embeddings_loaded": stats['embeddings_added'],
                        "tasks_processed": stats['tasks_processed']
                    })
                else:
                    self.logger.log_action({
                        "action": "load_existing_tasks_failed",
                        "reason": "rebuild_failed"
                    })
            else:
                self.logger.log_action({
                    "action": "load_existing_tasks_skipped",
                    "reason": "index_already_consistent",
                    "index_size": report['actual_embeddings']
                })

            # Register on-disk tasks that are not loaded yet.
            for task_info in builder.scan_tasks():
                task_id = task_info['task_id']
                if task_id not in self.tasks:
                    task = self.load_task(task_id)
                    if task:
                        self.tasks[task_id] = task
                        self.logger.log_action({
                            "action": "task_loaded_from_disk",
                            "task_id": task_id
                        })

        except Exception as e:
            self.logger.log_action({
                "action": "load_existing_tasks_error",
                "error": str(e)
            })
            # Do not block startup on error; keep the traceback for diagnosis.
            self.logger.log_action({
                "action": "load_existing_tasks_traceback",
                "traceback": traceback.format_exc()
            })