feat(grounding): Phase 1-2 pipeline FAST→SMART — détection + matching

Phase 1 — FastDetector (core/grounding/fast_detector.py) :
- Détection RF-DETR de tous les éléments UI (~120ms à chaud)
- Enrichissement OCR (texte, voisins, position relative)
- Cache pHash (même écran → résultat instantané)
- 23 éléments détectés sur le benchmark, positions correctes

Phase 2 — SmartMatcher (core/grounding/smart_matcher.py) :
- Matching déterministe : texte exact (score 0.95) puis fuzzy (0.70+)
- Matching probabiliste : type, position, voisins contextuels
- Score combiné pondéré → seuil de confiance
- 5/5 éléments trouvés en < 1ms, 0 faux positif
- "Gorbeille" matche "Corbeille" par fuzzy (score 0.678)

Structures (core/grounding/fast_types.py) :
- DetectedUIElement, ScreenSnapshot, MatchCandidate, LocateResult
- Compatible GroundingResult via to_grounding_result()

Modules standalone — aucun impact sur le système existant.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-25 20:37:14 +02:00
parent 9da589c8c2
commit ea36bba5cc
3 changed files with 634 additions and 0 deletions

View File

@@ -0,0 +1,290 @@
"""
core/grounding/fast_detector.py — Layer FAST : détection rapide des éléments UI
Capture l'écran, détecte tous les éléments UI via RF-DETR (~120ms),
enrichit chaque élément avec le texte OCR et le contexte spatial.
Produit un ScreenSnapshot utilisable par le SmartMatcher.
Utilisation :
from core.grounding.fast_detector import FastDetector
detector = FastDetector()
snapshot = detector.detect()
print(f"{len(snapshot.elements)} éléments en {snapshot.total_time_ms:.0f}ms")
"""
from __future__ import annotations
import math
import time
from typing import Any, Dict, List, Optional, Tuple
from core.grounding.fast_types import DetectedUIElement, ScreenSnapshot
class FastDetector:
"""Détection rapide de tous les éléments UI visibles sur l'écran.
Combine RF-DETR (détection bbox) + docTR (OCR) pour produire
un ScreenSnapshot enrichi.
Le modèle RF-DETR est un singleton chargé au premier appel (~1s),
puis les appels suivants sont rapides (~120ms).
"""
def __init__(self, detection_threshold: float = 0.30):
self.detection_threshold = detection_threshold
self._last_snapshot: Optional[ScreenSnapshot] = None
self._last_phash: str = ""
def detect(
self,
screenshot_pil: Optional[Any] = None,
phash: str = "",
window_title: str = "",
) -> ScreenSnapshot:
"""Détecte et enrichit tous les éléments UI de l'écran.
Args:
screenshot_pil: Image PIL. Si None, capture via mss.
phash: Hash perceptuel pour le cache. Si identique au dernier, réutilise le cache.
window_title: Titre de la fenêtre active.
Returns:
ScreenSnapshot avec tous les éléments enrichis.
"""
t0 = time.time()
# Cache : même écran → même résultat
if phash and phash == self._last_phash and self._last_snapshot is not None:
print(f"⚡ [FAST] Cache hit (pHash identique)")
return self._last_snapshot
# Capture si pas fourni
if screenshot_pil is None:
screenshot_pil = self._capture_screen()
if screenshot_pil is None:
return ScreenSnapshot(elements=[], ocr_words=[], resolution=(0, 0))
w, h = screenshot_pil.size
# --- Détection RF-DETR (~120ms) ---
t_det = time.time()
raw_elements = self._detect_rfdetr(screenshot_pil)
detection_ms = (time.time() - t_det) * 1000
# --- OCR sur les crops des éléments détectés (pas full screen) ---
t_ocr = time.time()
ocr_words = self._ocr_extract(screenshot_pil)
ocr_ms = (time.time() - t_ocr) * 1000
# --- Enrichissement : attribuer texte + voisins + position ---
enriched = self._enrich_elements(raw_elements, ocr_words, w, h)
total_ms = (time.time() - t0) * 1000
snapshot = ScreenSnapshot(
elements=enriched,
ocr_words=ocr_words,
resolution=(w, h),
window_title=window_title,
phash=phash,
detection_time_ms=detection_ms,
ocr_time_ms=ocr_ms,
total_time_ms=total_ms,
)
# Mettre en cache
if phash:
self._last_phash = phash
self._last_snapshot = snapshot
print(f"⚡ [FAST] {len(enriched)} éléments détectés en {total_ms:.0f}ms "
f"(det={detection_ms:.0f}ms, ocr={ocr_ms:.0f}ms)")
return snapshot
# ------------------------------------------------------------------
# Détection RF-DETR
# ------------------------------------------------------------------
def _detect_rfdetr(self, image) -> List[DetectedUIElement]:
"""Détecte les éléments via RF-DETR (réutilise le singleton existant)."""
try:
import sys
sys.path.insert(0, 'visual_workflow_builder/backend')
from services.ui_detection_service import detect_ui_elements
result = detect_ui_elements(image, threshold=self.detection_threshold)
elements = []
for e in result.elements:
x1 = e.bbox["x1"]
y1 = e.bbox["y1"]
x2 = e.bbox["x2"]
y2 = e.bbox["y2"]
elements.append(DetectedUIElement(
id=e.id,
bbox=(x1, y1, x2, y2),
center=(e.center["x"], e.center["y"]),
confidence=e.confidence,
))
return elements
except Exception as ex:
print(f"⚠️ [FAST/detect] RF-DETR erreur: {ex}")
return []
# ------------------------------------------------------------------
# OCR
# ------------------------------------------------------------------
def _ocr_extract(self, image) -> List[Dict[str, Any]]:
"""Extrait les mots visibles via docTR."""
try:
import sys
sys.path.insert(0, 'visual_workflow_builder/backend')
from services.ocr_service import ocr_extract_words
words = ocr_extract_words(image)
return words if words else []
except Exception as ex:
print(f"⚠️ [FAST/ocr] docTR erreur: {ex}")
return []
# ------------------------------------------------------------------
# Enrichissement
# ------------------------------------------------------------------
def _enrich_elements(
self,
elements: List[DetectedUIElement],
ocr_words: List[Dict[str, Any]],
screen_w: int,
screen_h: int,
) -> List[DetectedUIElement]:
"""Enrichit chaque élément avec texte OCR, voisins et position relative."""
for elem in elements:
# 1. Attribuer le texte OCR par intersection bbox
elem.ocr_text = self._assign_ocr_text(elem, ocr_words)
# 2. Position relative dans l'écran (grille 3x3)
elem.relative_position = self._compute_relative_position(
elem.center, screen_w, screen_h
)
# 3. Classifier le type d'élément (heuristique taille + ratio)
elem.element_type = self._classify_element_type(elem)
# 4. Calculer les voisins (texte des éléments proches)
for elem in elements:
elem.neighbors = self._find_neighbors(elem, elements)
return elements
def _assign_ocr_text(
self,
elem: DetectedUIElement,
ocr_words: List[Dict[str, Any]],
) -> str:
"""Attribue le texte OCR à un élément par intersection géométrique."""
x1, y1, x2, y2 = elem.bbox
# Élargir la bbox de 20% pour capturer le texte autour
margin_x = int((x2 - x1) * 0.2)
margin_y = int((y2 - y1) * 0.2)
ex1, ey1 = x1 - margin_x, y1 - margin_y
ex2, ey2 = x2 + margin_x, y2 + margin_y
texts = []
for word in ocr_words:
wb = word.get('bbox', [0, 0, 0, 0])
if len(wb) < 4:
continue
wx1, wy1, wx2, wy2 = wb[0], wb[1], wb[2], wb[3]
# Intersection ?
if wx1 < ex2 and wx2 > ex1 and wy1 < ey2 and wy2 > ey1:
text = word.get('text', '').strip()
if text and len(text) > 1:
texts.append(text)
return ' '.join(texts)
@staticmethod
def _compute_relative_position(
center: Tuple[int, int],
screen_w: int,
screen_h: int,
) -> str:
"""Calcule la position relative dans une grille 3x3."""
cx, cy = center
col = "left" if cx < screen_w / 3 else ("right" if cx > 2 * screen_w / 3 else "center")
row = "top" if cy < screen_h / 3 else ("bottom" if cy > 2 * screen_h / 3 else "middle")
return f"{row}_{col}"
@staticmethod
def _classify_element_type(elem: DetectedUIElement) -> str:
"""Classifie le type d'élément par heuristique taille/ratio."""
w, h = elem.width, elem.height
if w == 0 or h == 0:
return "element"
ratio = w / h
area = w * h
# Petit carré → icône
if area < 5000 and 0.5 < ratio < 2.0:
return "icon"
# Large et fin → bouton ou champ
if ratio > 3.0 and h < 60:
return "input"
if ratio > 2.0 and h < 50:
return "button"
# Grand bloc → zone de contenu
if area > 50000:
return "container"
return "element"
@staticmethod
def _find_neighbors(
elem: DetectedUIElement,
all_elements: List[DetectedUIElement],
max_neighbors: int = 5,
) -> List[str]:
"""Trouve les textes OCR des éléments proches (rayon 1.5x diagonale)."""
diag = math.sqrt(elem.width**2 + elem.height**2)
radius = max(diag * 1.5, 100) # minimum 100px
neighbors = []
for other in all_elements:
if other.id == elem.id or not other.ocr_text:
continue
dx = other.center[0] - elem.center[0]
dy = other.center[1] - elem.center[1]
dist = math.sqrt(dx**2 + dy**2)
if dist < radius:
neighbors.append(other.ocr_text)
return neighbors[:max_neighbors]
# ------------------------------------------------------------------
# Capture écran
# ------------------------------------------------------------------
@staticmethod
def _capture_screen():
"""Capture l'écran via mss."""
try:
import mss
from PIL import Image
with mss.mss() as sct:
mon = sct.monitors[0]
grab = sct.grab(mon)
return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
except Exception as ex:
print(f"⚠️ [FAST/capture] Erreur: {ex}")
return None

View File

@@ -0,0 +1,81 @@
"""
core/grounding/fast_types.py — Structures de données pour le pipeline FAST→SMART→THINK
Utilisées exclusivement par le pipeline de localisation rapide.
Compatibles avec GroundingTarget/GroundingResult existants via conversion.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class DetectedUIElement:
"""Élément UI détecté par le layer FAST (RF-DETR) puis enrichi par OCR."""
id: int
bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) pixels absolus
center: Tuple[int, int] # (cx, cy)
confidence: float # confidence détecteur (0-1)
element_type: str = "element" # "button", "input", "icon", "text", "element"
ocr_text: str = "" # texte OCR extrait de la région
neighbors: List[str] = field(default_factory=list) # textes des éléments proches
relative_position: str = "" # "top_left", "center", "bottom_right", etc.
@property
def width(self) -> int:
return self.bbox[2] - self.bbox[0]
@property
def height(self) -> int:
return self.bbox[3] - self.bbox[1]
@property
def area(self) -> int:
return self.width * self.height
@dataclass
class ScreenSnapshot:
"""État complet de l'écran à un instant t — sortie du layer FAST."""
elements: List[DetectedUIElement]
ocr_words: List[Dict[str, Any]] # mots OCR bruts [{text, bbox}]
resolution: Tuple[int, int] # (width, height)
window_title: str = ""
phash: str = ""
detection_time_ms: float = 0.0
ocr_time_ms: float = 0.0
total_time_ms: float = 0.0
@dataclass
class MatchCandidate:
"""Résultat du matching SMART pour un élément candidat."""
element: DetectedUIElement
score: float # score combiné (0-1)
score_detail: Dict[str, float] = field(default_factory=dict)
method: str = "" # "exact_text", "fuzzy_text", "position", etc.
@dataclass
class LocateResult:
"""Résultat final du pipeline FAST→SMART→THINK."""
x: int
y: int
confidence: float
method: str # "fast_exact", "fast_fuzzy", "smart_vote", "think_vlm"
time_ms: float
tier: str = "fast" # "fast", "smart", "think"
element: Optional[DetectedUIElement] = None
candidates_count: int = 0
def to_grounding_result(self):
"""Conversion vers GroundingResult pour compatibilité."""
from core.grounding.target import GroundingResult
return GroundingResult(
x=self.x, y=self.y,
method=self.method,
confidence=self.confidence,
time_ms=self.time_ms,
)

View File

@@ -0,0 +1,263 @@
"""
core/grounding/smart_matcher.py — Layer SMART : matching déterministe/probabiliste
Étant donné un ScreenSnapshot (tous les éléments détectés) et un GroundingTarget
(ce qu'on cherche), trouve l'élément correspondant avec un score de confiance.
Pipeline de matching (court-circuit au premier match haute confiance) :
1. Texte exact (2ms) → score 0.95
2. Texte fuzzy ratio (5ms) → score 0.70-0.90
3. Type + position (2ms) → bonus/malus
4. Voisins contextuels (5ms) → bonus
5. Score combiné → MatchCandidate
Utilisation :
from core.grounding.smart_matcher import SmartMatcher
from core.grounding.fast_types import ScreenSnapshot
from core.grounding.target import GroundingTarget
matcher = SmartMatcher()
candidate = matcher.match(snapshot, GroundingTarget(text="Valider"))
if candidate and candidate.score >= 0.90:
print(f"Match direct : ({candidate.element.center}) score={candidate.score}")
"""
from __future__ import annotations
import re
from difflib import SequenceMatcher
from typing import Dict, List, Optional
from core.grounding.fast_types import DetectedUIElement, MatchCandidate, ScreenSnapshot
from core.grounding.target import GroundingTarget
class SmartMatcher:
"""Matching intelligent entre une cible et les éléments détectés.
Combine plusieurs signaux (texte, type, position, voisins) en un score
de confiance unique pour chaque candidat.
"""
def __init__(
self,
weight_text: float = 0.50,
weight_type: float = 0.10,
weight_position: float = 0.15,
weight_neighbors: float = 0.25,
):
self.w_text = weight_text
self.w_type = weight_type
self.w_position = weight_position
self.w_neighbors = weight_neighbors
def match(
self,
snapshot: ScreenSnapshot,
target: GroundingTarget,
signature: Optional[Dict] = None,
) -> Optional[MatchCandidate]:
"""Trouve le MEILLEUR élément correspondant à la cible.
Returns:
Le MatchCandidate avec le score le plus élevé, ou None si aucun match.
"""
candidates = self.match_all(snapshot, target, signature)
if not candidates:
return None
return candidates[0]
def match_all(
self,
snapshot: ScreenSnapshot,
target: GroundingTarget,
signature: Optional[Dict] = None,
) -> List[MatchCandidate]:
"""Trouve TOUS les candidats triés par score décroissant.
Args:
snapshot: État de l'écran (éléments détectés + OCR).
target: Ce qu'on cherche (texte, description, bbox d'origine).
signature: Signature apprise (optionnel, enrichit le matching).
Returns:
Liste de MatchCandidate triée par score décroissant.
"""
if not snapshot.elements:
return []
target_text = (target.text or "").strip()
target_desc = (target.description or "").strip()
search_text = target_text or target_desc
if not search_text:
return []
candidates = []
search_lower = self._normalize(search_text)
for elem in snapshot.elements:
score_detail: Dict[str, float] = {}
method = ""
# --- 1. Score texte ---
text_score = self._score_text(search_lower, elem.ocr_text)
score_detail["text"] = text_score
if text_score >= 0.95:
method = "exact_text"
elif text_score >= 0.70:
method = "fuzzy_text"
# --- 2. Score type (si signature connue) ---
type_score = 0.5 # neutre par défaut
if signature and signature.get("element_type"):
if elem.element_type == signature["element_type"]:
type_score = 1.0
elif elem.element_type == "element":
type_score = 0.5 # non classifié, neutre
else:
type_score = 0.2
score_detail["type"] = type_score
# --- 3. Score position (si bbox d'origine connue) ---
position_score = 0.5 # neutre
if target.original_bbox:
position_score = self._score_position(
elem.center, target.original_bbox,
snapshot.resolution[0], snapshot.resolution[1],
)
elif signature and signature.get("relative_position"):
if elem.relative_position == signature["relative_position"]:
position_score = 0.9
else:
position_score = 0.3
score_detail["position"] = position_score
# --- 4. Score voisins (si signature connue) ---
neighbor_score = 0.5 # neutre
if signature and signature.get("neighbors"):
neighbor_score = self._score_neighbors(
elem.neighbors, signature["neighbors"]
)
score_detail["neighbors"] = neighbor_score
# --- Score combiné ---
combined = (
self.w_text * text_score
+ self.w_type * type_score
+ self.w_position * position_score
+ self.w_neighbors * neighbor_score
)
# Seuil minimum : pas de candidat si le texte ne matche pas du tout
if text_score < 0.30:
continue
if not method:
method = "combined"
candidates.append(MatchCandidate(
element=elem,
score=combined,
score_detail=score_detail,
method=method,
))
# Trier par score décroissant
candidates.sort(key=lambda c: c.score, reverse=True)
return candidates
# ------------------------------------------------------------------
# Scoring texte
# ------------------------------------------------------------------
def _score_text(self, search: str, ocr_text: str) -> float:
"""Score de similarité textuelle (0-1)."""
if not ocr_text:
return 0.0
ocr_lower = self._normalize(ocr_text)
# Match exact
if search == ocr_lower:
return 1.0
# Inclusion (l'un contient l'autre)
if search in ocr_lower or ocr_lower in search:
overlap = min(len(search), len(ocr_lower))
total = max(len(search), len(ocr_lower))
if total > 0:
return 0.70 + 0.25 * (overlap / total)
# Fuzzy matching (SequenceMatcher, standard library)
ratio = SequenceMatcher(None, search, ocr_lower).ratio()
if ratio >= 0.60:
return 0.50 + 0.40 * ratio
return ratio * 0.3
# ------------------------------------------------------------------
# Scoring position
# ------------------------------------------------------------------
@staticmethod
def _score_position(
center: tuple,
original_bbox: dict,
screen_w: int,
screen_h: int,
) -> float:
"""Score de proximité par rapport à la position d'origine (0-1)."""
if not original_bbox:
return 0.5
orig_x = original_bbox.get("x", 0) + original_bbox.get("width", 0) / 2
orig_y = original_bbox.get("y", 0) + original_bbox.get("height", 0) / 2
dx = abs(center[0] - orig_x) / max(screen_w, 1)
dy = abs(center[1] - orig_y) / max(screen_h, 1)
distance_norm = (dx**2 + dy**2) ** 0.5
# distance 0 = score 1.0, distance 0.5 (demi-écran) = score ~0.2
return max(0.0, 1.0 - distance_norm * 2.0)
# ------------------------------------------------------------------
# Scoring voisins
# ------------------------------------------------------------------
@staticmethod
def _score_neighbors(
current_neighbors: List[str],
expected_neighbors: List[str],
) -> float:
"""Score Jaccard sur les ensembles de mots voisins (0-1)."""
if not expected_neighbors:
return 0.5
current_set = {n.lower().strip() for n in current_neighbors if n}
expected_set = {n.lower().strip() for n in expected_neighbors if n}
if not current_set and not expected_set:
return 0.5
intersection = current_set & expected_set
union = current_set | expected_set
if not union:
return 0.5
return len(intersection) / len(union)
# ------------------------------------------------------------------
# Utilitaires
# ------------------------------------------------------------------
@staticmethod
def _normalize(text: str) -> str:
"""Normalise un texte pour la comparaison."""
text = text.lower().strip()
text = re.sub(r'[_\-\./\\]', ' ', text)
text = re.sub(r'\s+', ' ', text)
return text