perf: closed-loop pHash wait (2s→150ms) + batched CLIP (90 calls→1)

Closed loop: time.sleep(2.0) replaced with _wait_for_screen_change(),
which polls the pHash every 150 ms and exits as soon as the screen changes.
4 occurrences replaced.
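
For reference, the polling idea in isolation (a rough sketch, not the shipped code; it uses the imagehash package as a stand-in for compute_phash, and all names here are illustrative):

import time
import mss
import imagehash  # stand-in for core.analytics.screen_change_detector.compute_phash
from PIL import Image

def wait_for_screen_change(pre_screen, max_wait=2.0, poll_interval=0.15, threshold=5):
    # Poll until the screen's pHash drifts from the pre-click hash, or max_wait expires.
    pre_hash = imagehash.phash(pre_screen)
    deadline = time.time() + max_wait
    while time.time() < deadline:
        time.sleep(poll_interval)
        with mss.mss() as sct:
            grab = sct.grab(sct.monitors[0])
            post = Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
        if imagehash.phash(post) - pre_hash >= threshold:  # Hamming distance between the two hashes
            return True  # screen changed, stop waiting early
    return False  # no change detected within the budget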

Batched CLIP: filter by distance BEFORE CLIP (90→~20 elements),
then embed_image_batch() in a single GPU call + vectorized np.dot.
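
The vectorized scoring boils down to one matrix-vector product; a minimal numpy sketch (random vectors stand in for the real CLIP embeddings, and the 512-dim size is just an assumption):

import numpy as np

# anchor_embedding: (dim,) from embed_image(anchor)
# all_embeddings:   (N, dim) from a single embed_image_batch(crops) call
anchor_embedding = np.random.rand(512)
all_embeddings = np.random.rand(20, 512)

anchor_norm = np.linalg.norm(anchor_embedding)
elem_norms = np.linalg.norm(all_embeddings, axis=1)
clip_scores = all_embeddings @ anchor_embedding / (elem_norms * anchor_norm)  # (N,) cosine similarities
best = int(np.argmax(clip_scores))  # index of the best-matching crop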

Estimated: 42s→~20s for the total workflow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Dom
Date: 2026-04-21 19:33:42 +02:00
parent 552e66dbf6
commit 6caab2c600
2 changed files with 143 additions and 51 deletions


@@ -24,6 +24,48 @@ from . import api_v3_bp
logger = logging.getLogger(__name__)
def _wait_for_screen_change(pre_screen, max_wait=2.0, poll_interval=0.15):
"""
Waits for the screen to change after a click, instead of using a fixed sleep.
Compares the pHash of the current screen with the pre-click screenshot.
Exits as soon as the Hamming distance >= 5 (change detected).
Fallback: exits after max_wait seconds if no change is detected.
Args:
pre_screen: PIL Image of the screenshot taken before the click (may be None)
max_wait: Maximum wait time in seconds (default 2.0)
poll_interval: Polling interval in seconds (default 0.15)
"""
if pre_screen is None:
# No pre-click screenshot, fall back to a plain sleep
time.sleep(max_wait)
return
try:
from core.analytics.screen_change_detector import compute_phash
import mss as _mss
from PIL import Image as _PILImage
_pre_hash = compute_phash(pre_screen)
_start = time.time()
while time.time() - _start < max_wait:
time.sleep(poll_interval)
try:
with _mss.mss() as _sct:
_grab = _sct.grab(_sct.monitors[0])
_post_screen = _PILImage.frombytes('RGB', _grab.size, _grab.bgra, 'raw', 'BGRX')
_post_hash = compute_phash(_post_screen)
_dist = _pre_hash - _post_hash
if _dist >= 5: # Screen has changed
break
except Exception:
break
except Exception:
# If pHash/mss is unavailable, fall back to a sleep
time.sleep(max_wait)
from core.execution.input_handler import (
safe_type_text as _shared_safe_type_text,
check_screen_for_patterns as _shared_check_patterns,
@@ -722,6 +764,17 @@ def execute_action_with_coords(action_type: str, params: dict, coords: dict) ->
x, y = coords['x'], coords['y']
print(f"🖱️ [Self-Healing] Clic aux coordonnées choisies: ({x}, {y})")
# Capture pré-clic pour détection de changement
_pre_screen = None
try:
import mss as _mss
from PIL import Image as _PILImage
with _mss.mss() as _sct:
_grab = _sct.grab(_sct.monitors[0])
_pre_screen = _PILImage.frombytes('RGB', _grab.size, _grab.bgra, 'raw', 'BGRX')
except Exception:
pass
if action_type in ['double_click_anchor']:
pyautogui.doubleClick(x, y)
elif action_type in ['right_click_anchor']:
@@ -729,7 +782,7 @@ def execute_action_with_coords(action_type: str, params: dict, coords: dict) ->
else:
pyautogui.click(x, y)
time.sleep(2.0) # Delay after the click
_wait_for_screen_change(_pre_screen)
return {
'success': True,
@@ -759,6 +812,17 @@ def execute_action_with_static_coords(action_type: str, params: dict) -> dict:
print(f"🖱️ [Self-Healing] Clic aux coordonnées statiques: ({x}, {y})")
# Capture pré-clic pour détection de changement
_pre_screen = None
try:
import mss as _mss
from PIL import Image as _PILImage
with _mss.mss() as _sct:
_grab = _sct.grab(_sct.monitors[0])
_pre_screen = _PILImage.frombytes('RGB', _grab.size, _grab.bgra, 'raw', 'BGRX')
except Exception:
pass
if action_type in ['double_click_anchor']:
pyautogui.doubleClick(x, y)
elif action_type in ['right_click_anchor']:
@@ -766,7 +830,7 @@ def execute_action_with_static_coords(action_type: str, params: dict) -> dict:
else:
pyautogui.click(x, y)
time.sleep(2.0)
_wait_for_screen_change(_pre_screen)
return {
'success': True,
@@ -844,6 +908,7 @@ def execute_action(action_type: str, params: dict) -> dict:
_fc_target_desc = params.get('visual_anchor', {}).get('description', '')
x, y, confidence, method_used = None, None, 0, ''
screen_img = None # Pre-click screenshot for change detection
# === METHOD 1: Direct template matching (~1-10ms) ===
try:
@@ -922,7 +987,7 @@ def execute_action(action_type: str, params: dict) -> dict:
else:
pyautogui.click(x, y)
time.sleep(2.0)
_wait_for_screen_change(screen_img)
return {
'success': True,
@@ -969,7 +1034,7 @@ def execute_action(action_type: str, params: dict) -> dict:
else:
pyautogui.click(gx, gy)
time.sleep(2.0)
_wait_for_screen_change(screen_img)
return {
'success': True,


@@ -218,6 +218,9 @@ class IntelligentExecutor:
Matching by CLIP embedding similarity + distance weighting.
Combines the semantic score with proximity to the original position.
Uses embed_image_batch() to encode all elements in a single
GPU call instead of ~90 individual calls.
STRICT THRESHOLDS to avoid false positives:
- MAX_DISTANCE_PX: Absolute maximum distance (500px)
- MIN_CLIP_SCORE: Minimum CLIP score (0.50)
@@ -253,31 +256,22 @@ class IntelligentExecutor:
# Get the anchor embedding
anchor_embedding = self._clip_model.embed_image(anchor_image)
best_match = None
best_combined_score = 0.0
candidates = []
rejected_candidates = [] # For debugging: keep track of rejected elements
print(f"🔍 [CLIP] {len(elements)} elements detected by UI-DETR-1")
# === STEP 1: Filter by distance and prepare the crops ===
nearby_elements = [] # Elements kept (distance OK)
nearby_crops = [] # Corresponding PIL crops
nearby_distances = [] # Pre-computed distances
nearby_distance_factors = [] # Weighting factors
rejected_candidates = [] # For debugging: keep track of rejected elements
for elem in elements:
# Extract the element's region
x1, y1 = elem.bbox['x1'], elem.bbox['y1']
x2, y2 = elem.bbox['x2'], elem.bbox['y2']
elem_crop = screen_image.crop((x1, y1, x2, y2))
# Get the element embedding
elem_embedding = self._clip_model.embed_image(elem_crop)
# Compute the cosine similarity (CLIP semantic score)
clip_score = float(np.dot(anchor_embedding, elem_embedding) /
(np.linalg.norm(anchor_embedding) * np.linalg.norm(elem_embedding)))
# Compute the distance weighting if the original position is known
distance_factor = 1.0
# Compute the distance if the original position is known
distance = None
rejected_reason = None
distance_factor = 1.0
if anchor_center_x is not None and anchor_center_y is not None:
elem_center_x = (x1 + x2) // 2
@@ -287,49 +281,82 @@ class IntelligentExecutor:
(elem_center_y - anchor_center_y) ** 2
)
# Distance weighting
normalized_distance = distance / screen_diagonal
distance_factor = max(0.2, 1.0 - (normalized_distance * 5.0))
# STRICT REJECT: distance > MAX_DISTANCE_PX
if distance > MAX_DISTANCE_PX:
rejected_reason = f"distance {distance:.0f}px > {MAX_DISTANCE_PX}px"
rejected_candidates.append({
'element_id': elem.id,
'clip_score': clip_score,
'clip_score': 0.0,
'distance': distance,
'reason': rejected_reason,
'reason': f"distance {distance:.0f}px > {MAX_DISTANCE_PX}px",
'center': {'x': elem_center_x, 'y': elem_center_y}
})
continue
# STRICT REJECT: CLIP score < MIN_CLIP_SCORE
if clip_score < MIN_CLIP_SCORE:
rejected_reason = f"CLIP {clip_score:.2f} < {MIN_CLIP_SCORE}"
rejected_candidates.append({
# Distance weighting
normalized_distance = distance / screen_diagonal
distance_factor = max(0.2, 1.0 - (normalized_distance * 5.0))
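# Illustrative example (assumes a 1920x1080 screen, diagonal ≈ 2203 px): an element
# 250 px from the original position gives normalized_distance ≈ 0.11 and
# distance_factor = max(0.2, 1 - 0.57) ≈ 0.43; beyond ~352 px the factor bottoms
# out at the 0.2 floor, until the 500 px hard reject above.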
# Crop the element
elem_crop = screen_image.crop((x1, y1, x2, y2))
nearby_elements.append(elem)
nearby_crops.append(elem_crop)
nearby_distances.append(distance)
nearby_distance_factors.append(distance_factor)
print(f"🔍 [CLIP] {len(nearby_elements)} éléments après filtre distance "
f"({len(rejected_candidates)} rejetés par distance)")
# === STEP 2: Batch CLIP (a single GPU call) ===
best_match = None
best_combined_score = 0.0
candidates = []
if nearby_crops:
# Encode all crops in one batch (1 GPU call instead of N)
all_embeddings = self._clip_model.embed_image_batch(nearby_crops)
# === STEP 3: Vectorized similarities with numpy ===
# anchor_embedding shape: (dim,), all_embeddings shape: (N, dim)
anchor_norm = np.linalg.norm(anchor_embedding)
elem_norms = np.linalg.norm(all_embeddings, axis=1)
# Vectorized cosine similarity
clip_scores = np.dot(all_embeddings, anchor_embedding) / (elem_norms * anchor_norm)
# === STEP 4: Apply the thresholds and build the candidates ===
for i, elem in enumerate(nearby_elements):
clip_score = float(clip_scores[i])
distance = nearby_distances[i]
distance_factor = nearby_distance_factors[i]
# STRICT REJECT: CLIP score < MIN_CLIP_SCORE
if clip_score < MIN_CLIP_SCORE:
x1, y1 = elem.bbox['x1'], elem.bbox['y1']
x2, y2 = elem.bbox['x2'], elem.bbox['y2']
rejected_candidates.append({
'element_id': elem.id,
'clip_score': clip_score,
'distance': distance,
'reason': f"CLIP {clip_score:.2f} < {MIN_CLIP_SCORE}",
'center': {'x': (x1+x2)//2, 'y': (y1+y2)//2}
})
continue
# Combined score: CLIP * distance_factor
combined_score = clip_score * distance_factor
candidates.append({
'element_id': elem.id,
'clip_score': clip_score,
'distance': distance,
'reason': rejected_reason,
'center': {'x': (x1+x2)//2, 'y': (y1+y2)//2}
'distance_factor': distance_factor,
'combined_score': combined_score,
'bbox': elem.bbox
})
continue
# Combined score: CLIP * distance_factor
combined_score = clip_score * distance_factor
candidates.append({
'element_id': elem.id,
'clip_score': clip_score,
'distance': distance,
'distance_factor': distance_factor,
'combined_score': combined_score,
'bbox': elem.bbox
})
if combined_score > best_combined_score:
best_combined_score = combined_score
best_match = elem
if combined_score > best_combined_score:
best_combined_score = combined_score
best_match = elem
# Sort by combined score
candidates.sort(key=lambda x: x['combined_score'], reverse=True)