- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
430 lines
14 KiB
Python
430 lines
14 KiB
Python
"""
|
|
OmniParser Adapter pour RPA Vision V3
|
|
|
|
Intègre Microsoft OmniParser v2 pour la détection d'éléments UI.
|
|
OmniParser combine détection d'icônes (YOLO) + OCR + captioning en un seul pipeline.
|
|
|
|
Avantages:
|
|
- Détection précise des petits éléments (icônes, boutons)
|
|
- OCR intégré
|
|
- Description sémantique des éléments
|
|
- 60% plus rapide que le pipeline OWL+OpenCV+VLM
|
|
|
|
Usage:
|
|
adapter = OmniParserAdapter()
|
|
elements = adapter.detect(screenshot_pil)
|
|
# elements est une liste de dicts avec bbox, label, type, etc.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import base64
|
|
import io
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
from PIL import Image
|
|
import numpy as np
|
|
|
|
# Make the local OmniParser checkout importable (provides `util.omniparser`).
OMNIPARSER_PATH = "/home/dom/ai/OmniParser"
if OMNIPARSER_PATH not in sys.path:
    sys.path.insert(0, OMNIPARSER_PATH)

# OmniParser model configuration, passed verbatim to `Omniparser(...)`.
OMNIPARSER_CONFIG = {
    # YOLO icon-detection weights.
    'som_model_path': os.path.join(OMNIPARSER_PATH, 'weights/icon_detect/model.pt'),
    # Captioning model used to describe the detected elements.
    'caption_model_name': 'florence2',
    'caption_model_path': os.path.join(OMNIPARSER_PATH, 'weights/icon_caption_florence'),
    # Low threshold so more elements are detected.
    # NOTE: 'BOX_TRESHOLD' (sic) is the key spelling expected by upstream
    # OmniParser — do not "fix" it.
    'BOX_TRESHOLD': 0.05,
}
|
|
|
|
|
|
@dataclass
class DetectedElement:
    """UI element detected by OmniParser."""

    # Pixel-space bounding box (x1, y1, x2, y2).
    bbox: Tuple[int, int, int, int]
    # Same box normalised to 0-1 relative to image width/height.
    bbox_normalized: Tuple[float, float, float, float]
    # Textual description of the element (caption and/or OCR text).
    label: str
    # Coarse category: 'icon', 'text', 'button', 'input', 'menu', 'element', ...
    element_type: str
    # Detection confidence; defaults to 0.8 when the parser reports none.
    confidence: float
    # Pixel coordinates of the box centre.
    center: Tuple[int, int]
    # True when the element type is considered clickable/interactable.
    is_interactable: bool
|
|
|
|
|
|
class OmniParserAdapter:
|
|
"""
|
|
Adapter pour utiliser OmniParser dans RPA Vision V3.
|
|
|
|
OmniParser détecte tous les éléments UI d'un screenshot et retourne
|
|
leurs positions, descriptions et types.
|
|
"""
|
|
|
|
_instance = None
|
|
_initialized = False
|
|
|
|
def __new__(cls):
|
|
"""Singleton pour éviter de charger les modèles plusieurs fois"""
|
|
if cls._instance is None:
|
|
cls._instance = super().__new__(cls)
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
"""Initialise OmniParser (lazy loading)"""
|
|
if OmniParserAdapter._initialized:
|
|
return
|
|
|
|
self.omniparser = None
|
|
self.available = False
|
|
self._check_availability()
|
|
|
|
def _check_availability(self):
|
|
"""Vérifie si OmniParser est disponible"""
|
|
try:
|
|
# Vérifier que les fichiers de modèles existent
|
|
if not os.path.exists(OMNIPARSER_CONFIG['som_model_path']):
|
|
print(f"⚠️ [OmniParser] Modèle de détection non trouvé: {OMNIPARSER_CONFIG['som_model_path']}")
|
|
return
|
|
|
|
if not os.path.exists(OMNIPARSER_CONFIG['caption_model_path']):
|
|
print(f"⚠️ [OmniParser] Modèle de caption non trouvé: {OMNIPARSER_CONFIG['caption_model_path']}")
|
|
return
|
|
|
|
self.available = True
|
|
print("✅ [OmniParser] Modèles disponibles, chargement différé")
|
|
|
|
except Exception as e:
|
|
print(f"❌ [OmniParser] Erreur vérification: {e}")
|
|
self.available = False
|
|
|
|
def _load_models(self):
|
|
"""Charge les modèles OmniParser (lazy loading) avec GPU"""
|
|
if self.omniparser is not None:
|
|
return True
|
|
|
|
if not self.available:
|
|
return False
|
|
|
|
try:
|
|
import torch
|
|
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
|
print(f"🔄 [OmniParser] Chargement des modèles sur {device}...")
|
|
|
|
from util.omniparser import Omniparser
|
|
self.omniparser = Omniparser(OMNIPARSER_CONFIG)
|
|
|
|
# Forcer YOLO sur GPU si disponible
|
|
if device == 'cuda' and hasattr(self.omniparser, 'som_model'):
|
|
self.omniparser.som_model.to(device)
|
|
print(f"✅ [OmniParser] YOLO déplacé sur {device}")
|
|
|
|
OmniParserAdapter._initialized = True
|
|
print(f"✅ [OmniParser] Modèles chargés avec succès sur {device}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ [OmniParser] Erreur chargement modèles: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
self.available = False
|
|
return False
|
|
|
|
def detect(self, image: Image.Image) -> List[DetectedElement]:
|
|
"""
|
|
Détecte tous les éléments UI dans une image.
|
|
|
|
Args:
|
|
image: Image PIL du screenshot
|
|
|
|
Returns:
|
|
Liste de DetectedElement avec bbox, label, type, etc.
|
|
"""
|
|
if not self._load_models():
|
|
print("⚠️ [OmniParser] Non disponible, retourne liste vide")
|
|
return []
|
|
|
|
try:
|
|
# Convertir PIL en base64
|
|
buffered = io.BytesIO()
|
|
image.save(buffered, format="PNG")
|
|
image_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
|
|
|
|
W, H = image.size
|
|
print(f"📸 [OmniParser] Analyse image {W}x{H}...")
|
|
|
|
# Appel OmniParser
|
|
labeled_img, parsed_content = self.omniparser.parse(image_base64)
|
|
|
|
print(f"🎯 [OmniParser] {len(parsed_content)} éléments détectés")
|
|
|
|
# Convertir en DetectedElement
|
|
elements = []
|
|
for item in parsed_content:
|
|
elem = self._parse_item(item, W, H)
|
|
if elem:
|
|
elements.append(elem)
|
|
|
|
return elements
|
|
|
|
except Exception as e:
|
|
print(f"❌ [OmniParser] Erreur détection: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return []
|
|
|
|
def _parse_item(self, item: Any, width: int, height: int) -> Optional[DetectedElement]:
|
|
"""Parse un élément OmniParser en DetectedElement"""
|
|
try:
|
|
# Format OmniParser: {'bbox': [x1, y1, x2, y2], 'label': 'description', ...}
|
|
# Les bbox sont normalisées (0-1)
|
|
|
|
if isinstance(item, dict):
|
|
bbox_norm = item.get('bbox', item.get('box', []))
|
|
label = item.get('label', item.get('content', item.get('text', 'unknown')))
|
|
elif isinstance(item, (list, tuple)) and len(item) >= 2:
|
|
# Format alternatif: (bbox, label)
|
|
bbox_norm = item[0] if isinstance(item[0], (list, tuple)) else []
|
|
label = item[1] if len(item) > 1 else 'unknown'
|
|
else:
|
|
return None
|
|
|
|
if not bbox_norm or len(bbox_norm) < 4:
|
|
return None
|
|
|
|
x1_n, y1_n, x2_n, y2_n = bbox_norm[:4]
|
|
|
|
# Convertir en pixels
|
|
x1 = int(x1_n * width)
|
|
y1 = int(y1_n * height)
|
|
x2 = int(x2_n * width)
|
|
y2 = int(y2_n * height)
|
|
|
|
# Calculer le centre
|
|
cx = (x1 + x2) // 2
|
|
cy = (y1 + y2) // 2
|
|
|
|
# Déterminer le type d'élément
|
|
element_type = self._classify_element(label, x2-x1, y2-y1)
|
|
|
|
# Confiance (OmniParser ne fournit pas toujours)
|
|
confidence = item.get('confidence', item.get('score', 0.8))
|
|
|
|
return DetectedElement(
|
|
bbox=(x1, y1, x2, y2),
|
|
bbox_normalized=(x1_n, y1_n, x2_n, y2_n),
|
|
label=str(label),
|
|
element_type=element_type,
|
|
confidence=float(confidence),
|
|
center=(cx, cy),
|
|
is_interactable=self._is_interactable(label, element_type)
|
|
)
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ [OmniParser] Erreur parsing item: {e}")
|
|
return None
|
|
|
|
def _classify_element(self, label: str, width: int, height: int) -> str:
|
|
"""Classifie le type d'élément basé sur le label et la taille"""
|
|
label_lower = label.lower() if label else ""
|
|
|
|
# Mots-clés pour classification
|
|
icon_keywords = ['icon', 'logo', 'image', 'picture', 'symbol']
|
|
button_keywords = ['button', 'btn', 'click', 'submit', 'ok', 'cancel', 'close']
|
|
input_keywords = ['input', 'text field', 'search', 'textbox', 'entry']
|
|
menu_keywords = ['menu', 'dropdown', 'select', 'option']
|
|
|
|
for kw in icon_keywords:
|
|
if kw in label_lower:
|
|
return 'icon'
|
|
|
|
for kw in button_keywords:
|
|
if kw in label_lower:
|
|
return 'button'
|
|
|
|
for kw in input_keywords:
|
|
if kw in label_lower:
|
|
return 'input'
|
|
|
|
for kw in menu_keywords:
|
|
if kw in label_lower:
|
|
return 'menu'
|
|
|
|
# Classification par taille
|
|
if width < 50 and height < 50:
|
|
return 'icon'
|
|
elif width > 100 and height < 40:
|
|
return 'input'
|
|
elif width < 150 and height < 50:
|
|
return 'button'
|
|
|
|
return 'element'
|
|
|
|
def _is_interactable(self, label: str, element_type: str) -> bool:
|
|
"""Détermine si l'élément est interactable"""
|
|
interactable_types = {'button', 'input', 'icon', 'menu', 'link', 'checkbox'}
|
|
return element_type in interactable_types
|
|
|
|
def find_element(
|
|
self,
|
|
screenshot: Image.Image,
|
|
anchor: Image.Image,
|
|
threshold: float = 0.5
|
|
) -> Optional[Tuple[int, int, str]]:
|
|
"""
|
|
Trouve un élément spécifique dans le screenshot en comparant avec une ancre.
|
|
|
|
Stratégie:
|
|
1. Détecte tous les éléments avec OmniParser
|
|
2. Pour chaque élément, compare avec l'ancre via template matching
|
|
3. Retourne le meilleur match
|
|
|
|
Args:
|
|
screenshot: Screenshot complet
|
|
anchor: Image de l'élément à trouver
|
|
threshold: Seuil de similarité (0-1)
|
|
|
|
Returns:
|
|
(x, y, method) si trouvé, None sinon
|
|
"""
|
|
import cv2
|
|
|
|
elements = self.detect(screenshot)
|
|
if not elements:
|
|
print("⚠️ [OmniParser] Aucun élément détecté")
|
|
return None
|
|
|
|
print(f"🔍 [OmniParser] Recherche parmi {len(elements)} éléments...")
|
|
|
|
# Convertir images en arrays
|
|
screenshot_np = np.array(screenshot)
|
|
anchor_np = np.array(anchor)
|
|
|
|
if len(screenshot_np.shape) == 3:
|
|
screenshot_gray = cv2.cvtColor(screenshot_np, cv2.COLOR_RGB2GRAY)
|
|
else:
|
|
screenshot_gray = screenshot_np
|
|
|
|
if len(anchor_np.shape) == 3:
|
|
anchor_gray = cv2.cvtColor(anchor_np, cv2.COLOR_RGB2GRAY)
|
|
else:
|
|
anchor_gray = anchor_np
|
|
|
|
best_match = None
|
|
best_score = -1
|
|
|
|
anchor_h, anchor_w = anchor_gray.shape[:2]
|
|
|
|
for elem in elements:
|
|
x1, y1, x2, y2 = elem.bbox
|
|
|
|
# Extraire la région
|
|
region = screenshot_gray[y1:y2, x1:x2]
|
|
|
|
if region.size == 0:
|
|
continue
|
|
|
|
# Resize pour matcher la taille de l'ancre
|
|
try:
|
|
region_resized = cv2.resize(region, (anchor_w, anchor_h))
|
|
|
|
# Template matching
|
|
result = cv2.matchTemplate(
|
|
region_resized,
|
|
anchor_gray,
|
|
cv2.TM_CCOEFF_NORMED
|
|
)
|
|
_, max_val, _, _ = cv2.minMaxLoc(result)
|
|
|
|
if max_val > best_score:
|
|
best_score = max_val
|
|
best_match = elem
|
|
|
|
except Exception as e:
|
|
continue
|
|
|
|
if best_match and best_score >= threshold:
|
|
cx, cy = best_match.center
|
|
print(f"✅ [OmniParser] Trouvé: '{best_match.label}' à ({cx}, {cy}) score={best_score:.2f}")
|
|
return (cx, cy, f"omniparser_{best_match.element_type}")
|
|
|
|
print(f"⚠️ [OmniParser] Aucun match >= {threshold} (best={best_score:.2f})")
|
|
return None
|
|
|
|
def find_by_description(
|
|
self,
|
|
screenshot: Image.Image,
|
|
description: str,
|
|
threshold: float = 0.3
|
|
) -> Optional[Tuple[int, int, str]]:
|
|
"""
|
|
Trouve un élément par sa description textuelle.
|
|
|
|
Args:
|
|
screenshot: Screenshot complet
|
|
description: Description de l'élément ("bouton Document", "icône Excel", etc.)
|
|
threshold: Seuil de similarité textuelle
|
|
|
|
Returns:
|
|
(x, y, method) si trouvé, None sinon
|
|
"""
|
|
elements = self.detect(screenshot)
|
|
if not elements:
|
|
return None
|
|
|
|
description_lower = description.lower()
|
|
description_words = set(description_lower.split())
|
|
|
|
best_match = None
|
|
best_score = 0
|
|
|
|
for elem in elements:
|
|
label_lower = elem.label.lower()
|
|
label_words = set(label_lower.split())
|
|
|
|
# Score basé sur les mots communs
|
|
common_words = description_words & label_words
|
|
if description_words:
|
|
score = len(common_words) / len(description_words)
|
|
else:
|
|
score = 0
|
|
|
|
# Bonus si le type correspond
|
|
if elem.element_type in description_lower:
|
|
score += 0.2
|
|
|
|
if score > best_score:
|
|
best_score = score
|
|
best_match = elem
|
|
|
|
if best_match and best_score >= threshold:
|
|
cx, cy = best_match.center
|
|
print(f"✅ [OmniParser] Match description: '{best_match.label}' à ({cx}, {cy}) score={best_score:.2f}")
|
|
return (cx, cy, "omniparser_description")
|
|
|
|
return None
|
|
|
|
|
|
# Module-level singleton handle (mirrors the class-level singleton).
_omniparser_instance: Optional[OmniParserAdapter] = None


def get_omniparser() -> OmniParserAdapter:
    """Return the shared OmniParserAdapter, creating it on first use."""
    global _omniparser_instance
    instance = _omniparser_instance
    if instance is None:
        instance = OmniParserAdapter()
        _omniparser_instance = instance
    return instance
|
|
|
|
|
|
def detect_elements(image: Image.Image) -> List[DetectedElement]:
    """Convenience wrapper: run element detection via the shared singleton."""
    adapter = get_omniparser()
    return adapter.detect(image)
|
|
|
|
|
|
def find_element(
    screenshot: Image.Image,
    anchor: Image.Image,
    threshold: float = 0.5
) -> Optional[Tuple[int, int, str]]:
    """Convenience wrapper: anchor-based element lookup via the shared singleton."""
    adapter = get_omniparser()
    return adapter.find_element(screenshot, anchor, threshold)
|