Visual replay pipeline:
- VLM-first: the agent calls Ollama directly to locate elements
- Template matching as fallback, strict 0.90 threshold (sketched below)
- Immediate stop when an element is not found (no blind clicks)
- Replay from a raw session (/replay-session) without waiting for the VLM
- Post-action verification via screenshot hash before/after (sketched below)
- Popup handling (Enter/Escape/Tab+Enter)

Separate VLM worker:
- run_worker.py: a process distinct from the HTTP server
- Communication through files, _worker_queue.txt + _replay_active.lock (sketched below)
- The HTTP server never runs VLM calls anymore → always responsive
- systemd service rpa-worker.service

Keyboard capture:
- raw_keys (vk + press/release) for exact, layout-independent replay
- AZERTY fix: ToUnicodeEx + AltGr detection
- Enter captured as \n, Tab as \t
- Filtering of lone modifiers (stray Ctrl/Alt/Shift)
- Merging of consecutive text_input events, key_combo dedup

Security & Internet:
- HTTPS via Let's Encrypt (lea.labs + vwb.labs.laurinebazin.design)
- Fixed API token in .env.local
- HTTP Basic Auth on VWB
- Security headers (HSTS, CSP, nosniff)
- CORS restricted to the public domains, no more wildcard

Infrastructure:
- DPI awareness (SetProcessDpiAwareness) in both Python and Rust
- System metadata (dpi_scale, window_bounds, monitors, os_theme)
- Multi-scale template matching over [0.5, 2.0]
- Dynamic resolution (no more hardcoded 1920x1080)
- VLM prefill fix (47x speedup, 3.5s instead of 180s)

Modules:
- core/auth/: credential vault (Fernet AES), TOTP (RFC 6238), auth handler
- core/federation/: anonymized LearningPack export/import, global FAISS index
- deploy/: Léa package (config.txt, Lea.bat, install.bat, LISEZMOI.txt)

UX:
- OS filtering (VWB + Chat only show workflows for the current OS)
- Persistent library (local cache + SQLite)
- Hybrid clustering (window title + DBSCAN)
- EdgeConstraints + PostConditions now populated
- GraphBuilder compound actions (all keystrokes)

Rust agent:
- Bearer token auth (network.rs)
- sysinfo.rs (DPI, resolution, window bounds via the Win32 API)
- config.txt read automatically
- Chrome/Brave/Firefox support (not just Edge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
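A minimal sketch of the template-matching fallback, assuming OpenCV (cv2) is available. The function name, image loading, and click-point computation are illustrative; only the strict 0.90 cutoff and the "no blind click" rule come from the list above.

import cv2

def locate_by_template(screen_path: str, template_path: str, threshold: float = 0.90):
    """Return a click point for the template, or None if the best match is too weak."""
    screen = cv2.imread(screen_path)
    template = cv2.imread(template_path)
    result = cv2.matchTemplate(screen, template, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, max_loc = cv2.minMaxLoc(result)
    if max_val < threshold:
        return None  # caller stops the replay immediately: no blind click
    h, w = template.shape[:2]
    return (max_loc[0] + w // 2, max_loc[1] + h // 2)  # center of the match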
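A sketch of the post-action verification. Hashing the raw capture bytes before and after an action is the mechanism named above; the function names are placeholders.

import hashlib
from pathlib import Path

def screen_hash(screenshot_path: str) -> str:
    # Hash the raw image bytes; identical hashes mean the screen did not change.
    return hashlib.sha256(Path(screenshot_path).read_bytes()).hexdigest()

def action_had_effect(before_path: str, after_path: str) -> bool:
    return screen_hash(before_path) != screen_hash(after_path)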
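The server/worker handoff could look like the following sketch. The one-task-per-line format, the polling interval, and the lock-file semantics (the worker idles while a replay is active) are assumptions; only the two file names come from the list above.

import time
from pathlib import Path

QUEUE = Path("_worker_queue.txt")
LOCK = Path("_replay_active.lock")

def enqueue(task: str) -> None:
    # Server side: append the task and return immediately (no VLM call here).
    with QUEUE.open("a", encoding="utf-8") as f:
        f.write(task + "\n")

def next_task(poll_s: float = 0.5) -> str:
    # Worker side: poll the queue, but stay idle while the lock file exists.
    while True:
        if not LOCK.exists() and QUEUE.exists():
            lines = QUEUE.read_text(encoding="utf-8").splitlines()
            if lines:
                QUEUE.write_text("\n".join(lines[1:]), encoding="utf-8")
                return lines[0]
        time.sleep(poll_s)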
"""
|
|
ScreenAnalyzer - Construction complète d'un ScreenState depuis un screenshot
|
|
|
|
Orchestre les 4 niveaux du ScreenState :
|
|
Niveau 1 (Raw) : métadonnées de l'image
|
|
Niveau 2 (Perception): OCR + embedding global
|
|
Niveau 3 (UI) : détection d'éléments UI
|
|
Niveau 4 (Contexte) : fenêtre active, workflow en cours
|
|
|
|
Ce module comble le chaînon manquant entre la capture brute (Couche 0)
|
|
et la construction d'embeddings (Couche 3).
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from PIL import Image
|
|
|
|
from core.models.screen_state import (
|
|
ScreenState,
|
|
RawLevel,
|
|
PerceptionLevel,
|
|
ContextLevel,
|
|
WindowContext,
|
|
EmbeddingRef,
|
|
)
|
|
from core.models.ui_element import UIElement
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ScreenAnalyzer:
    """
    Builds a complete ScreenState (4 levels) from a screenshot.

    Uses the UIDetector for element detection and an OCR engine
    (docTR or Tesseract) for text extraction.

    Example:
        >>> analyzer = ScreenAnalyzer()
        >>> state = analyzer.analyze("/path/to/screenshot.png")
        >>> print(state.perception.detected_text)
        >>> print(len(state.ui_elements))
    """

    def __init__(
        self,
        ui_detector=None,
        ocr_engine: Optional[str] = None,
        session_id: str = "",
    ):
        """
        Args:
            ui_detector: UIDetector instance (created lazily if None)
            ocr_engine: OCR engine to use ("doctr", "tesseract", None=auto)
            session_id: ID of the current session
        """
        self._ui_detector = ui_detector
        self._ocr_engine_name = ocr_engine
        self._ocr = None
        self._ocr_method = "none"  # set by _ensure_ocr() once an engine is selected
        self.session_id = session_id
        self._state_counter = 0

        # Lazy initialization to avoid heavy imports at startup
        self._ui_detector_initialized = ui_detector is not None
        self._ocr_initialized = False

    # =========================================================================
    # Public API
    # =========================================================================

    def analyze(
        self,
        screenshot_path: str,
        window_info: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> ScreenState:
        """
        Analyze a screenshot and build a complete ScreenState.

        Args:
            screenshot_path: Path to the image file
            window_info: Active-window info {"title": ..., "app_name": ...}
            context: Optional business context

        Returns:
            ScreenState with all four levels filled in
        """
        screenshot_path = str(screenshot_path)
        self._state_counter += 1

        state_id = f"{self.session_id}_state_{self._state_counter:04d}" if self.session_id else f"state_{self._state_counter:04d}"

        # Level 1: Raw
        raw = self._build_raw_level(screenshot_path)

        # Level 2: Perception (OCR)
        detected_text = self._extract_text(screenshot_path)
        perception = PerceptionLevel(
            embedding=EmbeddingRef(
                provider="openclip_ViT-B-32",
                vector_id=f"data/embeddings/screens/{state_id}.npy",
                dimensions=512,
            ),
            detected_text=detected_text,
            text_detection_method=self._get_ocr_method_name(),
            confidence_avg=0.85 if detected_text else 0.0,  # heuristic default; no global score is reported here
        )

        # Level 3: UI elements
        ui_elements = self._detect_ui_elements(screenshot_path, window_info)

        # Level 4: Context
        window_ctx = self._build_window_context(window_info)
        context_level = self._build_context_level(context)

        state = ScreenState(
            screen_state_id=state_id,
            timestamp=datetime.now(),
            session_id=self.session_id,
            window=window_ctx,
            raw=raw,
            perception=perception,
            context=context_level,
            metadata={
                "analyzer_version": "1.0",
                "ui_elements_count": len(ui_elements),
                "text_regions_count": len(detected_text),
            },
            ui_elements=ui_elements,
        )

        logger.info(
            f"ScreenState {state_id} built: "
            f"{len(ui_elements)} UI elements, {len(detected_text)} text regions detected"
        )
        return state

    def analyze_image(
        self,
        image: Image.Image,
        save_dir: str = "data/screens",
        window_info: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> ScreenState:
        """
        Analyze a PIL Image (useful when the image is already in memory).

        Saves the image to disk, then calls analyze().
        """
        save_path = Path(save_dir)
        save_path.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"screen_{timestamp}.png"
        filepath = save_path / filename

        image.save(str(filepath))
        return self.analyze(str(filepath), window_info=window_info, context=context)

    # =========================================================================
    # Level 1: Raw
    # =========================================================================

    def _build_raw_level(self, screenshot_path: str) -> RawLevel:
        file_size = 0
        try:
            file_size = os.path.getsize(screenshot_path)
        except OSError:
            pass  # missing or unreadable file: keep size 0 rather than fail the analysis

        return RawLevel(
            screenshot_path=screenshot_path,
            capture_method="mss",
            file_size_bytes=file_size,
        )

    # =========================================================================
    # Level 2: Perception — OCR
    # =========================================================================

    def _extract_text(self, screenshot_path: str) -> List[str]:
        """Extract text from a screenshot via OCR."""
        self._ensure_ocr()

        if self._ocr is None:
            return []

        try:
            return self._ocr(screenshot_path)
        except Exception as e:
            logger.warning(f"OCR failed: {e}")
            return []

    def _ensure_ocr(self) -> None:
        """Initialize the OCR engine (lazy)."""
        if self._ocr_initialized:
            return
        self._ocr_initialized = True

        engine = self._ocr_engine_name

        # Auto-detection: try docTR first, then Tesseract
        if engine is None or engine == "doctr":
            try:
                self._ocr = self._create_doctr_ocr()
                self._ocr_method = "doctr"
                logger.info("OCR initialized with docTR")
                return
            except Exception as e:
                if engine == "doctr":
                    # docTR was explicitly requested: do not silently fall back
                    logger.warning(f"docTR unavailable: {e}")
                    return

        if engine is None or engine == "tesseract":
            try:
                self._ocr = self._create_tesseract_ocr()
                self._ocr_method = "tesseract"
                logger.info("OCR initialized with Tesseract")
                return
            except Exception as e:
                logger.warning(f"Tesseract unavailable: {e}")

        logger.warning("No OCR engine available — detected_text will be empty")

    def _create_doctr_ocr(self):
        """Create an OCR function backed by docTR."""
        from doctr.io import DocumentFile
        from doctr.models import ocr_predictor

        predictor = ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True)

        def ocr_func(image_path: str) -> List[str]:
            doc = DocumentFile.from_images(image_path)
            result = predictor(doc)
            texts = []
            for page in result.pages:
                for block in page.blocks:
                    for line in block.lines:
                        line_text = " ".join(word.value for word in line.words)
                        if line_text.strip():
                            texts.append(line_text.strip())
            return texts

        return ocr_func

    def _create_tesseract_ocr(self):
        """Create an OCR function backed by Tesseract."""
        import pytesseract

        def ocr_func(image_path: str) -> List[str]:
            img = Image.open(image_path)
            raw_text = pytesseract.image_to_string(img, lang="fra+eng")
            lines = [line.strip() for line in raw_text.split("\n") if line.strip()]
            return lines

        return ocr_func

    def _get_ocr_method_name(self) -> str:
        if self._ocr is None:
            return "none"
        # Report the engine actually selected in _ensure_ocr(), so that
        # auto-detection falling back to Tesseract is reported correctly.
        return self._ocr_method

    # =========================================================================
    # Level 3: UI elements
    # =========================================================================

    def _detect_ui_elements(
        self,
        screenshot_path: str,
        window_info: Optional[Dict[str, Any]] = None,
    ) -> List[UIElement]:
        """Detect UI elements in the screenshot."""
        self._ensure_ui_detector()

        if self._ui_detector is None:
            return []

        try:
            elements = self._ui_detector.detect(
                screenshot_path, window_context=window_info
            )
            return elements
        except Exception as e:
            logger.warning(f"UI detection failed: {e}")
            return []

    def _ensure_ui_detector(self) -> None:
        """Initialize the UIDetector (lazy)."""
        if self._ui_detector_initialized:
            return
        self._ui_detector_initialized = True

        try:
            from core.detection.ui_detector import UIDetector, DetectionConfig

            config = DetectionConfig(
                use_owl_detection=False,  # OWL disabled by default (heavy)
                use_vlm_classification=True,
                confidence_threshold=0.6,
            )
            self._ui_detector = UIDetector(config)
            logger.info("UIDetector initialized")
        except Exception as e:
            logger.warning(f"UIDetector unavailable: {e}")
            self._ui_detector = None

    # =========================================================================
    # Level 4: Context
    # =========================================================================

    def _build_window_context(
        self, window_info: Optional[Dict[str, Any]] = None
    ) -> WindowContext:
        if window_info:
            return WindowContext(
                app_name=window_info.get("app_name", "unknown"),
                window_title=window_info.get("title", "Unknown"),
                screen_resolution=window_info.get("screen_resolution", [1920, 1080]),
                workspace=window_info.get("workspace", "main"),
                monitor_index=window_info.get("monitor_index", 0),
                dpi_scale=window_info.get("dpi_scale", 100),
                window_bounds=window_info.get("window_bounds"),
                monitors=window_info.get("monitors"),
                os_theme=window_info.get("os_theme", "unknown"),
                os_language=window_info.get("os_language", "unknown"),
            )
        return WindowContext(
            app_name="unknown",
            window_title="Unknown",
            screen_resolution=[1920, 1080],
            workspace="main",
        )

    def _build_context_level(
        self, context: Optional[Dict[str, Any]] = None
    ) -> ContextLevel:
        if context:
            return ContextLevel(
                current_workflow_candidate=context.get("workflow_candidate"),
                workflow_step=context.get("workflow_step"),
                user_id=context.get("user_id", ""),
                tags=context.get("tags", []),
                business_variables=context.get("business_variables", {}),
            )
        return ContextLevel()
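

# =========================================================================
# Minimal usage sketch, mirroring the class docstring example. The
# screenshot path is illustrative; point it at an existing capture
# before running.
# =========================================================================
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    analyzer = ScreenAnalyzer(session_id="demo")
    state = analyzer.analyze(
        "data/screens/example.png",
        window_info={"title": "Demo", "app_name": "demo_app"},
    )
    print(state.perception.detected_text)
    print(len(state.ui_elements))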