Files
rpa_vision_v3/core/pipeline/screen_analyzer.py
Dom d5deac3029 feat: replay visuel VLM-first, worker séparé, package Léa, AZERTY, sécurité HTTPS
Pipeline replay visuel :
- VLM-first : l'agent appelle Ollama directement pour trouver les éléments
- Template matching en fallback (seuil strict 0.90)
- Stop immédiat si élément non trouvé (pas de clic blind)
- Replay depuis session brute (/replay-session) sans attendre le VLM
- Vérification post-action (screenshot hash avant/après)
- Gestion des popups (Enter/Escape/Tab+Enter)

Worker VLM séparé :
- run_worker.py : process distinct du serveur HTTP
- Communication par fichiers (_worker_queue.txt + _replay_active.lock)
- Le serveur HTTP ne fait plus jamais de VLM → toujours réactif
- Service systemd rpa-worker.service

Capture clavier :
- raw_keys (vk + press/release) pour replay exact indépendant du layout
- Fix AZERTY : ToUnicodeEx + AltGr detection
- Enter capturé comme \n, Tab comme \t
- Filtrage modificateurs seuls (Ctrl/Alt/Shift parasites)
- Fusion text_input consécutifs, dédup key_combo

Sécurité & Internet :
- HTTPS Let's Encrypt (lea.labs + vwb.labs.laurinebazin.design)
- Token API fixe dans .env.local
- HTTP Basic Auth sur VWB
- Security headers (HSTS, CSP, nosniff)
- CORS domaines publics, plus de wildcard

Infrastructure :
- DPI awareness (SetProcessDpiAwareness) Python + Rust
- Métadonnées système (dpi_scale, window_bounds, monitors, os_theme)
- Template matching multi-scale [0.5, 2.0]
- Résolution dynamique (plus de hardcode 1920x1080)
- VLM prefill fix (47x speedup, 3.5s au lieu de 180s)

Modules :
- core/auth/ : credential vault (Fernet AES), TOTP (RFC 6238), auth handler
- core/federation/ : LearningPack export/import anonymisé, FAISS global
- deploy/ : package Léa (config.txt, Lea.bat, install.bat, LISEZMOI.txt)

UX :
- Filtrage OS (VWB + Chat montrent que les workflows de l'OS courant)
- Bibliothèque persistante (cache local + SQLite)
- Clustering hybride (titre fenêtre + DBSCAN)
- EdgeConstraints + PostConditions peuplés
- GraphBuilder compound actions (toutes les frappes)

Agent Rust :
- Token Bearer auth (network.rs)
- sysinfo.rs (DPI, résolution, window bounds via Win32 API)
- config.txt lu automatiquement
- Support Chrome/Brave/Firefox (pas que Edge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:19:18 +01:00

350 lines
12 KiB
Python

"""
ScreenAnalyzer - Construction complète d'un ScreenState depuis un screenshot
Orchestre les 4 niveaux du ScreenState :
Niveau 1 (Raw) : métadonnées de l'image
Niveau 2 (Perception): OCR + embedding global
Niveau 3 (UI) : détection d'éléments UI
Niveau 4 (Contexte) : fenêtre active, workflow en cours
Ce module comble le chaînon manquant entre la capture brute (Couche 0)
et la construction d'embeddings (Couche 3).
"""
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
from PIL import Image
from core.models.screen_state import (
ScreenState,
RawLevel,
PerceptionLevel,
ContextLevel,
WindowContext,
EmbeddingRef,
)
from core.models.ui_element import UIElement
logger = logging.getLogger(__name__)
class ScreenAnalyzer:
"""
Construit un ScreenState complet (4 niveaux) depuis un screenshot.
Utilise le UIDetector pour la détection d'éléments et un OCR
(docTR ou Tesseract) pour l'extraction de texte.
Example:
>>> analyzer = ScreenAnalyzer()
>>> state = analyzer.analyze("/path/to/screenshot.png")
>>> print(state.perception.detected_text)
>>> print(len(state.ui_elements))
"""
def __init__(
self,
ui_detector=None,
ocr_engine: Optional[str] = None,
session_id: str = "",
):
"""
Args:
ui_detector: Instance de UIDetector (créé si None)
ocr_engine: Moteur OCR à utiliser ("doctr", "tesseract", None=auto)
session_id: ID de la session en cours
"""
self._ui_detector = ui_detector
self._ocr_engine_name = ocr_engine
self._ocr = None
self.session_id = session_id
self._state_counter = 0
# Initialisation lazy pour éviter les imports lourds au démarrage
self._ui_detector_initialized = ui_detector is not None
self._ocr_initialized = False
# =========================================================================
# API publique
# =========================================================================
def analyze(
self,
screenshot_path: str,
window_info: Optional[Dict[str, Any]] = None,
context: Optional[Dict[str, Any]] = None,
) -> ScreenState:
"""
Analyser un screenshot et construire un ScreenState complet.
Args:
screenshot_path: Chemin vers le fichier image
window_info: Infos fenêtre active {"title": ..., "app_name": ...}
context: Contexte métier optionnel
Returns:
ScreenState avec les 4 niveaux remplis
"""
screenshot_path = str(screenshot_path)
self._state_counter += 1
state_id = f"{self.session_id}_state_{self._state_counter:04d}" if self.session_id else f"state_{self._state_counter:04d}"
# Niveau 1 : Raw
raw = self._build_raw_level(screenshot_path)
# Niveau 2 : Perception (OCR)
detected_text = self._extract_text(screenshot_path)
perception = PerceptionLevel(
embedding=EmbeddingRef(
provider="openclip_ViT-B-32",
vector_id=f"data/embeddings/screens/{state_id}.npy",
dimensions=512,
),
detected_text=detected_text,
text_detection_method=self._get_ocr_method_name(),
confidence_avg=0.85 if detected_text else 0.0,
)
# Niveau 3 : UI Elements
ui_elements = self._detect_ui_elements(screenshot_path, window_info)
# Niveau 4 : Contexte
window_ctx = self._build_window_context(window_info)
context_level = self._build_context_level(context)
state = ScreenState(
screen_state_id=state_id,
timestamp=datetime.now(),
session_id=self.session_id,
window=window_ctx,
raw=raw,
perception=perception,
context=context_level,
metadata={
"analyzer_version": "1.0",
"ui_elements_count": len(ui_elements),
"text_regions_count": len(detected_text),
},
ui_elements=ui_elements,
)
logger.info(
f"ScreenState {state_id} construit: "
f"{len(ui_elements)} éléments UI, {len(detected_text)} textes détectés"
)
return state
def analyze_image(
self,
image: Image.Image,
save_dir: str = "data/screens",
window_info: Optional[Dict[str, Any]] = None,
context: Optional[Dict[str, Any]] = None,
) -> ScreenState:
"""
Analyser une PIL Image (utile quand on a déjà l'image en mémoire).
Sauvegarde l'image sur disque puis appelle analyze().
"""
save_path = Path(save_dir)
save_path.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"screen_{timestamp}.png"
filepath = save_path / filename
image.save(str(filepath))
return self.analyze(str(filepath), window_info=window_info, context=context)
# =========================================================================
# Niveau 1 : Raw
# =========================================================================
def _build_raw_level(self, screenshot_path: str) -> RawLevel:
file_size = 0
try:
file_size = os.path.getsize(screenshot_path)
except OSError:
pass
return RawLevel(
screenshot_path=screenshot_path,
capture_method="mss",
file_size_bytes=file_size,
)
# =========================================================================
# Niveau 2 : Perception — OCR
# =========================================================================
def _extract_text(self, screenshot_path: str) -> List[str]:
"""Extraire le texte d'un screenshot via OCR."""
self._ensure_ocr()
if self._ocr is None:
return []
try:
return self._ocr(screenshot_path)
except Exception as e:
logger.warning(f"OCR échoué: {e}")
return []
def _ensure_ocr(self) -> None:
"""Initialiser le moteur OCR (lazy)."""
if self._ocr_initialized:
return
self._ocr_initialized = True
engine = self._ocr_engine_name
# Auto-détection : essayer docTR puis Tesseract
if engine is None or engine == "doctr":
try:
self._ocr = self._create_doctr_ocr()
logger.info("OCR initialisé avec docTR")
return
except Exception as e:
if engine == "doctr":
logger.warning(f"docTR non disponible: {e}")
return
if engine is None or engine == "tesseract":
try:
self._ocr = self._create_tesseract_ocr()
logger.info("OCR initialisé avec Tesseract")
return
except Exception as e:
logger.warning(f"Tesseract non disponible: {e}")
logger.warning("Aucun moteur OCR disponible — detected_text sera vide")
def _create_doctr_ocr(self):
"""Créer une fonction OCR basée sur docTR."""
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
predictor = ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True)
def ocr_func(image_path: str) -> List[str]:
doc = DocumentFile.from_images(image_path)
result = predictor(doc)
texts = []
for page in result.pages:
for block in page.blocks:
for line in block.lines:
line_text = " ".join(word.value for word in line.words)
if line_text.strip():
texts.append(line_text.strip())
return texts
return ocr_func
def _create_tesseract_ocr(self):
"""Créer une fonction OCR basée sur Tesseract."""
import pytesseract
def ocr_func(image_path: str) -> List[str]:
img = Image.open(image_path)
raw_text = pytesseract.image_to_string(img, lang="fra+eng")
lines = [line.strip() for line in raw_text.split("\n") if line.strip()]
return lines
return ocr_func
def _get_ocr_method_name(self) -> str:
if self._ocr is None:
return "none"
if self._ocr_engine_name:
return self._ocr_engine_name
return "doctr"
# =========================================================================
# Niveau 3 : UI Elements
# =========================================================================
def _detect_ui_elements(
self,
screenshot_path: str,
window_info: Optional[Dict[str, Any]] = None,
) -> List[UIElement]:
"""Détecter les éléments UI dans le screenshot."""
self._ensure_ui_detector()
if self._ui_detector is None:
return []
try:
elements = self._ui_detector.detect(
screenshot_path, window_context=window_info
)
return elements
except Exception as e:
logger.warning(f"Détection UI échouée: {e}")
return []
def _ensure_ui_detector(self) -> None:
"""Initialiser le UIDetector (lazy)."""
if self._ui_detector_initialized:
return
self._ui_detector_initialized = True
try:
from core.detection.ui_detector import UIDetector, DetectionConfig
config = DetectionConfig(
use_owl_detection=False, # Désactiver OWL par défaut (lourd)
use_vlm_classification=True,
confidence_threshold=0.6,
)
self._ui_detector = UIDetector(config)
logger.info("UIDetector initialisé")
except Exception as e:
logger.warning(f"UIDetector non disponible: {e}")
self._ui_detector = None
# =========================================================================
# Niveau 4 : Contexte
# =========================================================================
def _build_window_context(
self, window_info: Optional[Dict[str, Any]] = None
) -> WindowContext:
if window_info:
return WindowContext(
app_name=window_info.get("app_name", "unknown"),
window_title=window_info.get("title", "Unknown"),
screen_resolution=window_info.get("screen_resolution", [1920, 1080]),
workspace=window_info.get("workspace", "main"),
monitor_index=window_info.get("monitor_index", 0),
dpi_scale=window_info.get("dpi_scale", 100),
window_bounds=window_info.get("window_bounds"),
monitors=window_info.get("monitors"),
os_theme=window_info.get("os_theme", "unknown"),
os_language=window_info.get("os_language", "unknown"),
)
return WindowContext(
app_name="unknown",
window_title="Unknown",
screen_resolution=[1920, 1080],
workspace="main",
)
def _build_context_level(
self, context: Optional[Dict[str, Any]] = None
) -> ContextLevel:
if context:
return ContextLevel(
current_workflow_candidate=context.get("workflow_candidate"),
workflow_step=context.get("workflow_step"),
user_id=context.get("user_id", ""),
tags=context.get("tags", []),
business_variables=context.get("business_variables", {}),
)
return ContextLevel()