Files
rpa_vision_v3/core/learning/continuous_learner.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

645 lines
22 KiB
Python

"""
ContinuousLearner - Apprentissage continu et adaptation
Ce module implémente l'apprentissage continu qui permet au système de:
- Mettre à jour les prototypes avec EMA (Exponential Moving Average)
- Détecter la dérive UI (drift)
- Créer et consolider des variantes
- Maintenir un historique des versions de prototypes
"""
import logging
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
import numpy as np
import json
logger = logging.getLogger(__name__)
# =============================================================================
# Dataclasses
# =============================================================================
@dataclass
class DriftStatus:
"""Statut de dérive UI"""
is_drifting: bool = False # Dérive détectée
drift_severity: float = 0.0 # Sévérité 0.0 - 1.0
consecutive_low_confidence: int = 0 # Matchs faibles consécutifs
recommended_action: str = "monitor" # "monitor", "create_variant", "retrain"
last_confidences: List[float] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Sérialiser en dictionnaire"""
return {
"is_drifting": self.is_drifting,
"drift_severity": self.drift_severity,
"consecutive_low_confidence": self.consecutive_low_confidence,
"recommended_action": self.recommended_action,
"last_confidences": self.last_confidences
}
@dataclass
class VersionInfo:
"""Information sur une version de prototype"""
version: int
created_at: datetime
embedding_path: str
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"version": self.version,
"created_at": self.created_at.isoformat(),
"embedding_path": self.embedding_path,
"metadata": self.metadata
}
@dataclass
class ContinuousLearnerConfig:
"""Configuration de l'apprenant continu"""
# EMA
ema_alpha: float = 0.1 # Alpha pour mise à jour EMA
# Détection de dérive
drift_confidence_threshold: float = 0.85 # Seuil de confiance pour dérive
drift_consecutive_count: int = 3 # Matchs consécutifs pour détecter dérive
# Variantes
max_variants_per_node: int = 5 # Nombre max de variantes
variant_similarity_threshold: float = 0.7 # Seuil pour créer variante
# Stockage
embeddings_dir: str = "data/embeddings/prototypes"
versions_dir: str = "data/embeddings/versions"
# =============================================================================
# Gestionnaire de Versions de Prototypes
# =============================================================================
class PrototypeVersionManager:
"""
Gère l'historique des versions de prototypes.
Permet de sauvegarder, récupérer et rollback les prototypes.
"""
def __init__(self, versions_dir: str = "data/embeddings/versions"):
"""
Initialiser le gestionnaire.
Args:
versions_dir: Répertoire pour stocker les versions
"""
self.versions_dir = Path(versions_dir)
self.versions_dir.mkdir(parents=True, exist_ok=True)
self._version_cache: Dict[str, List[VersionInfo]] = {}
logger.info(f"PrototypeVersionManager initialisé: {versions_dir}")
def save_version(
self,
node_id: str,
embedding: np.ndarray,
metadata: Optional[Dict] = None
) -> int:
"""
Sauvegarder une nouvelle version du prototype.
Args:
node_id: ID du node
embedding: Vecteur d'embedding
metadata: Métadonnées optionnelles
Returns:
Numéro de version créé
"""
# Récupérer versions existantes
versions = self.list_versions(node_id)
new_version = len(versions) + 1
# Créer chemin pour le fichier
node_dir = self.versions_dir / node_id
node_dir.mkdir(parents=True, exist_ok=True)
embedding_path = node_dir / f"v{new_version:04d}.npy"
metadata_path = node_dir / f"v{new_version:04d}_meta.json"
# Sauvegarder embedding
np.save(str(embedding_path), embedding)
# Sauvegarder métadonnées
version_info = VersionInfo(
version=new_version,
created_at=datetime.now(),
embedding_path=str(embedding_path),
metadata=metadata or {}
)
with open(metadata_path, 'w') as f:
json.dump(version_info.to_dict(), f, indent=2)
# Mettre à jour cache
if node_id not in self._version_cache:
self._version_cache[node_id] = []
self._version_cache[node_id].append(version_info)
logger.info(f"Version {new_version} sauvegardée pour node {node_id}")
return new_version
def get_version(self, node_id: str, version: int) -> Optional[np.ndarray]:
"""
Récupérer une version spécifique du prototype.
Args:
node_id: ID du node
version: Numéro de version
Returns:
Embedding ou None si non trouvé
"""
embedding_path = self.versions_dir / node_id / f"v{version:04d}.npy"
if embedding_path.exists():
return np.load(str(embedding_path))
logger.warning(f"Version {version} non trouvée pour node {node_id}")
return None
def list_versions(self, node_id: str) -> List[VersionInfo]:
"""
Lister toutes les versions d'un node.
Args:
node_id: ID du node
Returns:
Liste des VersionInfo
"""
# Vérifier cache
if node_id in self._version_cache:
return self._version_cache[node_id]
versions = []
node_dir = self.versions_dir / node_id
if node_dir.exists():
for meta_file in sorted(node_dir.glob("v*_meta.json")):
try:
with open(meta_file, 'r') as f:
data = json.load(f)
versions.append(VersionInfo(
version=data['version'],
created_at=datetime.fromisoformat(data['created_at']),
embedding_path=data['embedding_path'],
metadata=data.get('metadata', {})
))
except Exception as e:
logger.warning(f"Erreur lecture version {meta_file}: {e}")
self._version_cache[node_id] = versions
return versions
def get_latest_version(self, node_id: str) -> Optional[Tuple[int, np.ndarray]]:
"""
Récupérer la dernière version du prototype.
Returns:
Tuple (version, embedding) ou None
"""
versions = self.list_versions(node_id)
if not versions:
return None
latest = versions[-1]
embedding = self.get_version(node_id, latest.version)
if embedding is not None:
return (latest.version, embedding)
return None
# =============================================================================
# Apprenant Continu
# =============================================================================
class ContinuousLearner:
"""
Apprentissage continu et adaptation aux changements UI.
Fonctionnalités:
- Mise à jour des prototypes avec EMA
- Détection de dérive UI
- Création et consolidation de variantes
- Rollback vers versions précédentes
Example:
>>> learner = ContinuousLearner()
>>> learner.update_prototype("node_001", new_embedding, success=True)
>>> drift = learner.detect_drift("node_001", [0.7, 0.6, 0.5])
>>> if drift.is_drifting:
... learner.create_variant("node_001", variant_embedding)
"""
def __init__(self, config: Optional[ContinuousLearnerConfig] = None):
"""
Initialiser l'apprenant.
Args:
config: Configuration (utilise défaut si None)
"""
self.config = config or ContinuousLearnerConfig()
self.version_manager = PrototypeVersionManager(self.config.versions_dir)
# Cache des prototypes actuels
self._prototypes: Dict[str, np.ndarray] = {}
# Historique des confidences par node
self._confidence_history: Dict[str, List[float]] = {}
# Variantes par node
self._variants: Dict[str, List[Dict]] = {}
# Créer répertoire embeddings
Path(self.config.embeddings_dir).mkdir(parents=True, exist_ok=True)
logger.info(f"ContinuousLearner initialisé (alpha={self.config.ema_alpha})")
def update_prototype(
self,
node_id: str,
new_embedding: np.ndarray,
execution_success: bool = True
) -> np.ndarray:
"""
Mettre à jour le prototype d'un node avec EMA.
Formule: new_prototype = (1 - alpha) * old_prototype + alpha * new_embedding
Args:
node_id: ID du node
new_embedding: Nouvel embedding observé
execution_success: True si l'exécution a réussi
Returns:
Nouveau prototype mis à jour
"""
# Récupérer prototype actuel
current_prototype = self._get_prototype(node_id)
if current_prototype is None:
# Premier prototype
updated_prototype = new_embedding.copy()
logger.info(f"Premier prototype créé pour node {node_id}")
else:
# Mise à jour EMA
alpha = self.config.ema_alpha
# Réduire alpha si échec (moins de poids au nouvel embedding)
if not execution_success:
alpha = alpha * 0.5
updated_prototype = (1 - alpha) * current_prototype + alpha * new_embedding
# Normaliser
norm = np.linalg.norm(updated_prototype)
if norm > 0:
updated_prototype = updated_prototype / norm
# Sauvegarder nouvelle version
self.version_manager.save_version(
node_id,
updated_prototype,
metadata={
"execution_success": execution_success,
"alpha_used": self.config.ema_alpha if execution_success else self.config.ema_alpha * 0.5
}
)
# Mettre à jour cache
self._prototypes[node_id] = updated_prototype
# Sauvegarder prototype actuel
self._save_current_prototype(node_id, updated_prototype)
logger.debug(f"Prototype mis à jour pour node {node_id}")
return updated_prototype
def detect_drift(
self,
node_id: str,
recent_confidences: List[float]
) -> DriftStatus:
"""
Détecter la dérive UI pour un node.
Signale une dérive si N matchs consécutifs ont une confiance < seuil.
Args:
node_id: ID du node
recent_confidences: Confidences des derniers matchs
Returns:
DriftStatus avec diagnostic
"""
# Mettre à jour historique
if node_id not in self._confidence_history:
self._confidence_history[node_id] = []
self._confidence_history[node_id].extend(recent_confidences)
# Garder seulement les N dernières
max_history = 20
self._confidence_history[node_id] = self._confidence_history[node_id][-max_history:]
# Compter matchs consécutifs à faible confiance
consecutive_low = 0
threshold = self.config.drift_confidence_threshold
for conf in reversed(self._confidence_history[node_id]):
if conf < threshold:
consecutive_low += 1
else:
break
# Déterminer si dérive
is_drifting = consecutive_low >= self.config.drift_consecutive_count
# Calculer sévérité
if is_drifting:
recent = self._confidence_history[node_id][-consecutive_low:]
avg_confidence = np.mean(recent)
drift_severity = 1.0 - (avg_confidence / threshold)
else:
drift_severity = 0.0
# Recommander action
if is_drifting:
if drift_severity > 0.5:
recommended_action = "retrain"
else:
recommended_action = "create_variant"
else:
recommended_action = "monitor"
status = DriftStatus(
is_drifting=is_drifting,
drift_severity=drift_severity,
consecutive_low_confidence=consecutive_low,
recommended_action=recommended_action,
last_confidences=self._confidence_history[node_id][-5:]
)
if is_drifting:
logger.warning(
f"Dérive détectée pour node {node_id}: "
f"severity={drift_severity:.2f}, action={recommended_action}"
)
return status
def create_variant(
self,
node_id: str,
variant_embedding: np.ndarray,
metadata: Optional[Dict] = None
) -> str:
"""
Créer une nouvelle variante pour un node.
Args:
node_id: ID du node
variant_embedding: Embedding de la variante
metadata: Métadonnées optionnelles
Returns:
ID de la variante créée
"""
if node_id not in self._variants:
self._variants[node_id] = []
# Vérifier limite de variantes
if len(self._variants[node_id]) >= self.config.max_variants_per_node:
logger.warning(
f"Limite de variantes atteinte pour node {node_id}, "
f"consolidation nécessaire"
)
self.consolidate_variants(node_id)
# Créer ID de variante
variant_id = f"{node_id}_var_{len(self._variants[node_id]) + 1:03d}"
# Normaliser embedding
norm = np.linalg.norm(variant_embedding)
if norm > 0:
variant_embedding = variant_embedding / norm
# Calculer similarité avec prototype principal
primary_prototype = self._get_prototype(node_id)
if primary_prototype is not None:
similarity = self._cosine_similarity(variant_embedding, primary_prototype)
else:
similarity = 0.0
# Sauvegarder variante
variant_path = Path(self.config.embeddings_dir) / f"{variant_id}.npy"
np.save(str(variant_path), variant_embedding)
variant_info = {
"variant_id": variant_id,
"embedding_path": str(variant_path),
"similarity_to_primary": similarity,
"created_at": datetime.now().isoformat(),
"metadata": metadata or {}
}
self._variants[node_id].append(variant_info)
logger.info(
f"Variante {variant_id} créée pour node {node_id} "
f"(similarité={similarity:.3f})"
)
return variant_id
def consolidate_variants(self, node_id: str) -> None:
"""
Consolider les variantes d'un node par re-clustering.
Réduit le nombre de variantes en fusionnant les plus similaires.
Args:
node_id: ID du node
"""
if node_id not in self._variants or len(self._variants[node_id]) < 2:
return
logger.info(f"Consolidation des variantes pour node {node_id}")
# Charger tous les embeddings de variantes
embeddings = []
for var_info in self._variants[node_id]:
try:
emb = np.load(var_info['embedding_path'])
embeddings.append(emb)
except Exception as e:
logger.warning(f"Erreur chargement variante: {e}")
if len(embeddings) < 2:
return
# Clustering simple: fusionner variantes très similaires
embeddings_array = np.array(embeddings)
# Calculer matrice de similarité
n = len(embeddings)
similarity_matrix = np.zeros((n, n))
for i in range(n):
for j in range(n):
similarity_matrix[i, j] = self._cosine_similarity(
embeddings_array[i], embeddings_array[j]
)
# Fusionner variantes avec similarité > 0.9
merged_indices = set()
new_variants = []
for i in range(n):
if i in merged_indices:
continue
# Trouver variantes similaires
similar = [i]
for j in range(i + 1, n):
if j not in merged_indices and similarity_matrix[i, j] > 0.9:
similar.append(j)
merged_indices.add(j)
# Fusionner en calculant la moyenne
merged_embedding = np.mean([embeddings_array[k] for k in similar], axis=0)
merged_embedding = merged_embedding / np.linalg.norm(merged_embedding)
# Créer nouvelle variante consolidée
new_variant_id = f"{node_id}_var_c{len(new_variants) + 1:03d}"
variant_path = Path(self.config.embeddings_dir) / f"{new_variant_id}.npy"
np.save(str(variant_path), merged_embedding)
new_variants.append({
"variant_id": new_variant_id,
"embedding_path": str(variant_path),
"similarity_to_primary": 0.0, # Sera recalculé
"created_at": datetime.now().isoformat(),
"metadata": {"consolidated_from": similar}
})
# Remplacer variantes
self._variants[node_id] = new_variants
logger.info(
f"Consolidation terminée: {n} -> {len(new_variants)} variantes"
)
def rollback_prototype(self, node_id: str, version: int) -> bool:
"""
Restaurer une version précédente du prototype.
Args:
node_id: ID du node
version: Numéro de version à restaurer
Returns:
True si rollback réussi
"""
embedding = self.version_manager.get_version(node_id, version)
if embedding is None:
logger.error(f"Version {version} non trouvée pour node {node_id}")
return False
# Mettre à jour cache
self._prototypes[node_id] = embedding
# Sauvegarder comme prototype actuel
self._save_current_prototype(node_id, embedding)
logger.info(f"Rollback vers version {version} pour node {node_id}")
return True
def get_variants(self, node_id: str) -> List[Dict]:
"""Récupérer les variantes d'un node."""
return self._variants.get(node_id, [])
def _get_prototype(self, node_id: str) -> Optional[np.ndarray]:
"""Récupérer le prototype actuel d'un node."""
# Vérifier cache
if node_id in self._prototypes:
return self._prototypes[node_id]
# Charger depuis fichier
prototype_path = Path(self.config.embeddings_dir) / f"{node_id}_current.npy"
if prototype_path.exists():
prototype = np.load(str(prototype_path))
self._prototypes[node_id] = prototype
return prototype
# Essayer dernière version
latest = self.version_manager.get_latest_version(node_id)
if latest:
_, embedding = latest
self._prototypes[node_id] = embedding
return embedding
return None
def _save_current_prototype(self, node_id: str, embedding: np.ndarray) -> None:
"""Sauvegarder le prototype actuel."""
prototype_path = Path(self.config.embeddings_dir) / f"{node_id}_current.npy"
np.save(str(prototype_path), embedding)
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Calculer similarité cosinus."""
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))
def get_config(self) -> ContinuousLearnerConfig:
"""Récupérer la configuration."""
return self.config
# =============================================================================
# Fonctions utilitaires
# =============================================================================
def create_learner(
ema_alpha: float = 0.1,
drift_threshold: float = 0.85,
drift_count: int = 3
) -> ContinuousLearner:
"""
Créer un apprenant avec configuration personnalisée.
Args:
ema_alpha: Alpha pour EMA
drift_threshold: Seuil de confiance pour dérive
drift_count: Matchs consécutifs pour détecter dérive
Returns:
ContinuousLearner configuré
"""
config = ContinuousLearnerConfig(
ema_alpha=ema_alpha,
drift_confidence_threshold=drift_threshold,
drift_consecutive_count=drift_count
)
return ContinuousLearner(config)