fix(vision): Corriger les seuils CLIP/Template pour éviter les clics erronés

Problème résolu:
- Le workflow cliquait au mauvais endroit (200-500px de distance)
- Les seuils de matching étaient trop permissifs

Corrections apportées:
- CLIP: MAX_DISTANCE=120px, MIN_SCORE=0.55, MIN_COMBINED=0.5
- Template zonée: MAX_DISTANCE=150px
- Template global: MAX_DISTANCE=150px (était 500px)
- Ajout de logs détaillés pour debug des candidats rejetés
- Désactivation de l'overlay debug (polling intensif inutile)

Fichiers modifiés:
- intelligent_executor.py: Seuils stricts + logs
- execute.py: Logique d'exécution modes basic/intelligent/debug
- ui_detection_service.py: Backend UI-DETR-1
- App.tsx: Overlay désactivé
- ExecutionOverlay.tsx: URLs API corrigées

Documentation:
- docs/REFERENCE_VISION_RPA.md: Guide complet de référence

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-24 02:15:04 +01:00
parent d8d086dac5
commit f04f156144
6 changed files with 2088 additions and 156 deletions

View File

@@ -16,7 +16,26 @@ import threading
import time
import base64
import os
import subprocess
from . import api_v3_bp
def minimize_active_window():
"""Minimise la fenêtre active (Linux avec xdotool)"""
try:
# Attendre un court instant pour que la requête HTTP soit traitée
time.sleep(0.3)
# Minimiser la fenêtre active
subprocess.run(['xdotool', 'getactivewindow', 'windowminimize'],
capture_output=True, timeout=2)
print("📦 [Execute] Fenêtre du navigateur minimisée")
return True
except FileNotFoundError:
print("⚠️ [Execute] xdotool non installé - impossible de minimiser")
return False
except Exception as e:
print(f"⚠️ [Execute] Erreur minimisation: {e}")
return False
from db.models import db, Workflow, Step, Execution, ExecutionStep, VisualAnchor, get_session_state
from contracts.action_contracts import enforce_action_contract, ContractValidationError, get_required_params
@@ -32,7 +51,8 @@ _execution_state = {
'is_paused': False,
'should_stop': False,
'current_execution_id': None,
'thread': None
'thread': None,
'execution_mode': 'basic' # 'basic', 'intelligent', 'debug'
}
@@ -99,9 +119,11 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app):
if step.anchor_id:
anchor = VisualAnchor.query.get(step.anchor_id)
if anchor:
# Charger l'image base64 depuis le fichier
if anchor.image_path and os.path.exists(anchor.image_path):
with open(anchor.image_path, 'rb') as f:
# Charger l'image CROPPÉE (thumbnail) pour le template matching
# thumbnail_path = zone de l'ancre, image_path = écran complet
anchor_image_path = anchor.thumbnail_path or anchor.image_path
if anchor_image_path and os.path.exists(anchor_image_path):
with open(anchor_image_path, 'rb') as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
else:
image_base64 = None
@@ -202,57 +224,249 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app):
_execution_state['current_execution_id'] = None
def execute_ai_analyze(params: dict) -> dict:
"""
Exécute une analyse IA avec Ollama.
Capture la zone de l'ancre et envoie à l'IA pour analyse.
"""
import requests
try:
# Récupérer les paramètres
anchor = params.get('visual_anchor', {})
prompt = params.get('analysis_prompt', params.get('prompt', ''))
model = params.get('model', params.get('ollama_model', 'qwen2.5-vl:7b'))
output_variable = params.get('output_variable', 'resultat_analyse')
timeout_ms = params.get('timeout_ms', 60000)
temperature = params.get('temperature', 0.3)
# Récupérer l'image de l'ancre
screenshot_base64 = anchor.get('screenshot')
if not screenshot_base64:
# Capturer l'écran si pas d'image dans l'ancre
try:
from PIL import ImageGrab
import io
bbox = anchor.get('bounding_box', {})
if bbox:
# Capturer la zone spécifique
x, y = int(bbox.get('x', 0)), int(bbox.get('y', 0))
w, h = int(bbox.get('width', 100)), int(bbox.get('height', 100))
screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h))
else:
# Capturer tout l'écran
screenshot = ImageGrab.grab()
buffer = io.BytesIO()
screenshot.save(buffer, format='PNG')
screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
except Exception as cap_err:
return {'success': False, 'error': f"Erreur capture: {cap_err}"}
if not prompt:
prompt = "Décris ce que tu vois dans cette image."
print(f"🤖 [IA] Analyse avec {model}...")
print(f" Prompt: {prompt[:80]}...")
# Appeler Ollama
ollama_url = params.get('ollama_url', 'http://localhost:11434')
payload = {
"model": model,
"prompt": prompt,
"images": [screenshot_base64],
"stream": False,
"options": {
"temperature": temperature,
"num_predict": 1000
}
}
response = requests.post(
f"{ollama_url}/api/generate",
json=payload,
timeout=timeout_ms / 1000
)
if response.status_code == 200:
result = response.json()
analysis_text = result.get('response', '').strip()
print(f"✅ [IA] Analyse terminée ({len(analysis_text)} caractères)")
print(f" Résultat: {analysis_text[:150]}...")
# Stocker le résultat dans le contexte d'exécution pour les variables
global _execution_state
if 'variables' not in _execution_state:
_execution_state['variables'] = {}
_execution_state['variables'][output_variable] = analysis_text
return {
'success': True,
'output': {
'analysis': analysis_text,
'variable': output_variable,
'model': model
}
}
else:
return {'success': False, 'error': f"Erreur Ollama: {response.status_code}"}
except requests.exceptions.Timeout:
return {'success': False, 'error': f"Timeout Ollama après {timeout_ms}ms"}
except requests.exceptions.ConnectionError:
return {'success': False, 'error': "Ollama non accessible (vérifiez qu'il est lancé)"}
except Exception as e:
return {'success': False, 'error': str(e)}
def execute_action(action_type: str, params: dict) -> dict:
"""
Exécute une action RPA.
Utilise pyautogui pour les interactions.
En mode intelligent/debug, utilise la vision pour localiser les éléments.
"""
import pyautogui
import time
execution_mode = _execution_state.get('execution_mode', 'basic')
try:
if action_type in ['click_anchor', 'click', 'double_click_anchor', 'right_click_anchor']:
# Récupérer les coordonnées depuis l'ancre
anchor = params.get('visual_anchor', {})
bbox = anchor.get('bounding_box', {})
screenshot_base64 = anchor.get('screenshot')
if not bbox:
return {'success': False, 'error': 'Pas de bounding_box dans visual_anchor'}
# Calculer le centre
# Déterminer le type de clic
click_type = 'left'
if action_type == 'double_click_anchor':
click_type = 'double'
elif action_type == 'right_click_anchor':
click_type = 'right'
# === MODE INTELLIGENT / DEBUG ===
if execution_mode in ['intelligent', 'debug'] and screenshot_base64:
try:
from services.intelligent_executor import find_and_click
print(f"🧠 [Action] Mode {execution_mode}: recherche visuelle de l'ancre...")
# Convertir bbox au format attendu
anchor_bbox = {
'x': bbox.get('x', 0),
'y': bbox.get('y', 0),
'width': bbox.get('width', 0),
'height': bbox.get('height', 0)
}
# Trouver l'ancre avec la vision (CLIP + position - cf VISION_RPA_INTELLIGENT.md)
result = find_and_click(
anchor_image_base64=screenshot_base64,
anchor_bbox=anchor_bbox,
method='clip', # UI-DETR-1 + CLIP avec pondération par distance
detection_threshold=0.35
)
if result['found'] and result['coordinates']:
x, y = result['coordinates']['x'], result['coordinates']['y']
confidence = result['confidence']
print(f"✅ [Vision] Ancre trouvée à ({x}, {y}) - confiance: {confidence:.2f}")
# Effectuer le clic
if click_type == 'double':
pyautogui.doubleClick(x, y)
elif click_type == 'right':
pyautogui.rightClick(x, y)
else:
pyautogui.click(x, y)
# Délai après le clic pour que l'application réagisse
# 2 secondes pour laisser le temps aux applications de s'ouvrir
time.sleep(2.0)
return {
'success': True,
'output': {
'clicked_at': {'x': x, 'y': y},
'mode': execution_mode,
'confidence': confidence,
'method': result.get('method', 'template')
}
}
else:
# En mode intelligent/debug, on refuse d'utiliser les coordonnées statiques
# si l'ancre n'est pas trouvée - cela évite les clics au mauvais endroit
reason = result.get('reason', 'Ancre non trouvée à l\'écran')
confidence = result.get('confidence', 0)
print(f"❌ [Vision] Ancre NON trouvée (confiance: {confidence:.2f})")
print(f" Raison: {reason}")
return {
'success': False,
'error': f"Ancre non trouvée à l'écran (confiance: {confidence:.2f}). {reason}"
}
except Exception as vision_err:
print(f"❌ [Vision] Erreur: {vision_err}")
return {
'success': False,
'error': f"Erreur vision: {str(vision_err)}"
}
# === MODE BASIC (ou fallback) ===
# Calculer le centre depuis les coordonnées statiques
x = bbox.get('x', 0) + bbox.get('width', 0) / 2
y = bbox.get('y', 0) + bbox.get('height', 0) / 2
# TODO: Utiliser la détection visuelle (OmniParser/VLM) ici
# Pour l'instant, on utilise les coordonnées statiques
print(f"🖱️ [Action] Clic {click_type} à ({x}, {y}) [mode: {execution_mode}]")
print(f"🖱️ [Action] Clic à ({x}, {y})")
if action_type == 'double_click_anchor':
if click_type == 'double':
pyautogui.doubleClick(x, y)
elif action_type == 'right_click_anchor':
elif click_type == 'right':
pyautogui.rightClick(x, y)
else:
pyautogui.click(x, y)
return {'success': True, 'output': {'clicked_at': {'x': x, 'y': y}}}
return {'success': True, 'output': {'clicked_at': {'x': x, 'y': y}, 'mode': execution_mode}}
elif action_type in ['type_text', 'type']:
text = params.get('text', '')
if not text:
return {'success': False, 'error': 'Pas de texte à saisir'}
print(f"⌨️ [Action] Saisie: {text[:30]}...")
# Remplacer les variables {{variable}} par leur valeur
import re
variables = _execution_state.get('variables', {})
def replace_var(match):
var_name = match.group(1)
value = variables.get(var_name, match.group(0)) # Garder {{var}} si non trouvée
print(f" 📌 Variable {{{{{var_name}}}}}{str(value)[:50]}...")
return str(value)
text = re.sub(r'\{\{(\w+)\}\}', replace_var, text)
print(f"⌨️ [Action] Saisie: {text[:50]}...")
# Effacer avant si demandé
if params.get('clear_before', False):
pyautogui.hotkey('ctrl', 'a')
time.sleep(0.1)
# Petit délai pour s'assurer que le focus est bon
time.sleep(0.2)
if text.isascii():
pyautogui.typewrite(text, interval=0.05)
else:
pyautogui.write(text)
# Utiliser write() pour supporter l'unicode (caractères français, etc.)
pyautogui.write(text)
return {'success': True, 'output': {'typed': text}}
return {'success': True, 'output': {'typed': text[:100] + '...' if len(text) > 100 else text}}
elif action_type in ['wait_for_anchor', 'wait']:
timeout_ms = params.get('timeout_ms', params.get('timeout', 5000))
@@ -269,6 +483,10 @@ def execute_action(action_type: str, params: dict) -> dict:
pyautogui.hotkey(*keys)
return {'success': True, 'output': {'hotkey': keys}}
elif action_type == 'ai_analyze_text':
# Analyse de texte avec IA (Ollama)
return execute_ai_analyze(params)
else:
return {'success': False, 'error': f"Type d'action non supporté: {action_type}"}
@@ -297,6 +515,12 @@ def start_execution():
data = request.get_json() or {}
workflow_id = data.get('workflow_id')
execution_mode = data.get('execution_mode', 'basic')
minimize_browser = data.get('minimize_browser', True) # Activé par défaut
# Valider le mode
if execution_mode not in ['basic', 'intelligent', 'debug']:
execution_mode = 'basic'
# Utiliser le workflow actif si non spécifié
if not workflow_id:
@@ -340,6 +564,13 @@ def start_execution():
_execution_state['is_paused'] = False
_execution_state['should_stop'] = False
_execution_state['current_execution_id'] = execution.id
_execution_state['execution_mode'] = execution_mode
print(f"🎯 [API v3] Mode d'exécution: {execution_mode}")
# Minimiser la fenêtre du navigateur si demandé
if minimize_browser:
minimize_active_window()
# Lancer le thread d'exécution
from flask import current_app
@@ -474,6 +705,7 @@ def get_execution_status():
'success': True,
'is_running': _execution_state['is_running'],
'is_paused': _execution_state['is_paused'],
'execution_mode': _execution_state.get('execution_mode', 'basic'),
'execution': execution.to_dict() if execution else None,
'session': session.to_dict()
})