"""
ScreenAnalyzer - Build a complete ScreenState from a screenshot.

Orchestrates the four levels of a ScreenState:
    Level 1 (Raw):        image metadata
    Level 2 (Perception): OCR + global embedding reference
    Level 3 (UI):         UI element detection
    Level 4 (Context):    active window, workflow in progress

This module fills the missing link between raw capture (Layer 0) and
embedding construction (Layer 3).
"""

import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List

from PIL import Image

from core.models.screen_state import (
    ScreenState,
    RawLevel,
    PerceptionLevel,
    ContextLevel,
    WindowContext,
    EmbeddingRef,
)
from core.models.ui_element import UIElement

logger = logging.getLogger(__name__)


class ScreenAnalyzer:
    """
    Build a complete (four-level) ScreenState from a screenshot.

    Uses a UIDetector for element detection and an OCR engine (docTR or
    Tesseract) for text extraction. Both are initialised lazily on first
    use so that constructing the analyzer stays cheap.

    Example:
        >>> analyzer = ScreenAnalyzer()
        >>> state = analyzer.analyze("/path/to/screenshot.png")
        >>> print(state.perception.detected_text)
        >>> print(len(state.ui_elements))
    """

    def __init__(
        self,
        ui_detector=None,
        ocr_engine: Optional[str] = None,
        session_id: str = "",
    ):
        """
        Args:
            ui_detector: UIDetector instance (created lazily if None).
            ocr_engine: OCR engine to use ("doctr", "tesseract", or None
                for auto-detection: docTR first, then Tesseract).
            session_id: ID of the current session; used as a prefix for
                generated state IDs when non-empty.
        """
        self._ui_detector = ui_detector
        self._ocr_engine_name = ocr_engine
        self._ocr = None
        # Name of the engine that was actually initialised; may differ from
        # the requested one in auto mode (ocr_engine=None).
        self._ocr_method_name: Optional[str] = None
        self.session_id = session_id
        self._state_counter = 0

        # Lazy initialisation to avoid heavy imports at startup.
        self._ui_detector_initialized = ui_detector is not None
        self._ocr_initialized = False

    # =========================================================================
    # Public API
    # =========================================================================

    def analyze(
        self,
        screenshot_path: str,
        window_info: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> ScreenState:
        """
        Analyze a screenshot and build a complete ScreenState.

        Args:
            screenshot_path: Path to the image file.
            window_info: Active-window info, e.g.
                {"title": ..., "app_name": ...}.
            context: Optional business context.

        Returns:
            ScreenState with all four levels populated.
        """
        screenshot_path = str(screenshot_path)
        self._state_counter += 1
        if self.session_id:
            state_id = f"{self.session_id}_state_{self._state_counter:04d}"
        else:
            state_id = f"state_{self._state_counter:04d}"

        # Level 1: Raw
        raw = self._build_raw_level(screenshot_path)

        # Level 2: Perception (OCR). Only a *reference* to the embedding is
        # built here; this method neither computes nor writes the vector.
        detected_text = self._extract_text(screenshot_path)
        perception = PerceptionLevel(
            embedding=EmbeddingRef(
                provider="openclip_ViT-B-32",
                vector_id=f"data/embeddings/screens/{state_id}.npy",
                dimensions=512,
            ),
            detected_text=detected_text,
            text_detection_method=self._get_ocr_method_name(),
            # Constant value, not derived from the OCR output.
            confidence_avg=0.85 if detected_text else 0.0,
        )

        # Level 3: UI elements
        ui_elements = self._detect_ui_elements(screenshot_path, window_info)

        # Level 4: Context
        window_ctx = self._build_window_context(window_info)
        context_level = self._build_context_level(context)

        state = ScreenState(
            screen_state_id=state_id,
            timestamp=datetime.now(),
            session_id=self.session_id,
            window=window_ctx,
            raw=raw,
            perception=perception,
            context=context_level,
            metadata={
                "analyzer_version": "1.0",
                "ui_elements_count": len(ui_elements),
                "text_regions_count": len(detected_text),
            },
            ui_elements=ui_elements,
        )

        logger.info(
            f"ScreenState {state_id} construit: "
            f"{len(ui_elements)} éléments UI, {len(detected_text)} textes détectés"
        )
        return state

    def analyze_image(
        self,
        image: Image.Image,
        save_dir: str = "data/screens",
        window_info: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> ScreenState:
        """
        Analyze a PIL image (useful when the image is already in memory).

        Saves the image as a PNG under ``save_dir`` (created if missing),
        then delegates to :meth:`analyze`.

        Args:
            image: Image to analyze.
            save_dir: Directory in which the image is written.
            window_info: Forwarded to :meth:`analyze`.
            context: Forwarded to :meth:`analyze`.

        Returns:
            The ScreenState produced by :meth:`analyze`.
        """
        save_path = Path(save_dir)
        save_path.mkdir(parents=True, exist_ok=True)

        # Microsecond-resolution timestamp keeps successive filenames distinct.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filepath = save_path / f"screen_{timestamp}.png"
        image.save(str(filepath))

        return self.analyze(str(filepath), window_info=window_info, context=context)

    # =========================================================================
    # Level 1: Raw
    # =========================================================================

    def _build_raw_level(self, screenshot_path: str) -> RawLevel:
        """
        Build the raw level (path, capture method, file size).

        The file size is best-effort: if it cannot be read, 0 is recorded
        and the failure is logged at debug level instead of being raised.
        The capture method is always recorded as "mss".
        """
        try:
            file_size = os.path.getsize(screenshot_path)
        except OSError as e:
            logger.debug(f"Taille du fichier indisponible pour {screenshot_path}: {e}")
            file_size = 0

        return RawLevel(
            screenshot_path=screenshot_path,
            capture_method="mss",
            file_size_bytes=file_size,
        )

    # =========================================================================
    # Level 2: Perception - OCR
    # =========================================================================

    def _extract_text(self, screenshot_path: str) -> List[str]:
        """
        Extract text from a screenshot via OCR.

        Returns:
            The strings produced by the OCR callable, or an empty list when
            no OCR engine is available or the OCR call raises.
        """
        self._ensure_ocr()
        if self._ocr is None:
            return []

        try:
            return self._ocr(screenshot_path)
        except Exception as e:
            # Best-effort: an OCR failure must not abort the whole analysis.
            logger.warning(f"OCR échoué: {e}")
            return []

    def _ensure_ocr(self) -> None:
        """
        Initialise the OCR engine (lazy, attempted at most once).

        With ``ocr_engine=None`` docTR is tried first, then Tesseract. With
        an explicit engine only that engine is tried. On success the engine
        actually selected is recorded so that it can be reported accurately.
        """
        if self._ocr_initialized:
            return
        # Set the flag first so a failed initialisation is not retried.
        self._ocr_initialized = True
        self._ocr_method_name = None

        engine = self._ocr_engine_name

        if engine is None or engine == "doctr":
            try:
                self._ocr = self._create_doctr_ocr()
            except Exception as e:
                if engine == "doctr":
                    # docTR was explicitly requested: do not fall back.
                    logger.warning(f"docTR non disponible: {e}")
                    return
                logger.debug(f"docTR non disponible, essai de Tesseract: {e}")
            else:
                self._ocr_method_name = "doctr"
                logger.info("OCR initialisé avec docTR")
                return

        if engine is None or engine == "tesseract":
            try:
                self._ocr = self._create_tesseract_ocr()
            except Exception as e:
                logger.warning(f"Tesseract non disponible: {e}")
            else:
                self._ocr_method_name = "tesseract"
                logger.info("OCR initialisé avec Tesseract")
                return

        logger.warning("Aucun moteur OCR disponible — detected_text sera vide")

    def _create_doctr_ocr(self):
        """
        Create an OCR callable backed by docTR.

        Returns:
            A function mapping an image path to a list of non-empty text
            lines (the words of each line joined by single spaces).

        Raises:
            Whatever the docTR import or predictor construction raises.
        """
        # Imported here so docTR is only loaded when actually needed.
        from doctr.io import DocumentFile
        from doctr.models import ocr_predictor

        predictor = ocr_predictor(
            det_arch="db_resnet50",
            reco_arch="crnn_vgg16_bn",
            pretrained=True,
        )

        def ocr_func(image_path: str) -> List[str]:
            doc = DocumentFile.from_images(image_path)
            result = predictor(doc)
            texts = []
            for page in result.pages:
                for block in page.blocks:
                    for line in block.lines:
                        line_text = " ".join(word.value for word in line.words).strip()
                        if line_text:
                            texts.append(line_text)
            return texts

        return ocr_func

    def _create_tesseract_ocr(self):
        """
        Create an OCR callable backed by Tesseract (via pytesseract).

        Returns:
            A function mapping an image path to a list of stripped,
            non-empty text lines.

        Raises:
            ImportError: If pytesseract is not installed.
        """
        # Imported here so pytesseract is only loaded when actually needed.
        import pytesseract

        def ocr_func(image_path: str) -> List[str]:
            # Context manager closes the underlying file handle once OCR is
            # done instead of leaving it open until garbage collection.
            with Image.open(image_path) as img:
                raw_text = pytesseract.image_to_string(img, lang="fra+eng")
            return [line.strip() for line in raw_text.split("\n") if line.strip()]

        return ocr_func

    def _get_ocr_method_name(self) -> str:
        """
        Return the name of the OCR engine in use.

        Returns:
            "none" when no OCR callable is set; otherwise the engine that
            was actually initialised, falling back to the requested engine
            name and finally to "doctr".
        """
        if self._ocr is None:
            return "none"
        # getattr keeps this safe for instances whose __init__ was bypassed.
        resolved = getattr(self, "_ocr_method_name", None)
        if resolved:
            return resolved
        return self._ocr_engine_name or "doctr"

    # =========================================================================
    # Level 3: UI elements
    # =========================================================================

    def _detect_ui_elements(
        self,
        screenshot_path: str,
        window_info: Optional[Dict[str, Any]] = None,
    ) -> List[UIElement]:
        """
        Detect the UI elements in the screenshot.

        Returns:
            The detector's result, or an empty list when no detector is
            available or detection raises.
        """
        self._ensure_ui_detector()
        if self._ui_detector is None:
            return []

        try:
            return self._ui_detector.detect(
                screenshot_path, window_context=window_info
            )
        except Exception as e:
            # Best-effort: a detection failure must not abort the analysis.
            logger.warning(f"Détection UI échouée: {e}")
            return []

    def _ensure_ui_detector(self) -> None:
        """
        Initialise the UIDetector (lazy, attempted at most once).

        If the import or construction fails, the detector stays None and a
        warning is logged.
        """
        if self._ui_detector_initialized:
            return
        # Set the flag first so a failed initialisation is not retried.
        self._ui_detector_initialized = True

        try:
            from core.detection.ui_detector import UIDetector, DetectionConfig

            config = DetectionConfig(
                use_owl_detection=False,  # OWL disabled by default (heavy)
                use_vlm_classification=True,
                confidence_threshold=0.6,
            )
            self._ui_detector = UIDetector(config)
            logger.info("UIDetector initialisé")
        except Exception as e:
            logger.warning(f"UIDetector non disponible: {e}")
            self._ui_detector = None

    # =========================================================================
    # Level 4: Context
    # =========================================================================

    def _build_window_context(
        self, window_info: Optional[Dict[str, Any]] = None
    ) -> WindowContext:
        """
        Build the WindowContext from optional active-window info.

        Missing keys (or a missing/empty ``window_info``) fall back to
        "unknown" / "Unknown" / [1920, 1080] / "main".
        """
        info = window_info or {}
        return WindowContext(
            app_name=info.get("app_name", "unknown"),
            window_title=info.get("title", "Unknown"),
            screen_resolution=info.get("screen_resolution", [1920, 1080]),
            workspace=info.get("workspace", "main"),
        )

    def _build_context_level(
        self, context: Optional[Dict[str, Any]] = None
    ) -> ContextLevel:
        """
        Build the ContextLevel from an optional business-context dict.

        A missing or empty ``context`` yields a default ``ContextLevel()``.
        """
        if not context:
            return ContextLevel()

        return ContextLevel(
            current_workflow_candidate=context.get("workflow_candidate"),
            workflow_step=context.get("workflow_step"),
            user_id=context.get("user_id", ""),
            tags=context.get("tags", []),
            business_variables=context.get("business_variables", {}),
        )