Files
Geniusia_v2/geniusia2/core/multimodal_embedding_manager.py
2026-03-05 00:20:25 +01:00

916 lines
31 KiB
Python

"""
Gestionnaire d'embeddings multi-modaux pour la Phase 3 - Mode Complet.
Fusionne les embeddings de différentes modalités en un seul embedding unifié.
Modalités supportées:
- Image (screenshot entier)
- Texte (texte détecté)
- Titre (window_title)
- UI (éléments UI importants)
- Contexte (métadonnées workflow)
"""
import numpy as np
from typing import List, Optional, Dict, Any
from pathlib import Path
from .ui_element_models import (
UIElement,
StateEmbedding,
EmbeddingComponents,
ComponentInfo
)
from .llm_manager import LLMManager
from .logger import Logger
# Import optionnel de EmbeddingManager
try:
from .embedders.embedding_manager import EmbeddingManager as BaseEmbeddingManager
except ImportError:
BaseEmbeddingManager = None
from dataclasses import dataclass
@dataclass
class EmbeddingWeights:
    """Fusion weights for combining the multi-modal embeddings."""
    image: float = 0.4
    text: float = 0.2
    title: float = 0.1
    ui: float = 0.2
    context: float = 0.1

    def normalize(self) -> 'EmbeddingWeights':
        """Return a copy whose weights sum to 1.0 (defaults when all are zero)."""
        total = sum((self.image, self.text, self.title, self.ui, self.context))
        if total == 0:
            # Degenerate all-zero input: fall back to the default profile.
            return EmbeddingWeights()
        return EmbeddingWeights(
            image=self.image / total,
            text=self.text / total,
            title=self.title / total,
            ui=self.ui / total,
            context=self.context / total,
        )

    def to_dict(self) -> Dict[str, float]:
        """Serialize the weights as a plain dict keyed by modality name."""
        return {
            "image": self.image,
            "text": self.text,
            "title": self.title,
            "ui": self.ui,
            "context": self.context,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, float]) -> 'EmbeddingWeights':
        """Build weights from a dict, using the class defaults for missing keys."""
        return cls(
            image=data.get("image", 0.4),
            text=data.get("text", 0.2),
            title=data.get("title", 0.1),
            ui=data.get("ui", 0.2),
            context=data.get("context", 0.1),
        )
class MultiModalEmbeddingManager:
"""
Gestionnaire d'embeddings multi-modaux.
Fusionne les embeddings de 5 modalités:
1. Image globale (screenshot)
2. Texte détecté (OCR/VLM)
3. Titre de fenêtre
4. Éléments UI (moyenne des éléments importants)
5. Contexte workflow
La fusion est une combinaison pondérée normalisée.
"""
def __init__(
    self,
    embedding_manager: Optional[BaseEmbeddingManager] = None,
    logger: Optional[Logger] = None,
    data_dir: str = "data",
    config: Optional[Dict[str, Any]] = None
):
    """
    Set up the multi-modal embedding manager.

    Args:
        embedding_manager: Existing base embedding manager (optional).
        logger: Logger used for action logging (optional).
        data_dir: Root data directory; embeddings land under
            ``<data_dir>/embeddings/multimodal``.
        config: Optional configuration dict (keys: ``embedding_dim``,
            ``fusion_method``, ``use_cache``, ``weights``).
    """
    self.embedding_manager = embedding_manager
    self.logger = logger
    self.data_dir = Path(data_dir)
    self.config = config or {}
    # Tunables, with fallbacks when absent from the config.
    self.embedding_dim = self.config.get("embedding_dim", 512)
    self.fusion_method = self.config.get("fusion_method", "weighted_average")
    self.use_cache = self.config.get("use_cache", True)
    # Fusion weights: read from config, then normalized to sum to 1.0.
    self.default_weights = EmbeddingWeights.from_dict(
        self.config.get("weights", {})
    ).normalize()
    # Plain-dict mirror of the weights, kept for backward compatibility.
    self.weights = self.default_weights.to_dict()
    # Optional in-memory embedding cache (None disables caching).
    self._embedding_cache = {} if self.use_cache else None
    # Make sure the on-disk embedding directory exists up front.
    self.embeddings_dir = self.data_dir / "embeddings" / "multimodal"
    self.embeddings_dir.mkdir(parents=True, exist_ok=True)
    if self.logger:
        self.logger.log_action({
            "action": "multimodal_embedding_manager_initialized",
            "embedding_dim": self.embedding_dim,
            "fusion_method": self.fusion_method,
            "default_weights": self.default_weights.to_dict()
        })
def create_state_embedding(
    self,
    screenshot: np.ndarray,
    detected_text: List[str],
    window_title: str,
    ui_elements: List[UIElement],
    context: Optional[Dict[str, Any]] = None,
    data_dir: str = "data"
) -> StateEmbedding:
    """
    Create a unified state embedding by fusing all five modalities.

    Each modality (image, text, title, UI, context) is embedded by its
    dedicated helper, L2-normalized, combined as a weighted sum using
    ``self.weights``, re-normalized, and persisted as ``<state_id>_fused.npy``.

    Args:
        screenshot: Screenshot as a numpy array (assumed H x W x 3 BGR by
            the image helper — TODO confirm with callers).
        detected_text: Detected text snippets (OCR/VLM output).
        window_title: Title of the active window.
        ui_elements: UI elements of the current screen.
        context: Optional workflow context metadata.
        data_dir: Base data directory. NOTE(review): this parameter shadows
            ``self.data_dir`` / ``self.embeddings_dir`` set in __init__ —
            confirm the duplication is intended.

    Returns:
        StateEmbedding referencing the fused vector and the per-modality
        component files.
    """
    # Ensure the multimodal embedding directory exists.
    embeddings_dir = Path(data_dir) / "embeddings" / "multimodal"
    embeddings_dir.mkdir(parents=True, exist_ok=True)
    # Microsecond timestamp yields a (practically) unique state id.
    import time
    state_id = f"state_{int(time.time() * 1000000)}"
    # Modality 1: the full screenshot.
    image_emb, image_path = self._compute_image_embedding(
        screenshot, state_id, embeddings_dir
    )
    image_emb_norm = self._normalize(image_emb)
    # Modality 2: concatenated detected text.
    text_emb, text_path = self._compute_text_embedding(
        detected_text, state_id, embeddings_dir
    )
    text_emb_norm = self._normalize(text_emb)
    # Modality 3: window title.
    title_emb, title_path = self._compute_title_embedding(
        window_title, state_id, embeddings_dir
    )
    title_emb_norm = self._normalize(title_emb)
    # Modality 4: aggregated UI elements.
    ui_emb, ui_path = self._compute_ui_embedding(
        ui_elements, state_id, embeddings_dir
    )
    ui_emb_norm = self._normalize(ui_emb)
    # Modality 5: workflow context.
    context_emb, context_path = self._compute_context_embedding(
        context, state_id, embeddings_dir
    )
    context_emb_norm = self._normalize(context_emb)
    # Weighted fusion of the normalized modality vectors.
    state_emb = (
        self.weights['image'] * image_emb_norm +
        self.weights['text'] * text_emb_norm +
        self.weights['title'] * title_emb_norm +
        self.weights['ui'] * ui_emb_norm +
        self.weights['context'] * context_emb_norm
    )
    # Final L2 normalization of the fused vector.
    state_emb_final = self._normalize(state_emb)
    # Persist the fused embedding next to its component files.
    fused_path = embeddings_dir / f"{state_id}_fused.npy"
    np.save(fused_path, state_emb_final)
    # Record where each modality's vector was saved and which provider
    # (conceptually) produced it.
    components = EmbeddingComponents(
        image_embedding=ComponentInfo(
            provider="openclip_ViT-B-32",
            vector_id=str(image_path)
        ),
        text_embedding=ComponentInfo(
            provider="clip_text",
            vector_id=str(text_path)
        ),
        title_embedding=ComponentInfo(
            provider="clip_text",
            vector_id=str(title_path)
        ),
        ui_embedding=ComponentInfo(
            provider="openclip_ViT-B-32",
            vector_id=str(ui_path)
        ),
        # The context component is omitted entirely when its fusion weight is 0.
        context_embedding=ComponentInfo(
            provider="numeric_context_v1",
            vector_id=str(context_path)
        ) if self.weights['context'] > 0 else None
    )
    # Wrap everything in the final StateEmbedding record.
    state_embedding = StateEmbedding(
        provider="multimodal_fusion_v1",
        vector_id=str(fused_path),
        components=components
    )
    if self.logger:
        self.logger.log_action({
            "action": "state_embedding_created",
            "state_id": state_id,
            "components": {
                "image": image_emb.shape,
                "text": text_emb.shape,
                "title": title_emb.shape,
                "ui": ui_emb.shape,
                "context": context_emb.shape
            }
        })
    return state_embedding
def _compute_image_embedding(
self,
screenshot: np.ndarray,
state_id: str,
embeddings_dir: Path
) -> tuple:
"""Calcule l'embedding de l'image globale."""
try:
# Convertir en PIL Image
from PIL import Image
if screenshot.shape[2] == 3:
# BGR to RGB
screenshot_rgb = screenshot[:, :, ::-1]
else:
screenshot_rgb = screenshot
pil_image = Image.fromarray(screenshot_rgb.astype(np.uint8))
# Générer l'embedding
embedding = self.image_embedder.embed(pil_image)
# Sauvegarder
path = embeddings_dir / f"{state_id}_image.npy"
np.save(path, embedding)
return embedding, path
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "image_embedding_error",
"error": str(e)
})
# Retourner un vecteur zéro
embedding = np.zeros(self.embedding_dim)
path = embeddings_dir / f"{state_id}_image.npy"
np.save(path, embedding)
return embedding, path
def _compute_text_embedding(
self,
detected_text: List[str],
state_id: str,
embeddings_dir: Path
) -> tuple:
"""Calcule l'embedding du texte concaténé."""
try:
# Concaténer le texte
text_concat = " ".join(detected_text) if detected_text else ""
if not text_concat:
# Pas de texte, retourner vecteur zéro
embedding = np.zeros(self.embedding_dim)
else:
# Pour l'instant, utiliser un embedding simple
# TODO: Intégrer avec un vrai text embedder
embedding = self._simple_text_embedding(text_concat)
# Sauvegarder
path = embeddings_dir / f"{state_id}_text.npy"
np.save(path, embedding)
return embedding, path
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "text_embedding_error",
"error": str(e)
})
embedding = np.zeros(self.embedding_dim)
path = embeddings_dir / f"{state_id}_text.npy"
np.save(path, embedding)
return embedding, path
def _compute_title_embedding(
self,
window_title: str,
state_id: str,
embeddings_dir: Path
) -> tuple:
"""Calcule l'embedding du titre de fenêtre."""
try:
if not window_title:
embedding = np.zeros(self.embedding_dim)
else:
embedding = self._simple_text_embedding(window_title)
# Sauvegarder
path = embeddings_dir / f"{state_id}_title.npy"
np.save(path, embedding)
return embedding, path
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "title_embedding_error",
"error": str(e)
})
embedding = np.zeros(self.embedding_dim)
path = embeddings_dir / f"{state_id}_title.npy"
np.save(path, embedding)
return embedding, path
def _compute_ui_embedding(
self,
ui_elements: List[UIElement],
state_id: str,
embeddings_dir: Path
) -> tuple:
"""Calcule l'embedding des éléments UI (moyenne des éléments importants)."""
try:
if not ui_elements:
embedding = np.zeros(self.embedding_dim)
else:
# Filtrer les éléments importants
important_elements = [
elem for elem in ui_elements
if elem.properties.is_clickable or 'primary_action' in elem.tags
]
if not important_elements:
# Prendre les 5 premiers éléments
important_elements = ui_elements[:5]
# Charger et moyenner les embeddings
embeddings = []
for elem in important_elements:
try:
emb = np.load(elem.visual.embedding_vector_id)
embeddings.append(emb)
except:
continue
if embeddings:
embedding = np.mean(embeddings, axis=0)
else:
embedding = np.zeros(self.embedding_dim)
# Sauvegarder
path = embeddings_dir / f"{state_id}_ui.npy"
np.save(path, embedding)
return embedding, path
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "ui_embedding_error",
"error": str(e)
})
embedding = np.zeros(self.embedding_dim)
path = embeddings_dir / f"{state_id}_ui.npy"
np.save(path, embedding)
return embedding, path
def _compute_context_embedding(
self,
context: Optional[Dict[str, Any]],
state_id: str,
embeddings_dir: Path
) -> tuple:
"""Calcule l'embedding du contexte workflow."""
try:
if not context or self.weights['context'] == 0:
embedding = np.zeros(self.embedding_dim)
else:
# Encoder les métadonnées de contexte en vecteur
embedding = self._encode_context(context)
# Sauvegarder
path = embeddings_dir / f"{state_id}_context.npy"
np.save(path, embedding)
return embedding, path
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "context_embedding_error",
"error": str(e)
})
embedding = np.zeros(self.embedding_dim)
path = embeddings_dir / f"{state_id}_context.npy"
np.save(path, embedding)
return embedding, path
def _simple_text_embedding(self, text: str) -> np.ndarray:
"""
Crée un embedding simple de texte.
TODO: Remplacer par un vrai text embedder (CLIP text, Sentence-BERT, etc.)
"""
# Pour l'instant, utiliser un hash simple
import hashlib
hash_obj = hashlib.sha256(text.encode('utf-8'))
hash_bytes = hash_obj.digest()
# Convertir en vecteur de dimension embedding_dim
embedding = np.zeros(self.embedding_dim)
for i in range(min(len(hash_bytes), self.embedding_dim)):
embedding[i] = hash_bytes[i] / 255.0
return embedding
def _encode_context(self, context: Dict[str, Any]) -> np.ndarray:
"""
Encode le contexte en vecteur numérique.
TODO: Améliorer l'encodage du contexte.
"""
# Pour l'instant, encoder simplement les clés/valeurs
context_str = str(context)
return self._simple_text_embedding(context_str)
def _normalize(self, vector: np.ndarray) -> np.ndarray:
"""Normalise un vecteur (norme L2 = 1.0)."""
norm = np.linalg.norm(vector)
if norm > 0:
return vector / norm
return vector
def get_weights(self) -> Dict[str, float]:
"""Retourne les poids de fusion actuels."""
return self.weights.copy()
def set_weights(self, weights: Dict[str, float]):
"""
Modifie les poids de fusion.
Args:
weights: Dictionnaire des nouveaux poids
"""
self.weights.update(weights)
if self.logger:
self.logger.log_action({
"action": "weights_updated",
"new_weights": self.weights
})
def compute_similarity(
self,
embedding1: np.ndarray,
embedding2: np.ndarray,
metric: str = "cosine"
) -> float:
"""
Calcule la similarité entre deux embeddings.
Args:
embedding1: Premier embedding
embedding2: Deuxième embedding
metric: Métrique de similarité ("cosine" ou "euclidean")
Returns:
Score de similarité entre 0.0 et 1.0
"""
try:
if metric == "cosine":
# Similarité cosinus
dot_product = np.dot(embedding1, embedding2)
norm1 = np.linalg.norm(embedding1)
norm2 = np.linalg.norm(embedding2)
if norm1 == 0 or norm2 == 0:
return 0.0
return float(dot_product / (norm1 * norm2))
elif metric == "euclidean":
# Distance euclidienne (convertie en similarité)
distance = np.linalg.norm(embedding1 - embedding2)
return float(1.0 / (1.0 + distance))
else:
raise ValueError(f"Métrique non supportée: {metric}")
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "similarity_computation_error",
"metric": metric,
"error": str(e)
})
return 0.0
def load_fused_embedding(self, vector_id: str) -> Optional[np.ndarray]:
"""
Charge un embedding fusionné depuis son vector_id.
Args:
vector_id: ID du vecteur (chemin de fichier ou ID temporaire)
Returns:
Embedding numpy array ou None si non trouvé
"""
try:
if vector_id.startswith("temp_"):
# Embedding temporaire, générer un embedding aléatoire
return np.random.rand(self.embedding_dim)
# Charger depuis le fichier
path = Path(vector_id)
if path.exists():
return np.load(path)
else:
# Fichier non trouvé, générer un embedding par défaut
if self.logger:
self.logger.log_action({
"action": "fused_embedding_not_found",
"vector_id": vector_id
})
return np.random.rand(self.embedding_dim)
except Exception as e:
if self.logger:
self.logger.log_action({
"action": "fused_embedding_load_error",
"vector_id": vector_id,
"error": str(e)
})
return None
def generate_multimodal_embedding(
    self,
    screen_state,
    screenshot: Optional[np.ndarray] = None,
    weights: Optional[EmbeddingWeights] = None,
    save: bool = True
) -> StateEmbedding:
    """
    Build a full multi-modal StateEmbedding for a screen state.

    Args:
        screen_state: EnrichedScreenState to embed.
        screenshot: Optional screenshot array.
        weights: Fusion weights; falls back to the configured defaults.
        save: Whether embeddings should be persisted (currently unused by
            the simulated implementation).

    Returns:
        StateEmbedding with per-modality components and a fused vector id.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    # Resolve and normalize the fusion weights. NOTE: the simulated
    # implementation below does not consume them yet.
    if weights is None:
        weights = EmbeddingWeights(
            image=self.weights.get('image', 0.4),
            text=self.weights.get('text', 0.2),
            title=self.weights.get('title', 0.1),
            ui=self.weights.get('ui', 0.2),
            context=self.weights.get('context', 0.1)
        )
    weights = weights.normalize()
    try:
        # Simulated generation: each present modality gets a "temp_" vector
        # id. TODO: implement real generation with the embedders.
        sid = screen_state.screen_state_id
        components = EmbeddingComponents()
        if screenshot is not None:
            components.image_embedding = ComponentInfo(
                provider="openclip_ViT-B-32",
                vector_id=f"temp_{sid}_image"
            )
        if screen_state.perception.detected_text:
            components.text_embedding = ComponentInfo(
                provider="clip_text",
                vector_id=f"temp_{sid}_text"
            )
        if screen_state.window.window_title:
            components.title_embedding = ComponentInfo(
                provider="clip_text",
                vector_id=f"temp_{sid}_title"
            )
        if screen_state.ui_elements:
            components.ui_embedding = ComponentInfo(
                provider="ui_aggregation_v1",
                vector_id=f"temp_{sid}_ui"
            )
        if screen_state.context.current_workflow_candidate or screen_state.context.tags:
            components.context_embedding = ComponentInfo(
                provider="context_embedding_v1",
                vector_id=f"temp_{sid}_context"
            )
        result = StateEmbedding(
            provider="multimodal_fusion_v1",
            vector_id=f"temp_{sid}_fused",
            components=components
        )
        if self.logger:
            self.logger.log_action({
                "action": "multimodal_embedding_generated",
                "screen_state_id": sid,
                "provider": result.provider
            })
        return result
    except Exception as e:
        if self.logger:
            self.logger.log_action({
                "action": "multimodal_embedding_error",
                "screen_state_id": screen_state.screen_state_id,
                "error": str(e)
            })
        raise
if __name__ == "__main__":
    # Basic smoke tests that avoid heavy optional dependencies.
    print("MultiModalEmbeddingManager - Tests basiques")
    print("=" * 50)

    # --- Normalization (needs neither a logger nor an embedder) ---
    print("\n1. Test normalisation:")

    class MinimalManager:
        # Minimal stand-in exposing only _normalize, mirroring the manager.
        def _normalize(self, vector):
            magnitude = np.linalg.norm(vector)
            return vector / magnitude if magnitude > 0 else vector

    manager = MinimalManager()
    vector = np.array([3.0, 4.0, 0.0])
    normalized = manager._normalize(vector)
    norm = np.linalg.norm(normalized)
    print(f" Vecteur original: {vector}")
    print(f" Vecteur normalisé: {normalized}")
    print(f" Norme: {norm:.6f}")
    assert abs(norm - 1.0) < 0.001, "La norme doit être 1.0"
    print(f" ✓ Normalisation correcte")

    # --- Default weight configuration sanity check ---
    print("\n2. Test configuration des poids:")
    default_weights = {
        'image': 0.5,
        'text': 0.3,
        'title': 0.1,
        'ui': 0.1,
        'context': 0.0,
    }
    print(f" Poids par défaut: {default_weights}")
    total = sum(default_weights.values())
    print(f" Somme des poids: {total}")
    print(f" ✓ Configuration valide")
    print("\n✓ Tous les tests basiques réussis!")
# NOTE(review): this is a byte-for-byte duplicate of
# MultiModalEmbeddingManager.compute_similarity defined earlier in this file.
# It takes `self` yet appears after the `if __name__ == "__main__"` block, so
# it looks like misplaced dead code; confirm it is unused and remove it.
def compute_similarity(
    self,
    embedding1: np.ndarray,
    embedding2: np.ndarray,
    metric: str = "cosine"
) -> float:
    """
    Compute the similarity between two embeddings.
    Args:
        embedding1: First embedding
        embedding2: Second embedding
        metric: Similarity metric ("cosine" or "euclidean")
    Returns:
        Similarity score between 0.0 and 1.0
    """
    try:
        if metric == "cosine":
            # Cosine similarity: dot product over the product of norms.
            dot_product = np.dot(embedding1, embedding2)
            norm1 = np.linalg.norm(embedding1)
            norm2 = np.linalg.norm(embedding2)
            if norm1 == 0 or norm2 == 0:
                return 0.0
            return float(dot_product / (norm1 * norm2))
        elif metric == "euclidean":
            # Euclidean distance mapped onto a (0, 1] similarity.
            distance = np.linalg.norm(embedding1 - embedding2)
            return float(1.0 / (1.0 + distance))
        else:
            # NOTE: this ValueError is caught by the handler below, so an
            # unsupported metric effectively returns 0.0.
            raise ValueError(f"Métrique non supportée: {metric}")
    except Exception as e:
        if self.logger:
            self.logger.log_action({
                "action": "similarity_computation_error",
                "metric": metric,
                "error": str(e)
            })
        return 0.0
# NOTE(review): duplicate of MultiModalEmbeddingManager.load_fused_embedding
# defined earlier in this file, placed after the `__main__` block with a
# `self` parameter — appears to be misplaced dead code; confirm and remove.
def load_fused_embedding(self, vector_id: str) -> Optional[np.ndarray]:
    """
    Load a fused embedding from its vector_id.
    Args:
        vector_id: Vector id (file path or temporary id)
    Returns:
        Embedding numpy array or None if not found
    """
    try:
        if vector_id.startswith("temp_"):
            # Temporary id: synthesize a random embedding.
            return np.random.rand(self.embedding_dim)
        # Load from file.
        path = Path(vector_id)
        if path.exists():
            return np.load(path)
        else:
            # File not found: fall back to a random default embedding.
            if self.logger:
                self.logger.log_action({
                    "action": "fused_embedding_not_found",
                    "vector_id": vector_id
                })
            return np.random.rand(self.embedding_dim)
    except Exception as e:
        if self.logger:
            self.logger.log_action({
                "action": "fused_embedding_load_error",
                "vector_id": vector_id,
                "error": str(e)
            })
        return None
# NOTE(review): duplicate of
# MultiModalEmbeddingManager.generate_multimodal_embedding defined earlier in
# this file, placed after the `__main__` block with a `self` parameter —
# appears to be misplaced dead code; confirm it is unused and remove it.
def generate_multimodal_embedding(
    self,
    screen_state,
    screenshot: Optional[np.ndarray] = None,
    weights: Optional[EmbeddingWeights] = None,
    save: bool = True
) -> StateEmbedding:
    """
    Generate a complete multi-modal embedding for a screen state.
    Args:
        screen_state: EnrichedScreenState
        screenshot: Screenshot numpy array (optional)
        weights: Fusion weights (uses the defaults when None)
        save: Whether to persist the embeddings
    Returns:
        StateEmbedding with components and fused embedding
    """
    if weights is None:
        # Fall back to the configured fusion weights.
        weights = EmbeddingWeights(
            image=self.weights.get('image', 0.4),
            text=self.weights.get('text', 0.2),
            title=self.weights.get('title', 0.1),
            ui=self.weights.get('ui', 0.2),
            context=self.weights.get('context', 0.1)
        ).normalize()
    else:
        weights = weights.normalize()
    try:
        # Simulated generation for now: each present modality receives a
        # "temp_" placeholder vector id.
        # TODO: implement real generation with the embedders.
        # Build the per-modality components.
        components = EmbeddingComponents()
        # Image embedding.
        if screenshot is not None:
            components.image_embedding = ComponentInfo(
                provider="openclip_ViT-B-32",
                vector_id=f"temp_{screen_state.screen_state_id}_image"
            )
        # Text embedding.
        if screen_state.perception.detected_text:
            components.text_embedding = ComponentInfo(
                provider="clip_text",
                vector_id=f"temp_{screen_state.screen_state_id}_text"
            )
        # Title embedding.
        if screen_state.window.window_title:
            components.title_embedding = ComponentInfo(
                provider="clip_text",
                vector_id=f"temp_{screen_state.screen_state_id}_title"
            )
        # UI embedding.
        if screen_state.ui_elements:
            components.ui_embedding = ComponentInfo(
                provider="ui_aggregation_v1",
                vector_id=f"temp_{screen_state.screen_state_id}_ui"
            )
        # Context embedding.
        if screen_state.context.current_workflow_candidate or screen_state.context.tags:
            components.context_embedding = ComponentInfo(
                provider="context_embedding_v1",
                vector_id=f"temp_{screen_state.screen_state_id}_context"
            )
        # Assemble the final StateEmbedding.
        state_embedding = StateEmbedding(
            provider="multimodal_fusion_v1",
            vector_id=f"temp_{screen_state.screen_state_id}_fused",
            components=components
        )
        if self.logger:
            self.logger.log_action({
                "action": "multimodal_embedding_generated",
                "screen_state_id": screen_state.screen_state_id,
                "provider": state_embedding.provider
            })
        return state_embedding
    except Exception as e:
        if self.logger:
            self.logger.log_action({
                "action": "multimodal_embedding_error",
                "screen_state_id": screen_state.screen_state_id,
                "error": str(e)
            })
        raise