Files
rpa_vision_v3/core/grounding/fast_pipeline.py
Dom b30d4b6656 feat(grounding): Phase 4 — Pipeline orchestré FAST→SMART→THINK
FastSmartThinkPipeline (core/grounding/fast_pipeline.py) :
- Cascade : FAST detect (120ms) → SMART match (<1ms) → THINK VLM si doute (3s)
- Seuils : ≥0.90 action directe, 0.60-0.90 VLM confirme, <0.60 VLM cherche
- Apprentissage automatique : SignatureStore enrichie à chaque succès
- Ancien pipeline en fallback (safety net)
- Singleton via get_instance()

Validé sur 5 éléments :
- 1ère exécution : 5/5 OK via smart_think_confirmed (24.5s total)
- 2ème exécution : 4/5 en FAST direct, 1/5 en THINK (10.5s total)
- L'apprentissage réduit le temps de 20x par élément connu

Module standalone — aucun impact sur le système existant.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 20:54:40 +02:00

217 lines
7.9 KiB
Python

"""
core/grounding/fast_pipeline.py — Pipeline FAST → SMART → THINK
Orchestrateur central : détecte les éléments (FAST), matche avec la cible (SMART),
et demande au VLM de trancher si le score est trop bas (THINK).
Seuils de confiance :
≥ 0.90 → action directe (FAST/SMART)
0.60-0.90 → VLM confirme (THINK)
< 0.60 → VLM cherche seul (THINK)
L'ancien GroundingPipeline est utilisé en fallback si tout échoue.
Utilisation :
    from core.grounding.fast_pipeline import FastSmartThinkPipeline
    from core.grounding.target import GroundingTarget

    pipeline = FastSmartThinkPipeline()
    result = pipeline.locate(GroundingTarget(text="Valider"))
    if result:
        print(f"({result.x}, {result.y}) via {result.method} en {result.time_ms:.0f}ms")
"""
from __future__ import annotations
import time
import threading
from typing import Optional
from core.grounding.target import GroundingTarget, GroundingResult
from core.grounding.fast_types import LocateResult
from core.grounding.fast_detector import FastDetector
from core.grounding.smart_matcher import SmartMatcher
from core.grounding.think_arbiter import ThinkArbiter
from core.grounding.element_signature import SignatureStore
# Module-level singleton state: the shared pipeline instance, created on the
# first FastSmartThinkPipeline.get_instance() call, and the lock that guards
# its creation.
_instance: Optional[FastSmartThinkPipeline] = None
_instance_lock = threading.Lock()
class FastSmartThinkPipeline:
    """FAST → SMART → THINK pipeline for locating UI elements.

    Every ``locate()`` call walks the same cascade:

    1. FAST: the detector lists the elements present on screen (described by
       the original author as RF-DETR detection plus OCR enrichment,
       ~120 ms + ~1 s).
    2. SMART: the matcher scores the detected elements against the target,
       optionally helped by a previously learned signature (< 1 ms per the
       original author).
    3. THINK: the arbiter (a VLM, ~3-5 s per the original author) confirms a
       medium-score candidate or searches on its own when the score is too low.
    4. Fallback: the legacy pipeline, if one was configured, is tried last.
    """

    def __init__(
        self,
        confidence_direct: float = 0.90,
        confidence_think: float = 0.60,
        enable_think: bool = True,
        enable_learning: bool = True,
    ) -> None:
        """Build the pipeline and its collaborators.

        Args:
            confidence_direct: Minimum SMART score for acting without THINK.
            confidence_think: Minimum SMART score for asking THINK to confirm
                the candidate rather than search from scratch.
            enable_think: When False, the THINK stage is skipped entirely.
            enable_learning: When True, successful matches are recorded in
                the signature store.
        """
        self.confidence_direct = confidence_direct
        self.confidence_think = confidence_think
        self.enable_think = enable_think
        self.enable_learning = enable_learning
        self._detector = FastDetector()
        self._matcher = SmartMatcher()
        self._arbiter = ThinkArbiter()
        self._signatures = SignatureStore()
        # Legacy pipeline used as a safety net; set via set_fallback_pipeline().
        self._fallback_pipeline = None

    @classmethod
    def get_instance(cls) -> FastSmartThinkPipeline:
        """Return the shared singleton instance, creating it on first use."""
        global _instance
        if _instance is None:
            with _instance_lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if _instance is None:
                    _instance = cls()
        return _instance

    def set_fallback_pipeline(self, pipeline) -> None:
        """Register the legacy pipeline used as a last-resort safety net.

        Args:
            pipeline: Any object exposing ``locate(target)``.
        """
        self._fallback_pipeline = pipeline

    # ------------------------------------------------------------------
    # Main API
    # ------------------------------------------------------------------
    def locate(
        self,
        target: GroundingTarget,
        screenshot_pil=None,
        phash: str = "",
        window_title: str = "",
    ) -> Optional[GroundingResult]:
        """Locate a UI element through the FAST → SMART → THINK cascade.

        Args:
            target: What to look for (text, description, ...).
            screenshot_pil: PIL image to analyse. It is forwarded unchanged
                (possibly None) to the detector and the arbiter; the original
                documentation states a capture is taken via mss when None.
            phash: Perceptual hash forwarded to the detector (cache key
                according to the original documentation).
            window_title: Title of the active window.

        Returns:
            A GroundingResult, whatever the fallback pipeline returns when the
            cascade is inconclusive, or None when nothing located the target.
        """
        # perf_counter is monotonic: the elapsed time cannot be skewed by
        # wall-clock adjustments the way time.time() differences can.
        started = time.perf_counter()

        # --- FAST: detect every element on screen ---
        snapshot = self._detector.detect(
            screenshot_pil=screenshot_pil,
            phash=phash,
            window_title=window_title,
        )
        if not snapshot.elements:
            print("⚡ [Pipeline] FAST : aucun élément détecté")
            return self._try_fallback(target)

        # --- Look up a previously learned signature for this target/screen ---
        target_key = SignatureStore.make_target_key(
            target.text or "", target.description or ""
        )
        screen_ctx = SignatureStore.make_screen_context(
            window_title, snapshot.resolution
        )
        signature = self._signatures.lookup(target_key, screen_ctx)

        # --- SMART: match the detected elements against the target ---
        candidate = self._matcher.match(snapshot, target, signature)

        if candidate:
            dt = self._elapsed_ms(started)
            element = candidate.element

            # High score → act directly, no VLM needed.
            if candidate.score >= self.confidence_direct:
                cx = element.center[0]
                cy = element.center[1]
                print(f"✅ [Pipeline] FAST→SMART direct : '{element.ocr_text}' "
                      f"score={candidate.score:.3f} ({candidate.method}) "
                      f"→ ({cx}, {cy}) "
                      f"en {dt:.0f}ms")
                self._learn(target_key, screen_ctx, element, candidate.score)
                return GroundingResult(
                    x=cx,
                    y=cy,
                    method=f"fast_{candidate.method}",
                    confidence=candidate.score,
                    time_ms=dt,
                )

            # Medium score → ask the VLM to confirm the candidate.
            if candidate.score >= self.confidence_think and self.enable_think:
                print(f"🤔 [Pipeline] SMART score={candidate.score:.3f} — THINK pour confirmer")
                think_result = self._arbiter.arbitrate(
                    target,
                    candidates=[candidate],
                    screenshot_pil=screenshot_pil,
                )
                dt = self._elapsed_ms(started)
                if think_result:
                    # Confirmed: learn the matched element with the VLM's
                    # confidence, and return the VLM's coordinates.
                    self._learn(
                        target_key, screen_ctx, element, think_result.confidence
                    )
                    return GroundingResult(
                        x=think_result.x,
                        y=think_result.y,
                        method="smart_think_confirmed",
                        confidence=think_result.confidence,
                        time_ms=dt,
                    )
                # Not confirmed: fall through to the full THINK search below.

        # --- THINK: score too low, no candidate, or confirmation failed ---
        if self.enable_think:
            score_info = f"score={candidate.score:.3f}" if candidate else "aucun candidat"
            print(f"🤔 [Pipeline] {score_info} — THINK recherche complète")
            think_result = self._arbiter.arbitrate(
                target, candidates=[], screenshot_pil=screenshot_pil,
            )
            dt = self._elapsed_ms(started)
            if think_result:
                return GroundingResult(
                    x=think_result.x,
                    y=think_result.y,
                    method="think_vlm",
                    confidence=think_result.confidence,
                    time_ms=dt,
                )

        # --- Fallback: legacy pipeline ---
        return self._try_fallback(target)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return the milliseconds elapsed since *started* (a perf_counter reading)."""
        return (time.perf_counter() - started) * 1000

    def _learn(self, target_key, screen_ctx, element, score) -> None:
        """Record a successful match in the signature store if learning is enabled."""
        if self.enable_learning:
            self._signatures.record_success(target_key, screen_ctx, element, score)

    # ------------------------------------------------------------------
    # Fallback
    # ------------------------------------------------------------------
    def _try_fallback(self, target: GroundingTarget) -> Optional[GroundingResult]:
        """Try the legacy pipeline as a last resort.

        Returns:
            The fallback pipeline's result, or None when no fallback is
            configured or when the fallback raised.
        """
        if self._fallback_pipeline is None:
            print(f"❌ [Pipeline] Aucune méthode n'a trouvé '{target.text}'")
            return None

        print(f"⚠️ [Pipeline] Fallback ancien pipeline pour '{target.text}'")
        try:
            return self._fallback_pipeline.locate(target)
        except Exception as ex:
            # Deliberate best-effort boundary: a failing safety net must not
            # crash the caller, so the error is reported and None returned.
            print(f"⚠️ [Pipeline] Fallback échoué: {ex}")
            return None