""" CLIP-based embedder implementation for RPA Vision V3. This module provides a wrapper around OpenCLIP for generating image and text embeddings using the CLIP (Contrastive Language-Image Pre-training) model. """ import torch import numpy as np from PIL import Image from typing import List, Optional import logging try: import open_clip except ImportError: open_clip = None from .base_embedder import EmbedderBase logger = logging.getLogger(__name__) class CLIPEmbedder(EmbedderBase): """ CLIP-based image and text embedder using OpenCLIP. This embedder uses the ViT-B/32 architecture by default, which produces 512-dimensional embeddings. It automatically handles GPU/CPU device selection. The embeddings are L2-normalized for cosine similarity calculations. """ def __init__( self, model_name: str = "ViT-B-32", pretrained: str = "openai", device: Optional[str] = None ): """ Initialize the CLIP embedder. Args: model_name: CLIP model architecture (default: ViT-B-32) Options: ViT-B-32, ViT-B-16, ViT-L-14, etc. pretrained: Pretrained weights to use (default: openai) device: Device to use ('cuda', 'cpu', or None for auto-detect) Defaults to CPU to save GPU memory for VLM models Raises: ImportError: If open_clip is not installed RuntimeError: If model loading fails """ if open_clip is None: raise ImportError( "OpenCLIP is not installed. " "Install it with: pip install open-clip-torch" ) if device is None: try: import torch if torch.cuda.is_available(): free_vram = torch.cuda.mem_get_info()[0] / 1024**3 if free_vram > 1.5: device = "cuda" else: device = "cpu" else: device = "cpu" except Exception: device = "cpu" self.model_name = model_name self.pretrained = pretrained self.device = device self._embedding_dim = None # Load model try: logger.info(f"Loading CLIP model: {model_name} ({pretrained}) on {device}...") self.model, _, self.preprocess = open_clip.create_model_and_transforms( model_name, pretrained=pretrained, device=device ) self.model.eval() # Get tokenizer for text self.tokenizer = open_clip.get_tokenizer(model_name) # Determine embedding dimension with torch.no_grad(): dummy_image = torch.zeros(1, 3, 224, 224).to(self.device) dummy_embedding = self.model.encode_image(dummy_image) self._embedding_dim = dummy_embedding.shape[-1] logger.info( f"✓ CLIP embedder loaded: {model_name} on {device}, " f"dimension={self._embedding_dim}" ) except Exception as e: raise RuntimeError(f"Failed to load CLIP model: {e}") def embed_image(self, image: Image.Image) -> np.ndarray: """ Generate embedding for a single image. Args: image: PIL Image to embed Returns: np.ndarray: Normalized embedding vector of shape (dimension,) Raises: ValueError: If image is invalid RuntimeError: If embedding generation fails """ if not isinstance(image, Image.Image): raise ValueError("Input must be a PIL Image") try: # Preprocess image image_tensor = self.preprocess(image).unsqueeze(0).to(self.device) # Generate embedding with torch.no_grad(): embedding = self.model.encode_image(image_tensor) # L2 normalize for cosine similarity embedding = embedding / embedding.norm(dim=-1, keepdim=True) return embedding.cpu().numpy().flatten() except Exception as e: raise RuntimeError(f"Failed to generate image embedding: {e}") def embed_text(self, text: str) -> np.ndarray: """ Generate embedding for text. 

        Args:
            text: Text string to embed

        Returns:
            np.ndarray: Normalized embedding vector of shape (dimension,)

        Raises:
            ValueError: If text is invalid
            RuntimeError: If embedding generation fails
        """
        if not isinstance(text, str):
            raise ValueError("Input must be a string")

        if not text.strip():
            # Return zero vector for empty text
            return np.zeros(self.get_dimension(), dtype=np.float32)

        try:
            # Tokenize text
            text_tokens = self.tokenizer([text]).to(self.device)

            # Generate embedding
            with torch.no_grad():
                embedding = self.model.encode_text(text_tokens)
                # L2 normalize for cosine similarity
                embedding = embedding / embedding.norm(dim=-1, keepdim=True)

            return embedding.cpu().numpy().flatten()
        except Exception as e:
            raise RuntimeError(f"Failed to generate text embedding: {e}")

    def embed_image_batch(self, images: List[Image.Image]) -> np.ndarray:
        """
        Generate embeddings for multiple images (optimized batch processing).

        Args:
            images: List of PIL Images to embed

        Returns:
            np.ndarray: Array of embeddings with shape (len(images), dimension)

        Raises:
            ValueError: If any image is invalid
            RuntimeError: If embedding generation fails
        """
        if not images:
            return np.array([]).reshape(0, self.get_dimension())

        # Validate all images
        for i, img in enumerate(images):
            if not isinstance(img, Image.Image):
                raise ValueError(f"Image at index {i} is not a PIL Image")

        try:
            # Preprocess all images
            image_tensors = torch.stack([
                self.preprocess(img) for img in images
            ]).to(self.device)

            # Generate embeddings in batch
            with torch.no_grad():
                embeddings = self.model.encode_image(image_tensors)
                # L2 normalize for cosine similarity
                embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)

            return embeddings.cpu().numpy()
        except Exception as e:
            raise RuntimeError(f"Failed to generate batch image embeddings: {e}")

    def embed_text_batch(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for multiple texts (optimized batch processing).

        Args:
            texts: List of text strings to embed

        Returns:
            np.ndarray: Array of embeddings with shape (len(texts), dimension)

        Raises:
            ValueError: If any text is invalid
            RuntimeError: If embedding generation fails
        """
        if not texts:
            return np.array([]).reshape(0, self.get_dimension())

        # Validate all texts
        for i, text in enumerate(texts):
            if not isinstance(text, str):
                raise ValueError(f"Text at index {i} is not a string")

        try:
            # Handle empty texts
            processed_texts = [text if text.strip() else " " for text in texts]

            # Tokenize all texts
            text_tokens = self.tokenizer(processed_texts).to(self.device)

            # Generate embeddings in batch
            with torch.no_grad():
                embeddings = self.model.encode_text(text_tokens)
                # L2 normalize for cosine similarity
                embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)

            return embeddings.cpu().numpy()
        except Exception as e:
            raise RuntimeError(f"Failed to generate batch text embeddings: {e}")

    def get_dimension(self) -> int:
        """
        Get the dimensionality of embeddings.

        Returns:
            int: Embedding dimension (512 for ViT-B/32)
        """
        return self._embedding_dim

    def get_model_name(self) -> str:
        """
        Get model identifier.

        Returns:
            str: Model name (e.g., "clip-vit-b-32")
        """
        return f"clip-{self.model_name.lower().replace('/', '-')}"


# ============================================================================
# Factory functions
# ============================================================================

def create_clip_embedder(
    model_name: str = "ViT-B-32",
    device: Optional[str] = None
) -> CLIPEmbedder:
    """
    Create a CLIP embedder with default configuration.

    Args:
        model_name: CLIP model architecture (default: ViT-B-32)
        device: Device to use (default: None for auto-detect)

    Returns:
        CLIPEmbedder: Configured CLIP embedder
    """
    return CLIPEmbedder(model_name=model_name, device=device)


def get_default_embedder() -> CLIPEmbedder:
    """
    Get the default CLIP embedder (ViT-B/32 with automatic device selection).

    Returns:
        CLIPEmbedder: Default embedder
    """
    return CLIPEmbedder()
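

# ============================================================================
# Usage example
# ============================================================================

# Minimal, illustrative sketch (not part of the module's public API). It
# assumes open-clip-torch is installed and that pretrained weights can be
# downloaded on first use; a synthetic blank image keeps the demo
# self-contained, whereas real use would pass a screenshot or UI crop.
# Because of the relative import of base_embedder above, run this file as a
# module (python -m <package>.<module>) rather than as a loose script.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    embedder = get_default_embedder()

    # Embed one image and one text query; since both vectors are
    # L2-normalized, their dot product is the cosine similarity.
    demo_image = Image.new("RGB", (224, 224), color="white")
    image_vec = embedder.embed_image(demo_image)
    text_vec = embedder.embed_text("a blank white screen")

    similarity = float(np.dot(image_vec, text_vec))
    print(f"Model: {embedder.get_model_name()}, dimension: {embedder.get_dimension()}")
    print(f"Image/text cosine similarity: {similarity:.4f}")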