- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
645 lines
22 KiB
Python
645 lines
22 KiB
Python
"""
|
|
ContinuousLearner - Apprentissage continu et adaptation
|
|
|
|
Ce module implémente l'apprentissage continu qui permet au système de:
|
|
- Mettre à jour les prototypes avec EMA (Exponential Moving Average)
|
|
- Détecter la dérive UI (drift)
|
|
- Créer et consolider des variantes
|
|
- Maintenir un historique des versions de prototypes
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict, Optional, Any, Tuple
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import numpy as np
|
|
import json
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Dataclasses
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class DriftStatus:
|
|
"""Statut de dérive UI"""
|
|
is_drifting: bool = False # Dérive détectée
|
|
drift_severity: float = 0.0 # Sévérité 0.0 - 1.0
|
|
consecutive_low_confidence: int = 0 # Matchs faibles consécutifs
|
|
recommended_action: str = "monitor" # "monitor", "create_variant", "retrain"
|
|
last_confidences: List[float] = field(default_factory=list)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Sérialiser en dictionnaire"""
|
|
return {
|
|
"is_drifting": self.is_drifting,
|
|
"drift_severity": self.drift_severity,
|
|
"consecutive_low_confidence": self.consecutive_low_confidence,
|
|
"recommended_action": self.recommended_action,
|
|
"last_confidences": self.last_confidences
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class VersionInfo:
|
|
"""Information sur une version de prototype"""
|
|
version: int
|
|
created_at: datetime
|
|
embedding_path: str
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"version": self.version,
|
|
"created_at": self.created_at.isoformat(),
|
|
"embedding_path": self.embedding_path,
|
|
"metadata": self.metadata
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class ContinuousLearnerConfig:
|
|
"""Configuration de l'apprenant continu"""
|
|
# EMA
|
|
ema_alpha: float = 0.1 # Alpha pour mise à jour EMA
|
|
|
|
# Détection de dérive
|
|
drift_confidence_threshold: float = 0.85 # Seuil de confiance pour dérive
|
|
drift_consecutive_count: int = 3 # Matchs consécutifs pour détecter dérive
|
|
|
|
# Variantes
|
|
max_variants_per_node: int = 5 # Nombre max de variantes
|
|
variant_similarity_threshold: float = 0.7 # Seuil pour créer variante
|
|
|
|
# Stockage
|
|
embeddings_dir: str = "data/embeddings/prototypes"
|
|
versions_dir: str = "data/embeddings/versions"
|
|
|
|
|
|
# =============================================================================
|
|
# Gestionnaire de Versions de Prototypes
|
|
# =============================================================================
|
|
|
|
class PrototypeVersionManager:
|
|
"""
|
|
Gère l'historique des versions de prototypes.
|
|
|
|
Permet de sauvegarder, récupérer et rollback les prototypes.
|
|
"""
|
|
|
|
def __init__(self, versions_dir: str = "data/embeddings/versions"):
|
|
"""
|
|
Initialiser le gestionnaire.
|
|
|
|
Args:
|
|
versions_dir: Répertoire pour stocker les versions
|
|
"""
|
|
self.versions_dir = Path(versions_dir)
|
|
self.versions_dir.mkdir(parents=True, exist_ok=True)
|
|
self._version_cache: Dict[str, List[VersionInfo]] = {}
|
|
|
|
logger.info(f"PrototypeVersionManager initialisé: {versions_dir}")
|
|
|
|
def save_version(
|
|
self,
|
|
node_id: str,
|
|
embedding: np.ndarray,
|
|
metadata: Optional[Dict] = None
|
|
) -> int:
|
|
"""
|
|
Sauvegarder une nouvelle version du prototype.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
embedding: Vecteur d'embedding
|
|
metadata: Métadonnées optionnelles
|
|
|
|
Returns:
|
|
Numéro de version créé
|
|
"""
|
|
# Récupérer versions existantes
|
|
versions = self.list_versions(node_id)
|
|
new_version = len(versions) + 1
|
|
|
|
# Créer chemin pour le fichier
|
|
node_dir = self.versions_dir / node_id
|
|
node_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
embedding_path = node_dir / f"v{new_version:04d}.npy"
|
|
metadata_path = node_dir / f"v{new_version:04d}_meta.json"
|
|
|
|
# Sauvegarder embedding
|
|
np.save(str(embedding_path), embedding)
|
|
|
|
# Sauvegarder métadonnées
|
|
version_info = VersionInfo(
|
|
version=new_version,
|
|
created_at=datetime.now(),
|
|
embedding_path=str(embedding_path),
|
|
metadata=metadata or {}
|
|
)
|
|
|
|
with open(metadata_path, 'w') as f:
|
|
json.dump(version_info.to_dict(), f, indent=2)
|
|
|
|
# Mettre à jour cache
|
|
if node_id not in self._version_cache:
|
|
self._version_cache[node_id] = []
|
|
self._version_cache[node_id].append(version_info)
|
|
|
|
logger.info(f"Version {new_version} sauvegardée pour node {node_id}")
|
|
return new_version
|
|
|
|
def get_version(self, node_id: str, version: int) -> Optional[np.ndarray]:
|
|
"""
|
|
Récupérer une version spécifique du prototype.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
version: Numéro de version
|
|
|
|
Returns:
|
|
Embedding ou None si non trouvé
|
|
"""
|
|
embedding_path = self.versions_dir / node_id / f"v{version:04d}.npy"
|
|
|
|
if embedding_path.exists():
|
|
return np.load(str(embedding_path))
|
|
|
|
logger.warning(f"Version {version} non trouvée pour node {node_id}")
|
|
return None
|
|
|
|
def list_versions(self, node_id: str) -> List[VersionInfo]:
|
|
"""
|
|
Lister toutes les versions d'un node.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
|
|
Returns:
|
|
Liste des VersionInfo
|
|
"""
|
|
# Vérifier cache
|
|
if node_id in self._version_cache:
|
|
return self._version_cache[node_id]
|
|
|
|
versions = []
|
|
node_dir = self.versions_dir / node_id
|
|
|
|
if node_dir.exists():
|
|
for meta_file in sorted(node_dir.glob("v*_meta.json")):
|
|
try:
|
|
with open(meta_file, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
versions.append(VersionInfo(
|
|
version=data['version'],
|
|
created_at=datetime.fromisoformat(data['created_at']),
|
|
embedding_path=data['embedding_path'],
|
|
metadata=data.get('metadata', {})
|
|
))
|
|
except Exception as e:
|
|
logger.warning(f"Erreur lecture version {meta_file}: {e}")
|
|
|
|
self._version_cache[node_id] = versions
|
|
return versions
|
|
|
|
def get_latest_version(self, node_id: str) -> Optional[Tuple[int, np.ndarray]]:
|
|
"""
|
|
Récupérer la dernière version du prototype.
|
|
|
|
Returns:
|
|
Tuple (version, embedding) ou None
|
|
"""
|
|
versions = self.list_versions(node_id)
|
|
if not versions:
|
|
return None
|
|
|
|
latest = versions[-1]
|
|
embedding = self.get_version(node_id, latest.version)
|
|
|
|
if embedding is not None:
|
|
return (latest.version, embedding)
|
|
return None
|
|
|
|
|
|
# =============================================================================
|
|
# Apprenant Continu
|
|
# =============================================================================
|
|
|
|
class ContinuousLearner:
|
|
"""
|
|
Apprentissage continu et adaptation aux changements UI.
|
|
|
|
Fonctionnalités:
|
|
- Mise à jour des prototypes avec EMA
|
|
- Détection de dérive UI
|
|
- Création et consolidation de variantes
|
|
- Rollback vers versions précédentes
|
|
|
|
Example:
|
|
>>> learner = ContinuousLearner()
|
|
>>> learner.update_prototype("node_001", new_embedding, success=True)
|
|
>>> drift = learner.detect_drift("node_001", [0.7, 0.6, 0.5])
|
|
>>> if drift.is_drifting:
|
|
... learner.create_variant("node_001", variant_embedding)
|
|
"""
|
|
|
|
def __init__(self, config: Optional[ContinuousLearnerConfig] = None):
|
|
"""
|
|
Initialiser l'apprenant.
|
|
|
|
Args:
|
|
config: Configuration (utilise défaut si None)
|
|
"""
|
|
self.config = config or ContinuousLearnerConfig()
|
|
self.version_manager = PrototypeVersionManager(self.config.versions_dir)
|
|
|
|
# Cache des prototypes actuels
|
|
self._prototypes: Dict[str, np.ndarray] = {}
|
|
|
|
# Historique des confidences par node
|
|
self._confidence_history: Dict[str, List[float]] = {}
|
|
|
|
# Variantes par node
|
|
self._variants: Dict[str, List[Dict]] = {}
|
|
|
|
# Créer répertoire embeddings
|
|
Path(self.config.embeddings_dir).mkdir(parents=True, exist_ok=True)
|
|
|
|
logger.info(f"ContinuousLearner initialisé (alpha={self.config.ema_alpha})")
|
|
|
|
def update_prototype(
|
|
self,
|
|
node_id: str,
|
|
new_embedding: np.ndarray,
|
|
execution_success: bool = True
|
|
) -> np.ndarray:
|
|
"""
|
|
Mettre à jour le prototype d'un node avec EMA.
|
|
|
|
Formule: new_prototype = (1 - alpha) * old_prototype + alpha * new_embedding
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
new_embedding: Nouvel embedding observé
|
|
execution_success: True si l'exécution a réussi
|
|
|
|
Returns:
|
|
Nouveau prototype mis à jour
|
|
"""
|
|
# Récupérer prototype actuel
|
|
current_prototype = self._get_prototype(node_id)
|
|
|
|
if current_prototype is None:
|
|
# Premier prototype
|
|
updated_prototype = new_embedding.copy()
|
|
logger.info(f"Premier prototype créé pour node {node_id}")
|
|
else:
|
|
# Mise à jour EMA
|
|
alpha = self.config.ema_alpha
|
|
|
|
# Réduire alpha si échec (moins de poids au nouvel embedding)
|
|
if not execution_success:
|
|
alpha = alpha * 0.5
|
|
|
|
updated_prototype = (1 - alpha) * current_prototype + alpha * new_embedding
|
|
|
|
# Normaliser
|
|
norm = np.linalg.norm(updated_prototype)
|
|
if norm > 0:
|
|
updated_prototype = updated_prototype / norm
|
|
|
|
# Sauvegarder nouvelle version
|
|
self.version_manager.save_version(
|
|
node_id,
|
|
updated_prototype,
|
|
metadata={
|
|
"execution_success": execution_success,
|
|
"alpha_used": self.config.ema_alpha if execution_success else self.config.ema_alpha * 0.5
|
|
}
|
|
)
|
|
|
|
# Mettre à jour cache
|
|
self._prototypes[node_id] = updated_prototype
|
|
|
|
# Sauvegarder prototype actuel
|
|
self._save_current_prototype(node_id, updated_prototype)
|
|
|
|
logger.debug(f"Prototype mis à jour pour node {node_id}")
|
|
return updated_prototype
|
|
|
|
def detect_drift(
|
|
self,
|
|
node_id: str,
|
|
recent_confidences: List[float]
|
|
) -> DriftStatus:
|
|
"""
|
|
Détecter la dérive UI pour un node.
|
|
|
|
Signale une dérive si N matchs consécutifs ont une confiance < seuil.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
recent_confidences: Confidences des derniers matchs
|
|
|
|
Returns:
|
|
DriftStatus avec diagnostic
|
|
"""
|
|
# Mettre à jour historique
|
|
if node_id not in self._confidence_history:
|
|
self._confidence_history[node_id] = []
|
|
|
|
self._confidence_history[node_id].extend(recent_confidences)
|
|
|
|
# Garder seulement les N dernières
|
|
max_history = 20
|
|
self._confidence_history[node_id] = self._confidence_history[node_id][-max_history:]
|
|
|
|
# Compter matchs consécutifs à faible confiance
|
|
consecutive_low = 0
|
|
threshold = self.config.drift_confidence_threshold
|
|
|
|
for conf in reversed(self._confidence_history[node_id]):
|
|
if conf < threshold:
|
|
consecutive_low += 1
|
|
else:
|
|
break
|
|
|
|
# Déterminer si dérive
|
|
is_drifting = consecutive_low >= self.config.drift_consecutive_count
|
|
|
|
# Calculer sévérité
|
|
if is_drifting:
|
|
recent = self._confidence_history[node_id][-consecutive_low:]
|
|
avg_confidence = np.mean(recent)
|
|
drift_severity = 1.0 - (avg_confidence / threshold)
|
|
else:
|
|
drift_severity = 0.0
|
|
|
|
# Recommander action
|
|
if is_drifting:
|
|
if drift_severity > 0.5:
|
|
recommended_action = "retrain"
|
|
else:
|
|
recommended_action = "create_variant"
|
|
else:
|
|
recommended_action = "monitor"
|
|
|
|
status = DriftStatus(
|
|
is_drifting=is_drifting,
|
|
drift_severity=drift_severity,
|
|
consecutive_low_confidence=consecutive_low,
|
|
recommended_action=recommended_action,
|
|
last_confidences=self._confidence_history[node_id][-5:]
|
|
)
|
|
|
|
if is_drifting:
|
|
logger.warning(
|
|
f"Dérive détectée pour node {node_id}: "
|
|
f"severity={drift_severity:.2f}, action={recommended_action}"
|
|
)
|
|
|
|
return status
|
|
|
|
def create_variant(
|
|
self,
|
|
node_id: str,
|
|
variant_embedding: np.ndarray,
|
|
metadata: Optional[Dict] = None
|
|
) -> str:
|
|
"""
|
|
Créer une nouvelle variante pour un node.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
variant_embedding: Embedding de la variante
|
|
metadata: Métadonnées optionnelles
|
|
|
|
Returns:
|
|
ID de la variante créée
|
|
"""
|
|
if node_id not in self._variants:
|
|
self._variants[node_id] = []
|
|
|
|
# Vérifier limite de variantes
|
|
if len(self._variants[node_id]) >= self.config.max_variants_per_node:
|
|
logger.warning(
|
|
f"Limite de variantes atteinte pour node {node_id}, "
|
|
f"consolidation nécessaire"
|
|
)
|
|
self.consolidate_variants(node_id)
|
|
|
|
# Créer ID de variante
|
|
variant_id = f"{node_id}_var_{len(self._variants[node_id]) + 1:03d}"
|
|
|
|
# Normaliser embedding
|
|
norm = np.linalg.norm(variant_embedding)
|
|
if norm > 0:
|
|
variant_embedding = variant_embedding / norm
|
|
|
|
# Calculer similarité avec prototype principal
|
|
primary_prototype = self._get_prototype(node_id)
|
|
if primary_prototype is not None:
|
|
similarity = self._cosine_similarity(variant_embedding, primary_prototype)
|
|
else:
|
|
similarity = 0.0
|
|
|
|
# Sauvegarder variante
|
|
variant_path = Path(self.config.embeddings_dir) / f"{variant_id}.npy"
|
|
np.save(str(variant_path), variant_embedding)
|
|
|
|
variant_info = {
|
|
"variant_id": variant_id,
|
|
"embedding_path": str(variant_path),
|
|
"similarity_to_primary": similarity,
|
|
"created_at": datetime.now().isoformat(),
|
|
"metadata": metadata or {}
|
|
}
|
|
|
|
self._variants[node_id].append(variant_info)
|
|
|
|
logger.info(
|
|
f"Variante {variant_id} créée pour node {node_id} "
|
|
f"(similarité={similarity:.3f})"
|
|
)
|
|
|
|
return variant_id
|
|
|
|
def consolidate_variants(self, node_id: str) -> None:
|
|
"""
|
|
Consolider les variantes d'un node par re-clustering.
|
|
|
|
Réduit le nombre de variantes en fusionnant les plus similaires.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
"""
|
|
if node_id not in self._variants or len(self._variants[node_id]) < 2:
|
|
return
|
|
|
|
logger.info(f"Consolidation des variantes pour node {node_id}")
|
|
|
|
# Charger tous les embeddings de variantes
|
|
embeddings = []
|
|
for var_info in self._variants[node_id]:
|
|
try:
|
|
emb = np.load(var_info['embedding_path'])
|
|
embeddings.append(emb)
|
|
except Exception as e:
|
|
logger.warning(f"Erreur chargement variante: {e}")
|
|
|
|
if len(embeddings) < 2:
|
|
return
|
|
|
|
# Clustering simple: fusionner variantes très similaires
|
|
embeddings_array = np.array(embeddings)
|
|
|
|
# Calculer matrice de similarité
|
|
n = len(embeddings)
|
|
similarity_matrix = np.zeros((n, n))
|
|
for i in range(n):
|
|
for j in range(n):
|
|
similarity_matrix[i, j] = self._cosine_similarity(
|
|
embeddings_array[i], embeddings_array[j]
|
|
)
|
|
|
|
# Fusionner variantes avec similarité > 0.9
|
|
merged_indices = set()
|
|
new_variants = []
|
|
|
|
for i in range(n):
|
|
if i in merged_indices:
|
|
continue
|
|
|
|
# Trouver variantes similaires
|
|
similar = [i]
|
|
for j in range(i + 1, n):
|
|
if j not in merged_indices and similarity_matrix[i, j] > 0.9:
|
|
similar.append(j)
|
|
merged_indices.add(j)
|
|
|
|
# Fusionner en calculant la moyenne
|
|
merged_embedding = np.mean([embeddings_array[k] for k in similar], axis=0)
|
|
merged_embedding = merged_embedding / np.linalg.norm(merged_embedding)
|
|
|
|
# Créer nouvelle variante consolidée
|
|
new_variant_id = f"{node_id}_var_c{len(new_variants) + 1:03d}"
|
|
variant_path = Path(self.config.embeddings_dir) / f"{new_variant_id}.npy"
|
|
np.save(str(variant_path), merged_embedding)
|
|
|
|
new_variants.append({
|
|
"variant_id": new_variant_id,
|
|
"embedding_path": str(variant_path),
|
|
"similarity_to_primary": 0.0, # Sera recalculé
|
|
"created_at": datetime.now().isoformat(),
|
|
"metadata": {"consolidated_from": similar}
|
|
})
|
|
|
|
# Remplacer variantes
|
|
self._variants[node_id] = new_variants
|
|
|
|
logger.info(
|
|
f"Consolidation terminée: {n} -> {len(new_variants)} variantes"
|
|
)
|
|
|
|
def rollback_prototype(self, node_id: str, version: int) -> bool:
|
|
"""
|
|
Restaurer une version précédente du prototype.
|
|
|
|
Args:
|
|
node_id: ID du node
|
|
version: Numéro de version à restaurer
|
|
|
|
Returns:
|
|
True si rollback réussi
|
|
"""
|
|
embedding = self.version_manager.get_version(node_id, version)
|
|
|
|
if embedding is None:
|
|
logger.error(f"Version {version} non trouvée pour node {node_id}")
|
|
return False
|
|
|
|
# Mettre à jour cache
|
|
self._prototypes[node_id] = embedding
|
|
|
|
# Sauvegarder comme prototype actuel
|
|
self._save_current_prototype(node_id, embedding)
|
|
|
|
logger.info(f"Rollback vers version {version} pour node {node_id}")
|
|
return True
|
|
|
|
def get_variants(self, node_id: str) -> List[Dict]:
|
|
"""Récupérer les variantes d'un node."""
|
|
return self._variants.get(node_id, [])
|
|
|
|
def _get_prototype(self, node_id: str) -> Optional[np.ndarray]:
|
|
"""Récupérer le prototype actuel d'un node."""
|
|
# Vérifier cache
|
|
if node_id in self._prototypes:
|
|
return self._prototypes[node_id]
|
|
|
|
# Charger depuis fichier
|
|
prototype_path = Path(self.config.embeddings_dir) / f"{node_id}_current.npy"
|
|
if prototype_path.exists():
|
|
prototype = np.load(str(prototype_path))
|
|
self._prototypes[node_id] = prototype
|
|
return prototype
|
|
|
|
# Essayer dernière version
|
|
latest = self.version_manager.get_latest_version(node_id)
|
|
if latest:
|
|
_, embedding = latest
|
|
self._prototypes[node_id] = embedding
|
|
return embedding
|
|
|
|
return None
|
|
|
|
def _save_current_prototype(self, node_id: str, embedding: np.ndarray) -> None:
|
|
"""Sauvegarder le prototype actuel."""
|
|
prototype_path = Path(self.config.embeddings_dir) / f"{node_id}_current.npy"
|
|
np.save(str(prototype_path), embedding)
|
|
|
|
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
|
|
"""Calculer similarité cosinus."""
|
|
norm_a = np.linalg.norm(a)
|
|
norm_b = np.linalg.norm(b)
|
|
if norm_a == 0 or norm_b == 0:
|
|
return 0.0
|
|
return float(np.dot(a, b) / (norm_a * norm_b))
|
|
|
|
def get_config(self) -> ContinuousLearnerConfig:
|
|
"""Récupérer la configuration."""
|
|
return self.config
|
|
|
|
|
|
# =============================================================================
|
|
# Fonctions utilitaires
|
|
# =============================================================================
|
|
|
|
def create_learner(
|
|
ema_alpha: float = 0.1,
|
|
drift_threshold: float = 0.85,
|
|
drift_count: int = 3
|
|
) -> ContinuousLearner:
|
|
"""
|
|
Créer un apprenant avec configuration personnalisée.
|
|
|
|
Args:
|
|
ema_alpha: Alpha pour EMA
|
|
drift_threshold: Seuil de confiance pour dérive
|
|
drift_count: Matchs consécutifs pour détecter dérive
|
|
|
|
Returns:
|
|
ContinuousLearner configuré
|
|
"""
|
|
config = ContinuousLearnerConfig(
|
|
ema_alpha=ema_alpha,
|
|
drift_confidence_threshold=drift_threshold,
|
|
drift_consecutive_count=drift_count
|
|
)
|
|
return ContinuousLearner(config)
|