Files
Geniusia_v2/geniusia2/core/learning_manager.py
2026-03-05 00:20:25 +01:00

909 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Gestionnaire d'apprentissage pour RPA Vision V2.
Gère les transitions de mode, le calcul de confiance et l'état d'apprentissage.
"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from dataclasses import asdict
import numpy as np
from .models import TaskProfile, Action
from .embeddings_manager import EmbeddingsManager
from .logger import Logger
from .ui_change_detector import UIChangeDetector
class LearningManager:
    """
    Learning manager that tracks progress, handles mode transitions
    and computes confidence scores.
    """
def __init__(
    self,
    embeddings_manager: EmbeddingsManager,
    logger: Logger,
    config: Dict[str, Any],
    profiles_path: str = "data/user_profiles"
):
    """
    Build the learning manager and restore previously saved state.

    Args:
        embeddings_manager: Embeddings manager; kept on the instance and
            handed to the UI change detector.
        logger: Logger used for action logging.
        config: Global configuration; its optional "thresholds" mapping
            overrides the default thresholds set below.
        profiles_path: Directory holding the user profiles; created if
            it does not exist yet.
    """
    self.embeddings_manager = embeddings_manager
    self.logger = logger
    self.config = config

    # The profile directory must exist before any profile is read or written.
    self.profiles_path = Path(profiles_path)
    self.profiles_path.mkdir(parents=True, exist_ok=True)

    # Runtime state; "shadow" is the initial mode.
    self.mode = "shadow"
    self.tasks: Dict[str, TaskProfile] = {}
    self.current_task_id: Optional[str] = None
    self.current_context: Dict[str, Any] = {}

    # Thresholds, each falling back to a built-in default.
    thresholds = config.get("thresholds", {})
    self.autopilot_observations = thresholds.get("autopilot_observations", 20)
    self.autopilot_concordance = thresholds.get("autopilot_concordance", 0.95)
    self.confidence_min = thresholds.get("confidence_min", 0.90)
    self.rollback_confidence = thresholds.get("rollback_confidence", 0.85)

    # UI change detector shares the embeddings manager, logger and config.
    self.ui_change_detector = UIChangeDetector(embeddings_manager, logger, config)

    # Restore saved profiles first, then make sure the FAISS index
    # reflects the tasks found on disk.
    self._load_profiles()
    self._load_existing_tasks_to_index()

    self.logger.log_action({
        "action": "learning_manager_initialized",
        "mode": self.mode,
        "num_tasks": len(self.tasks)
    })
def _save_profile(self, task_id: str):
    """
    Write the profile of ``task_id`` to ``<profiles_path>/<task_id>.json``.

    Unknown task IDs are ignored. Any failure while writing is logged as
    "profile_save_error" instead of being raised.

    Args:
        task_id: ID of the task whose profile should be saved.
    """
    if task_id not in self.tasks:
        return

    try:
        destination = self.profiles_path / f"{task_id}.json"
        # NOTE(review): _load_profiles passes the raw file text to
        # TaskProfile.from_json; confirm that to_json() returns a
        # JSON-serialisable object and not an already-encoded string.
        payload = self.tasks[task_id].to_json()
        with open(destination, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=2, ensure_ascii=False)
        self.logger.log_action({
            "action": "profile_saved",
            "task_id": task_id
        })
    except Exception as exc:
        self.logger.log_action({
            "action": "profile_save_error",
            "task_id": task_id,
            "error": str(exc)
        })
def observe(self, action: Action):
    """
    Record one observed action (Shadow mode).

    The action is appended to the current task, or to a task derived from
    the action when no current task is set. A new profile is created on
    first sight. The action's embedding, when present, is stored and
    indexed. The profile is saved after every observation.

    Args:
        action: The observed action.
    """
    task_id = self.current_task_id or self._generate_task_id(action)

    # First observation for this ID: start a fresh shadow-mode profile.
    if task_id not in self.tasks:
        self.tasks[task_id] = TaskProfile(
            task_id=task_id,
            task_name=f"Tâche {task_id}",
            mode="shadow",
            observation_count=0,
            concordance_rate=0.0,
            confidence_score=0.0,
            correction_count=0,
            last_execution=datetime.now(),
            window_whitelist=[action.window_title],
            action_sequence=[],
            embeddings=[],
            metadata={}
        )

    profile = self.tasks[task_id]
    profile.observation_count += 1
    profile.action_sequence.append(action)
    profile.last_execution = datetime.now()

    # Keep the embedding on the profile and mirror it into the FAISS index.
    if action.embedding is not None:
        profile.embeddings.append(action.embedding)
        index_metadata = {
            "task_id": task_id,
            "action_type": action.action_type,
            "target_element": action.target_element,
            "timestamp": action.timestamp.isoformat()
        }
        self.embeddings_manager.add_to_index(action.embedding, index_metadata)

    self.logger.log_action({
        "action": "observation_recorded",
        "task_id": task_id,
        "observation_count": profile.observation_count,
        "action_type": action.action_type
    })

    self._save_profile(task_id)

    # After 5 observations a shadow task is promoted to assisted mode.
    still_shadow = profile.mode == "shadow"
    if still_shadow and profile.observation_count >= 5:
        self._transition_mode(task_id, "assist")
def _generate_task_id(self, action: Action) -> str:
    """
    Derive a task ID from an action's window title, target element and type.

    Spaces become underscores and the text is lower-cased. Path separators
    of the current platform are also replaced by underscores: the ID is
    used as a file name (``<task_id>.json``) when the profile is saved, and
    a title such as ``"report - /home/me/a.txt"`` would otherwise point
    into a non-existent sub-directory and make every save fail.

    Args:
        action: Action the ID is derived from.

    Returns:
        ``"<window>_<element>_<action_type>"``.
    """
    import os

    separators = [sep for sep in (os.sep, os.altsep) if sep]

    def _slug(text: str) -> str:
        # Same normalisation as before, plus separator neutralisation.
        slug = text.replace(" ", "_").lower()
        for sep in separators:
            slug = slug.replace(sep, "_")
        return slug

    window_clean = _slug(action.window_title)
    element_clean = _slug(action.target_element)
    return f"{window_clean}_{element_clean}_{action.action_type}"
def suggest_action(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Build an action suggestion for the current task (Assist/Auto modes).

    The embedding in ``context['current_embedding']`` is looked up in the
    embeddings index (k=3). Every returned neighbour is examined in order,
    not only the first one: a neighbour is skipped when its metadata names
    another task or when its timestamp matches no action of the current
    task. Previously a single foreign or non-action top hit made the method
    return None even when the next neighbour was usable.

    Args:
        context: Current context. Must contain 'current_embedding'; may
            contain 'llm_score' (defaults to 0.5).

    Returns:
        Dict with keys "action", "confidence", "similarity" and "task_id",
        or None when no suggestion can be made.
    """
    task_id = self.current_task_id
    if not task_id or task_id not in self.tasks:
        return None

    task = self.tasks[task_id]
    if task.mode not in ("assist", "auto"):
        return None

    if not task.embeddings or 'current_embedding' not in context:
        return None

    similar = self.embeddings_manager.search_similar(
        context['current_embedding'], k=3
    )
    if not similar:
        return None

    # Index the task's actions by timestamp; setdefault keeps the first
    # action for a given timestamp, like the original first-match loop.
    actions_by_timestamp = {}
    for action in task.action_sequence:
        actions_by_timestamp.setdefault(action.timestamp.isoformat(), action)

    matching_action = None
    similarity = None
    for match in similar:
        metadata = match.get('metadata') or {}
        owner = metadata.get('task_id')
        if owner is not None and owner != task_id:
            # Neighbour belongs to a different task.
            continue
        candidate = actions_by_timestamp.get(metadata.get('timestamp'))
        if candidate is not None:
            matching_action = candidate
            similarity = match['similarity']
            break

    if matching_action is None:
        return None

    # Confidence blends the visual similarity, the LLM score and history.
    llm_score = context.get('llm_score', 0.5)
    confidence = self.calculate_confidence(similarity, llm_score, task_id)

    suggestion = {
        "action": matching_action,
        "confidence": confidence,
        "similarity": similarity,
        "task_id": task_id
    }
    self.logger.log_action({
        "action": "suggestion_generated",
        "task_id": task_id,
        "confidence": confidence,
        "action_type": matching_action.action_type
    })
    return suggestion
def confirm_action(self, feedback: Dict[str, Any]):
    """
    Process user feedback on a suggested or executed action.

    Args:
        feedback: Dict with "type" ("accept", "reject" or "correct"), an
            optional "task_id" (defaults to the current task) and, for
            corrections, an optional "corrected_action".
    """
    kind = feedback.get("type")
    task_id = feedback.get("task_id", self.current_task_id)
    if not task_id or task_id not in self.tasks:
        return

    task = self.tasks[task_id]

    if kind in ("accept", "reject"):
        # Accept raises the concordance rate, reject lowers it.
        accepted = kind == "accept"
        self._update_concordance(task_id, success=accepted)
        self.logger.log_action({
            "action": "action_accepted" if accepted else "action_rejected",
            "task_id": task_id,
            "concordance_rate": task.concordance_rate
        })
    elif kind == "correct":
        task.correction_count += 1
        corrected = feedback.get("corrected_action")

        if corrected:
            # The correction becomes part of the learned sequence.
            task.action_sequence.append(corrected)
            embedding = corrected.embedding
            if embedding is not None:
                task.embeddings.append(embedding)
                self.embeddings_manager.add_to_index(
                    embedding,
                    {
                        "task_id": task_id,
                        "action_type": corrected.action_type,
                        "target_element": corrected.target_element,
                        "timestamp": corrected.timestamp.isoformat(),
                        "is_correction": True
                    }
                )

        # A correction counts as a failed prediction.
        self._update_concordance(task_id, success=False)
        self.logger.log_correction({
            "task_id": task_id,
            "correction_count": task.correction_count,
            "corrected_action": corrected.to_dict() if corrected else None
        })

    # Persist and re-evaluate the mode regardless of the feedback type.
    self._save_profile(task_id)
    self._check_mode_transitions(task_id)
def _update_concordance(self, task_id: str, success: bool):
    """
    Update a task's concordance rate from one new outcome.

    The rate is the mean of the last 10 outcomes (1 = success,
    0 = failure), stored in ``task.metadata['recent_results']``.

    The previous implementation tested ``hasattr(task, '_recent_results')``,
    an attribute that is never set, so the stored history was wiped on
    every call and the rate could only ever be 0.0 or 1.0. The history is
    now read from the metadata and kept across calls.

    Args:
        task_id: ID of the task to update; unknown IDs are ignored.
        success: Whether the latest prediction matched the user's action.
    """
    if task_id not in self.tasks:
        return

    task = self.tasks[task_id]

    # Copy so the stored list is replaced rather than mutated in place.
    history = list(task.metadata.get('recent_results') or [])
    history.append(1 if success else 0)
    history = history[-10:]  # sliding window over the last 10 outcomes

    task.metadata['recent_results'] = history
    task.concordance_rate = sum(history) / len(history)
def calculate_confidence(
    self,
    vision_conf: float,
    llm_score: float,
    task_id: str
) -> float:
    """
    Compute the weighted confidence score for a task.

    Formula: 0.6 x vision + 0.3 x llm + 0.1 x history, clamped to [0, 1].

    Args:
        vision_conf: Confidence of the visual detection (0-1).
        llm_score: Score given by the LLM (0-1).
        task_id: ID of the task whose history is taken into account.

    Returns:
        Confidence score between 0.0 and 1.0.
    """
    history_score = self._get_historical_performance(task_id)

    vision_part = 0.6 * vision_conf
    llm_part = 0.3 * llm_score
    history_part = 0.1 * history_score
    weighted = vision_part + llm_part + history_part

    # Clamp: cap at 1.0 first, then floor at 0.0.
    capped = min(1.0, weighted)
    return max(0.0, capped)
def _get_historical_performance(self, task_id: str) -> float:
    """
    Return the historical performance of a task.

    Returns:
        The task's concordance rate, or 0.5 when the task is unknown.
    """
    try:
        profile = self.tasks[task_id]
    except KeyError:
        # Unknown task: neutral prior.
        return 0.5
    return profile.concordance_rate
def evaluate_task(self, task_id: str) -> Dict[str, Any]:
    """
    Return the metrics of a task.

    Args:
        task_id: ID of the task.

    Returns:
        Dict of metrics, or an empty dict when the task is unknown.
    """
    if task_id not in self.tasks:
        return {}

    task = self.tasks[task_id]

    # Corrections per observation; max(1, ...) avoids dividing by zero.
    correction_rate = task.correction_count / max(1, task.observation_count)

    if task.last_execution:
        last_execution = task.last_execution.isoformat()
    else:
        last_execution = None

    metrics = {
        "task_id": task_id,
        "task_name": task.task_name,
        "mode": task.mode,
        "observation_count": task.observation_count,
        "concordance_rate": task.concordance_rate,
        "confidence_score": task.confidence_score,
        "correction_count": task.correction_count,
        "correction_rate": correction_rate,
        "last_execution": last_execution
    }
    return metrics
def should_transition_to_auto(self, task_id: str) -> bool:
    """
    Tell whether a task meets the criteria for Autopilot mode.

    Both the observation count and the concordance rate must reach their
    configured autopilot thresholds.

    Args:
        task_id: ID of the task.

    Returns:
        True when eligible, False otherwise (including unknown tasks).
    """
    if task_id not in self.tasks:
        return False

    task = self.tasks[task_id]
    if not task.observation_count >= self.autopilot_observations:
        return False
    return task.concordance_rate >= self.autopilot_concordance
def rollback_if_low_confidence(self, task_id: str):
    """
    Demote an Autopilot task to Assist mode when its confidence is low.

    Applies only to tasks in "auto" mode whose confidence score is below
    ``confidence_min``. The transition is performed and an additional
    mode-transition entry carrying the confidence value is logged.

    Args:
        task_id: ID of the task; unknown IDs are ignored.
    """
    if task_id not in self.tasks:
        return

    task = self.tasks[task_id]
    if task.mode != "auto":
        return
    if not task.confidence_score < self.confidence_min:
        return

    self._transition_mode(task_id, "assist")
    reason = f"Confiance faible: {task.confidence_score:.2f}"
    self.logger.log_mode_transition(task_id, "auto", "assist", reason)
def _check_mode_transitions(self, task_id: str):
    """
    Apply at most one mode transition to a task, based on its current mode.

    - shadow -> assist once 5 observations have been recorded;
    - assist -> auto when the autopilot criteria are met;
    - auto -> assist when the concordance rate falls below
      ``rollback_confidence``.
    """
    if task_id not in self.tasks:
        return

    task = self.tasks[task_id]
    mode = task.mode
    target = None

    if mode == "shadow":
        if task.observation_count >= 5:
            target = "assist"
    elif mode == "assist":
        if self.should_transition_to_auto(task_id):
            target = "auto"
    elif mode == "auto":
        if task.concordance_rate < self.rollback_confidence:
            target = "assist"

    if target is not None:
        self._transition_mode(task_id, target)
def _transition_mode(self, task_id: str, new_mode: str):
    """
    Switch a task to ``new_mode``, log the transition and save the profile.

    Nothing happens when the task is unknown or already in ``new_mode``.
    """
    if task_id not in self.tasks:
        return

    task = self.tasks[task_id]
    previous = task.mode
    if previous == new_mode:
        return

    task.mode = new_mode
    details = (
        f"Observations: {task.observation_count}, "
        f"Concordance: {task.concordance_rate:.2%}"
    )
    self.logger.log_mode_transition(task_id, previous, new_mode, details)
    self._save_profile(task_id)
def get_mode(self) -> str:
    """
    Return the current operating mode.

    The mode of the current task is used when one is set and known;
    otherwise the manager-level mode is returned.
    """
    current = self.current_task_id
    if not current or current not in self.tasks:
        return self.mode
    return self.tasks[current].mode
def get_current_intent(self) -> str:
    """Return the intent stored in the current context, or "" when unset."""
    context = self.current_context
    return context.get("intent", "")
def set_current_intent(self, intent: str):
    """
    Store the current intent in the context and log the change.

    Args:
        intent: Intent to set (e.g. "button", "text field", "form").
    """
    self.current_context["intent"] = intent

    entry = {
        "action": "intent_set",
        "intent": intent,
        "mode": self.mode
    }
    self.logger.log_action(entry)
def create_task_from_signatures(
    self,
    signatures: List[Dict[str, Any]],
    description: str = "Tâche automatique"
) -> TaskProfile:
    """
    Create, register and save a task built from visual signatures.

    The task ID is "task_" followed by the first 8 hex digits of an MD5
    digest computed from the first signature only. The signatures
    themselves are kept in the profile's metadata.

    Args:
        signatures: List of action signatures; the first one provides the
            ID and the window name, so the list must not be empty.
        description: Human-readable task name.

    Returns:
        The created TaskProfile.
    """
    import hashlib
    import json

    first = signatures[0]

    # Short identifier derived from the first signature (not a security use).
    encoded = json.dumps(str(first)).encode()
    short_id = hashlib.md5(encoded).hexdigest()[:8]

    # Keep only the embeddings that are actually present.
    embeddings = []
    for signature in signatures:
        embedding = signature.get("embedding")
        if embedding is not None:
            embeddings.append(embedding)

    task = TaskProfile(
        task_id=f"task_{short_id}",
        task_name=description,
        window_whitelist=[first.get("window", "Unknown")],
        observation_count=len(signatures),
        embeddings=embeddings,
        metadata={"signatures": signatures}
    )

    self.tasks[task.task_id] = task
    self._save_task(task)

    self.logger.log_action({
        "action": "task_created",
        "task_id": task.task_id,
        "signatures_count": len(signatures)
    })
    return task
def _save_task(self, task: TaskProfile):
    """
    Save a task to ``<profiles_path>/<task_id>/``.

    Writes ``metadata.json``, writes ``signatures.pkl`` when the task
    carries signatures in its metadata, then asks the embeddings manager
    to save the FAISS index. A failure while saving the index is logged,
    not raised.

    Args:
        task: Task profile to save.
    """
    import json
    import pickle

    task_dir = self.profiles_path / task.task_id
    task_dir.mkdir(parents=True, exist_ok=True)

    summary = {
        "task_id": task.task_id,
        "task_name": task.task_name,
        "window_whitelist": task.window_whitelist,
        "observation_count": task.observation_count,
        "mode": task.mode,
        "confidence_score": task.confidence_score
    }
    metadata_file = task_dir / "metadata.json"
    with open(metadata_file, "w") as handle:
        json.dump(summary, handle, indent=2)

    # Signatures live in the profile metadata and are pickled separately.
    if "signatures" in task.metadata:
        signatures_file = task_dir / "signatures.pkl"
        with open(signatures_file, "wb") as handle:
            pickle.dump(task.metadata["signatures"], handle)

    # Save the FAISS index as well (best effort).
    try:
        self.embeddings_manager.save_index()
        self.logger.log_action({
            "action": "faiss_index_saved",
            "task_id": task.task_id
        })
    except Exception as exc:
        self.logger.log_action({
            "action": "faiss_index_save_error",
            "task_id": task.task_id,
            "error": str(exc)
        })
def load_task(self, task_id: str) -> Optional[TaskProfile]:
    """
    Load a task from ``<profiles_path>/<task_id>/``.

    Reads ``metadata.json`` (accepting the alternative keys "description",
    "window_title" and "observations" as fallbacks) and, when present,
    ``signatures.pkl``.

    Args:
        task_id: ID of the task to load.

    Returns:
        The loaded TaskProfile, or None when the directory does not exist
        or loading fails (the failure is logged).
    """
    import json
    import pickle

    task_dir = self.profiles_path / task_id
    if not task_dir.exists():
        return None

    try:
        with open(task_dir / "metadata.json", "r") as handle:
            metadata = json.load(handle)

        # Fallback values are computed from the alternative key names.
        fallback_name = metadata.get("description", "Unknown")
        fallback_windows = [metadata.get("window_title", "Unknown")]
        fallback_count = metadata.get("observations", 0)

        task = TaskProfile(
            task_id=metadata["task_id"],
            task_name=metadata.get("task_name", fallback_name),
            window_whitelist=metadata.get("window_whitelist", fallback_windows),
            observation_count=metadata.get("observation_count", fallback_count),
            mode=metadata.get("mode", "shadow"),
            confidence_score=metadata.get("confidence_score", 0.0)
        )

        signatures_file = task_dir / "signatures.pkl"
        if signatures_file.exists():
            # NOTE(review): pickle.load can execute arbitrary code from the
            # file; acceptable only while the profile directory is trusted.
            with open(signatures_file, "rb") as handle:
                task.metadata["signatures"] = pickle.load(handle)

        return task
    except Exception as exc:
        self.logger.log_action({
            "action": "task_load_failed",
            "task_id": task_id,
            "error": str(exc)
        })
        return None
def set_current_task(self, task_id: str):
    """
    Select the task that subsequent calls operate on.

    Args:
        task_id: ID of the task to make current.
    """
    self.current_task_id = task_id
def set_current_context(self, context: Dict[str, Any]):
    """
    Replace the current context with ``context``.

    The dict is stored as-is (not copied), so a previously set intent is
    dropped unless ``context`` carries one.

    Args:
        context: New context dictionary.
    """
    self.current_context = context
def record_execution(self, decision: Dict[str, Any]):
    """
    Record that an action was executed for a task.

    Updates the task's last-execution time, stores the decision's
    confidence when provided, logs the event and saves the profile.

    Args:
        decision: Dict describing the decision; may contain "task_id"
            (defaults to the current task) and "confidence".
    """
    task_id = decision.get("task_id", self.current_task_id)
    if not task_id or task_id not in self.tasks:
        return

    task = self.tasks[task_id]
    task.last_execution = datetime.now()

    # Only overwrite the confidence when the decision carries one.
    if "confidence" in decision:
        task.confidence_score = decision["confidence"]

    entry = {
        "action": "execution_recorded",
        "task_id": task_id,
        "confidence": task.confidence_score,
        "mode": task.mode
    }
    self.logger.log_action(entry)
    self._save_profile(task_id)
def get_all_tasks(self) -> List[Dict[str, Any]]:
    """Return the metrics of every known task, in insertion order."""
    return list(map(self.evaluate_task, self.tasks))
def get_task_stats(self) -> Dict[str, Any]:
    """
    Return global statistics: task totals per mode and the current mode.

    Tasks whose mode is none of "shadow", "assist" or "auto" count toward
    the total only.
    """
    per_mode = {"shadow": 0, "assist": 0, "auto": 0}
    for profile in self.tasks.values():
        if profile.mode in per_mode:
            per_mode[profile.mode] += 1

    return {
        "total_tasks": len(self.tasks),
        "shadow_tasks": per_mode["shadow"],
        "assist_tasks": per_mode["assist"],
        "auto_tasks": per_mode["auto"],
        "current_mode": self.get_mode()
    }
def check_ui_changes(
    self,
    task_id: str,
    current_embedding: np.ndarray,
    predicted_bbox: Optional[Tuple[int, int, int, int]] = None,
    actual_bbox: Optional[Tuple[int, int, int, int]] = None
) -> Dict[str, Any]:
    """
    Check a task for UI changes and fold the new embedding in when the
    detector triggers retraining.

    Args:
        task_id: ID of the task.
        current_embedding: Current visual embedding.
        predicted_bbox: Predicted bounding box (optional).
        actual_bbox: Actual bounding box (optional).

    Returns:
        The detector's result dict, or an error dict
        ({"error": "task_not_found", "task_id": ...}) for unknown tasks.
    """
    if task_id not in self.tasks:
        return {"error": "task_not_found", "task_id": task_id}

    task = self.tasks[task_id]

    outcome = self.ui_change_detector.check_and_trigger_retraining(
        task_id,
        current_embedding,
        task.embeddings,
        predicted_bbox,
        actual_bbox
    )

    if not outcome.get("retraining_triggered"):
        return outcome

    # Retraining: remember the new appearance on the profile and in the
    # FAISS index so future lookups can match it.
    task.embeddings.append(current_embedding)
    index_metadata = {
        "task_id": task_id,
        "timestamp": datetime.now().isoformat(),
        "is_retraining": True
    }
    self.embeddings_manager.add_to_index(current_embedding, index_metadata)

    self._save_profile(task_id)

    self.logger.log_action({
        "action": "ui_change_retraining",
        "task_id": task_id,
        "similarity": outcome.get("similarity"),
        "deltas": outcome.get("deltas"),
        "reason": "ui_drift_detected"
    })
    return outcome
def monitor_execution_drift(
    self,
    task_id: str,
    predicted_action: Action,
    actual_action: Action
) -> bool:
    """
    Watch for drift between the predicted action and the actual one.

    Delegates to ``check_ui_changes`` using the actual action's embedding
    and both bounding boxes.

    An action's embedding may be None (the other methods of this class
    guard for it before indexing). Without an embedding there is nothing
    to compare, and forwarding None could end with None being appended to
    the task's embeddings and to the index. In that case the method now
    returns False without calling the detector.

    Args:
        task_id: ID of the task.
        predicted_action: Action predicted by the system.
        actual_action: Action actually performed or validated.

    Returns:
        True when drift was detected and retraining was triggered.
    """
    if task_id not in self.tasks:
        return False

    if actual_action.embedding is None:
        # No visual evidence to evaluate.
        return False

    result = self.check_ui_changes(
        task_id,
        actual_action.embedding,
        predicted_action.bbox,
        actual_action.bbox
    )
    return result.get("retraining_triggered", False)
def get_ui_change_stats(self) -> Dict[str, Any]:
    """
    Return the UI change detector's statistics.

    Returns:
        Whatever ``ui_change_detector.get_stats()`` reports.
    """
    detector = self.ui_change_detector
    return detector.get_stats()
def _load_profiles(self):
    """
    Load every ``*.json`` profile found directly in the profile directory.

    Each file is handled independently: a file that cannot be read or
    parsed is logged as "profile_load_error" and skipped.
    """
    if not self.profiles_path.exists():
        return

    for profile_file in self.profiles_path.glob("*.json"):
        try:
            raw_json = profile_file.read_text(encoding='utf-8')
            # Profile is loaded as stored (without embeddings for now).
            profile = TaskProfile.from_json(raw_json)
            self.tasks[profile.task_id] = profile
            self.logger.log_action({
                "action": "profile_loaded",
                "task_id": profile.task_id,
                "mode": profile.mode
            })
        except Exception as exc:
            self.logger.log_action({
                "action": "profile_load_error",
                "file": str(profile_file),
                "error": str(exc)
            })
def _load_existing_tasks_to_index(self):
    """
    Make sure the FAISS index reflects the tasks saved on disk at startup.

    Addresses the case of an empty index despite saved tasks: the index
    integrity is verified and the index rebuilt when the report asks for
    it. Tasks found on disk that are not yet in ``self.tasks`` are then
    loaded. Any error is logged together with its traceback and does not
    block startup.
    """
    from .faiss_index_builder import FAISSIndexBuilder

    log = self.logger.log_action
    log({
        "action": "load_existing_tasks_started"
    })

    try:
        builder = FAISSIndexBuilder(
            self.embeddings_manager,
            self.logger,
            str(self.profiles_path)
        )

        report = builder.verify_index_integrity()
        log({
            "action": "index_integrity_check",
            "report": report
        })

        if not report['needs_rebuild']:
            log({
                "action": "load_existing_tasks_skipped",
                "reason": "index_already_consistent",
                "index_size": report['actual_embeddings']
            })
        else:
            # Index is empty or inconsistent: rebuild it from disk.
            log({
                "action": "rebuilding_index_automatically",
                "reason": "index_empty_or_inconsistent"
            })
            stats = builder.rebuild_index(force=True)
            log({
                "action": "index_rebuilt_automatically",
                "stats": stats
            })
            if stats['success']:
                log({
                    "action": "load_existing_tasks_success",
                    "embeddings_loaded": stats['embeddings_added'],
                    "tasks_processed": stats['tasks_processed']
                })
            else:
                log({
                    "action": "load_existing_tasks_failed",
                    "reason": "rebuild_failed"
                })

        # Register on-disk tasks that are not in memory yet.
        for task_info in builder.scan_tasks():
            task_id = task_info['task_id']
            if task_id in self.tasks:
                continue
            loaded = self.load_task(task_id)
            if not loaded:
                continue
            self.tasks[task_id] = loaded
            log({
                "action": "task_loaded_from_disk",
                "task_id": task_id
            })

    except Exception as exc:
        log({
            "action": "load_existing_tasks_error",
            "error": str(exc)
        })
        # Startup must not be blocked; keep the traceback for diagnosis.
        import traceback
        log({
            "action": "load_existing_tasks_traceback",
            "traceback": traceback.format_exc()
        })