Files
rpa_vision_v3/core/detection/roi_optimizer.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

494 lines
15 KiB
Python

"""
ROI Optimizer - Optimisation de la détection UI par régions d'intérêt
Optimisations:
1. Redimensionnement intelligent des screenshots (max 1920x1080)
2. Détection rapide des régions d'intérêt (ROI)
3. Cache des résultats pour frames similaires
4. Traitement sélectif des zones actives
Gains de performance attendus:
- Réduction de 50-70% du temps de traitement
- Réduction de 60-80% de l'utilisation mémoire
- Cache hit rate de 30-50% sur workflows répétitifs
"""
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
from pathlib import Path
import numpy as np
from PIL import Image
import cv2
import hashlib
from collections import OrderedDict
from datetime import datetime
@dataclass
class ROI:
"""Région d'intérêt détectée"""
x: int
y: int
w: int
h: int
confidence: float
roi_type: str # "active", "changed", "interactive"
def to_dict(self) -> Dict[str, Any]:
"""Convertir en dictionnaire"""
return {
"x": self.x,
"y": self.y,
"w": self.w,
"h": self.h,
"confidence": self.confidence,
"roi_type": self.roi_type
}
@dataclass
class OptimizedFrame:
"""Frame optimisé avec ROIs"""
image: np.ndarray
original_size: Tuple[int, int]
resized_size: Tuple[int, int]
scale_factor: float
rois: List[ROI]
frame_hash: str
class ROICache:
"""
Cache pour résultats de détection ROI
Stocke les résultats de détection pour frames similaires
pour éviter les recalculs coûteux.
"""
def __init__(self, max_size: int = 100, similarity_threshold: float = 0.95):
"""
Initialiser le cache ROI
Args:
max_size: Nombre maximum de frames en cache
similarity_threshold: Seuil de similarité pour considérer 2 frames identiques
"""
self.max_size = max_size
self.similarity_threshold = similarity_threshold
self.cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
# Statistiques
self.hits = 0
self.misses = 0
self.total_time_saved = 0.0
def _compute_frame_hash(self, image: np.ndarray, quick: bool = True) -> str:
"""
Calculer un hash rapide de l'image
Args:
image: Image numpy
quick: Si True, utilise un hash rapide (downsampled)
Returns:
Hash hexadécimal
"""
if quick:
# Downsample pour hash rapide
small = cv2.resize(image, (64, 64))
gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY) if len(small.shape) == 3 else small
return hashlib.md5(gray.tobytes()).hexdigest()
else:
# Hash complet (plus lent)
return hashlib.md5(image.tobytes()).hexdigest()
def get(self, image: np.ndarray) -> Optional[List[ROI]]:
"""
Récupérer les ROIs depuis le cache
Args:
image: Image à rechercher
Returns:
Liste de ROIs si trouvé, None sinon
"""
frame_hash = self._compute_frame_hash(image)
if frame_hash in self.cache:
# Déplacer à la fin (LRU)
self.cache.move_to_end(frame_hash)
self.hits += 1
cached_data = self.cache[frame_hash]
self.total_time_saved += cached_data.get("processing_time", 0.0)
return cached_data["rois"]
self.misses += 1
return None
def put(self, image: np.ndarray, rois: List[ROI], processing_time: float = 0.0):
"""
Ajouter des ROIs au cache
Args:
image: Image source
rois: ROIs détectés
processing_time: Temps de traitement (pour stats)
"""
frame_hash = self._compute_frame_hash(image)
# Évict si cache plein
if len(self.cache) >= self.max_size and frame_hash not in self.cache:
self.cache.popitem(last=False)
self.cache[frame_hash] = {
"rois": rois,
"processing_time": processing_time,
"timestamp": datetime.now()
}
def clear(self):
"""Vider le cache"""
self.cache.clear()
self.hits = 0
self.misses = 0
self.total_time_saved = 0.0
def get_stats(self) -> Dict[str, Any]:
"""Obtenir les statistiques du cache"""
total_requests = self.hits + self.misses
hit_rate = self.hits / total_requests if total_requests > 0 else 0.0
return {
"size": len(self.cache),
"max_size": self.max_size,
"hits": self.hits,
"misses": self.misses,
"hit_rate": hit_rate,
"total_time_saved_ms": self.total_time_saved * 1000
}
class ROIOptimizer:
"""
Optimiseur de détection UI par régions d'intérêt
Optimise la détection UI en:
1. Redimensionnant intelligemment les screenshots
2. Détectant rapidement les zones actives
3. Cachant les résultats pour frames similaires
"""
def __init__(self,
max_width: int = 1920,
max_height: int = 1080,
enable_cache: bool = True,
cache_size: int = 100):
"""
Initialiser l'optimiseur ROI
Args:
max_width: Largeur maximale des screenshots
max_height: Hauteur maximale des screenshots
enable_cache: Activer le cache de ROIs
cache_size: Taille du cache
"""
self.max_width = max_width
self.max_height = max_height
self.enable_cache = enable_cache
# Cache
self.cache = ROICache(max_size=cache_size) if enable_cache else None
# Statistiques
self.total_frames_processed = 0
self.total_frames_resized = 0
self.total_processing_time = 0.0
def optimize_frame(self, image_path: str) -> OptimizedFrame:
"""
Optimiser un frame pour la détection
Args:
image_path: Chemin vers l'image
Returns:
OptimizedFrame avec image redimensionnée et ROIs
"""
import time
start_time = time.time()
# Charger l'image
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Failed to load image: {image_path}")
original_h, original_w = image.shape[:2]
# Vérifier le cache d'abord
if self.cache:
cached_rois = self.cache.get(image)
if cached_rois is not None:
# Cache hit - retourner directement
return OptimizedFrame(
image=image,
original_size=(original_w, original_h),
resized_size=(original_w, original_h),
scale_factor=1.0,
rois=cached_rois,
frame_hash=self.cache._compute_frame_hash(image)
)
# Redimensionner si nécessaire
resized_image, scale_factor = self._resize_if_needed(image)
resized_h, resized_w = resized_image.shape[:2]
if scale_factor < 1.0:
self.total_frames_resized += 1
# Détecter les ROIs
rois = self._detect_rois(resized_image)
# Mettre en cache
processing_time = time.time() - start_time
if self.cache:
self.cache.put(image, rois, processing_time)
self.total_frames_processed += 1
self.total_processing_time += processing_time
return OptimizedFrame(
image=resized_image,
original_size=(original_w, original_h),
resized_size=(resized_w, resized_h),
scale_factor=scale_factor,
rois=rois,
frame_hash=self.cache._compute_frame_hash(image) if self.cache else ""
)
def _resize_if_needed(self, image: np.ndarray) -> Tuple[np.ndarray, float]:
"""
Redimensionner l'image si elle dépasse les limites
Args:
image: Image OpenCV
Returns:
(image_redimensionnée, facteur_d'échelle)
"""
h, w = image.shape[:2]
# Calculer le facteur d'échelle nécessaire
scale_w = self.max_width / w if w > self.max_width else 1.0
scale_h = self.max_height / h if h > self.max_height else 1.0
scale_factor = min(scale_w, scale_h)
# Redimensionner si nécessaire
if scale_factor < 1.0:
new_w = int(w * scale_factor)
new_h = int(h * scale_factor)
resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
return resized, scale_factor
return image, 1.0
def _detect_rois(self, image: np.ndarray) -> List[ROI]:
"""
Détecter rapidement les régions d'intérêt
Utilise des techniques rapides pour identifier les zones actives:
- Détection de changements (si frame précédent disponible)
- Détection de contours
- Détection de zones de texte
Args:
image: Image OpenCV
Returns:
Liste de ROIs détectés
"""
rois = []
h, w = image.shape[:2]
# Convertir en niveaux de gris
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Méthode 1: Détection de contours (rapide)
# Appliquer un flou pour réduire le bruit
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# Détection de contours avec Canny
edges = cv2.Canny(blurred, 50, 150)
# Trouver les contours
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# Filtrer et créer des ROIs
for contour in contours:
x, y, cw, ch = cv2.boundingRect(contour)
# Filtrer les régions trop petites ou trop grandes
area = cw * ch
if area < 100 or area > (w * h * 0.5): # Min 100px², max 50% de l'image
continue
# Ajouter une marge
margin = 5
x = max(0, x - margin)
y = max(0, y - margin)
cw = min(w - x, cw + 2 * margin)
ch = min(h - y, ch + 2 * margin)
rois.append(ROI(
x=x,
y=y,
w=cw,
h=ch,
confidence=0.8,
roi_type="contour"
))
# Méthode 2: Zones de texte (rapide avec EAST ou MSER)
# Pour l'instant, on utilise MSER (Maximally Stable Extremal Regions)
mser = cv2.MSER_create()
regions, _ = mser.detectRegions(gray)
for region in regions:
x, y, rw, rh = cv2.boundingRect(region)
# Filtrer
area = rw * rh
if area < 50 or area > (w * h * 0.3):
continue
rois.append(ROI(
x=x,
y=y,
w=rw,
h=rh,
confidence=0.7,
roi_type="text"
))
# Fusionner les ROIs qui se chevauchent
rois = self._merge_overlapping_rois(rois)
# Si aucun ROI détecté, utiliser l'image entière
if not rois:
rois.append(ROI(
x=0,
y=0,
w=w,
h=h,
confidence=1.0,
roi_type="full_frame"
))
return rois
def _merge_overlapping_rois(self, rois: List[ROI], iou_threshold: float = 0.5) -> List[ROI]:
"""
Fusionner les ROIs qui se chevauchent
Args:
rois: Liste de ROIs
iou_threshold: Seuil IoU pour fusion
Returns:
Liste de ROIs fusionnés
"""
if len(rois) <= 1:
return rois
# Trier par aire décroissante
rois = sorted(rois, key=lambda r: r.w * r.h, reverse=True)
merged = []
used = set()
for i, roi1 in enumerate(rois):
if i in used:
continue
# Trouver tous les ROIs qui se chevauchent
group = [roi1]
for j, roi2 in enumerate(rois[i+1:], start=i+1):
if j in used:
continue
# Calculer IoU
iou = self._calculate_iou(roi1, roi2)
if iou > iou_threshold:
group.append(roi2)
used.add(j)
# Fusionner le groupe
if len(group) == 1:
merged.append(roi1)
else:
merged_roi = self._merge_roi_group(group)
merged.append(merged_roi)
return merged
def _calculate_iou(self, roi1: ROI, roi2: ROI) -> float:
"""Calculer l'IoU entre deux ROIs"""
x1_inter = max(roi1.x, roi2.x)
y1_inter = max(roi1.y, roi2.y)
x2_inter = min(roi1.x + roi1.w, roi2.x + roi2.w)
y2_inter = min(roi1.y + roi1.h, roi2.y + roi2.h)
if x2_inter < x1_inter or y2_inter < y1_inter:
return 0.0
inter_area = (x2_inter - x1_inter) * (y2_inter - y1_inter)
union_area = (roi1.w * roi1.h) + (roi2.w * roi2.h) - inter_area
return inter_area / union_area if union_area > 0 else 0.0
def _merge_roi_group(self, rois: List[ROI]) -> ROI:
"""Fusionner un groupe de ROIs en un seul"""
min_x = min(r.x for r in rois)
min_y = min(r.y for r in rois)
max_x = max(r.x + r.w for r in rois)
max_y = max(r.y + r.h for r in rois)
avg_confidence = sum(r.confidence for r in rois) / len(rois)
return ROI(
x=min_x,
y=min_y,
w=max_x - min_x,
h=max_y - min_y,
confidence=avg_confidence,
roi_type="merged"
)
def scale_coordinates(self, x: int, y: int, scale_factor: float) -> Tuple[int, int]:
"""
Convertir des coordonnées de l'image redimensionnée vers l'originale
Args:
x, y: Coordonnées dans l'image redimensionnée
scale_factor: Facteur d'échelle utilisé
Returns:
(x_original, y_original)
"""
return (int(x / scale_factor), int(y / scale_factor))
def get_stats(self) -> Dict[str, Any]:
"""Obtenir les statistiques de l'optimiseur"""
stats = {
"total_frames_processed": self.total_frames_processed,
"total_frames_resized": self.total_frames_resized,
"resize_rate": self.total_frames_resized / self.total_frames_processed if self.total_frames_processed > 0 else 0.0,
"avg_processing_time_ms": (self.total_processing_time / self.total_frames_processed * 1000) if self.total_frames_processed > 0 else 0.0
}
if self.cache:
stats["cache"] = self.cache.get_stats()
return stats