feat: replay visuel Windows opérationnel — template matching + VWB complet

- Bouton "Windows" dans VWB pour exécuter sur le PC distant
- Template matching OpenCV multi-scale pour localiser les ancres visuelles
- Proxy VWB→streaming server avec chargement ancre (thumb, pas full)
- Fix executor Windows : mss lazy, result reporting, debug prints
- Fix poll replay permanent (sans session active)
- Mapping types VWB→executor (click_anchor→click, type_text→type)
- CORS streaming server, capture Windows dans VWB
- Dédup heartbeats côté client (hash perceptuel)
- Mode cloud VLM configurable via RPA_VLM_MODEL
- Fix resolve_target : pas de ScreenAnalyzer fallback (trop lent)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-17 18:56:44 +01:00
parent dd149c1cbb
commit 371db69543
7 changed files with 361 additions and 15 deletions

View File

@@ -120,6 +120,60 @@ def capture_screen():
}), 500
@screen_capture_bp.route('/capture-windows', methods=['POST'])
@cross_origin()
def capture_windows():
    """Return the most recent Windows screenshot stored by the streaming server.

    Screenshots are looked up under
    ``<project_root>/data/training/live_sessions/sess_*/shots``. Only the three
    most recent sessions (by directory name, descending) are inspected, and
    only PNG files whose name contains ``full``, ``heartbeat`` or ``focus``
    are considered; the newest one by modification time wins.

    Per the original author's note, the Agent V1 client sends a heartbeat
    roughly every 5 s, so the newest file approximates a live capture.

    Returns:
        A JSON response with the PNG image (base64), its dimensions, the
        file name and the session name; 404 when no session or screenshot
        exists; 500 with the error message when the image cannot be read.
    """
    from pathlib import Path

    # Walk four directory levels up from this file to reach the project root
    # (rpa_vision_v3/).
    root = os.path.abspath(__file__)
    for _ in range(4):
        root = os.path.dirname(root)
    live_dir = Path(root) / "data" / "training" / "live_sessions"

    # Most recent session first (session directory names sort chronologically
    # according to the original code's assumption).
    sessions = sorted(
        live_dir.glob("sess_*/shots"),
        key=lambda p: p.parent.name,
        reverse=True,
    )
    if not sessions:
        return jsonify({'error': 'Aucune session Windows trouvée'}), 404

    # Keep full-screen style captures only; other PNGs (crops) are ignored.
    screen_markers = ("full", "heartbeat", "focus")
    latest_shot = None
    for session_shots in sessions[:3]:
        shots = [
            shot for shot in session_shots.glob("*.png")
            if any(marker in shot.name for marker in screen_markers)
        ]
        if shots:
            # max() picks the same file as a descending sort followed by [0],
            # without sorting the whole list.
            latest_shot = max(shots, key=lambda p: p.stat().st_mtime)
            break

    if not latest_shot:
        return jsonify({'error': 'Aucun screenshot Windows disponible'}), 404

    try:
        from PIL import Image

        # The context manager guarantees the underlying file is closed even
        # if re-encoding fails.
        with Image.open(latest_shot) as img:
            width, height = img.size
            buf = io.BytesIO()
            img.save(buf, format='PNG')

        img_base64 = base64.b64encode(buf.getvalue()).decode('utf-8')
        return jsonify({
            'image': img_base64,
            'width': width,
            'height': height,
            'format': 'png',
            'source': 'windows',
            'file': str(latest_shot.name),
            'session': latest_shot.parent.parent.name,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@screen_capture_bp.route('/detect-elements', methods=['POST'])
@cross_origin()
def detect_elements():

View File

@@ -775,3 +775,198 @@ def upload_excel():
'filename': file.filename,
'suggested_table': suggested,
})
# ---------------------------------------------------------------------------
# Exécution sur Windows — proxy vers le streaming server (port 5005)
# ---------------------------------------------------------------------------
def _load_anchor_image_b64(anchor_id: str) -> Optional[str]:
    """Load the image of a visual anchor and return it base64-encoded.

    Three locations are tried, in order:
      1. ``data/anchors/{id}_thumb.png`` (new format; the anchor crop, not
         the full screenshot)
      2. ``data/anchor_images/{id}/original.png`` (old format)
      3. the ``image_path`` column of the SQLite ``visual_anchors`` table

    Args:
        anchor_id: Identifier of the anchor. It may originate from a client
            request, so it is only used to build file paths when it is a
            plain name (no path separator, not ``.``/``..``).

    Returns:
        The file content as a base64 string, or ``None`` when no image could
        be found or read.
    """
    import base64 as b64
    import sqlite3
    from contextlib import closing

    def _encode_file(path) -> str:
        with open(path, 'rb') as f:
            return b64.b64encode(f.read()).decode('utf-8')

    backend_dir = Path(__file__).resolve().parent.parent

    # Guard against path traversal: an id such as "../../x" or an absolute
    # path must never be joined into a filesystem path.
    anchor_name = str(anchor_id)
    is_plain_name = (
        anchor_name not in ('', '.', '..')
        and '/' not in anchor_name
        and '\\' not in anchor_name
    )

    if is_plain_name:
        candidates = (
            # 1. New format: crop of the anchor.
            backend_dir / 'data' / 'anchors' / f'{anchor_name}_thumb.png',
            # 2. Old format.
            backend_dir / 'data' / 'anchor_images' / anchor_name / 'original.png',
        )
        for candidate in candidates:
            if not candidate.exists():
                continue
            try:
                return _encode_file(candidate)
            except OSError as e:
                # Unreadable file: log and fall through to the next location.
                logger.error("Erreur lecture ancre %s : %s", candidate, e)
    else:
        logger.warning("Identifiant d'ancre invalide pour un chemin fichier : %r", anchor_id)

    # 3. Path stored in the database (parameterized query, safe for any id).
    try:
        db_path = backend_dir / 'instance' / 'workflows.db'
        # closing() releases the connection even when the query raises.
        with closing(sqlite3.connect(str(db_path))) as conn:
            row = conn.execute(
                "SELECT image_path FROM visual_anchors WHERE id=?", (anchor_id,)
            ).fetchone()
        if row and row[0] and Path(row[0]).exists():
            return _encode_file(row[0])
    except Exception as e:
        # Best-effort lookup: any failure here simply means "not found".
        logger.error("Erreur lecture ancre BDD %s : %s", anchor_id, e)

    logger.warning("Image ancre introuvable pour %s", anchor_id)
    return None
def _load_anchor_metadata(anchor_id: str) -> Optional[Dict]:
    """Load the metadata of a visual anchor (bounding box, screen size, ...).

    Two sources are tried, in order:
      1. ``data/anchor_images/{id}/metadata.json`` (old format), returned
         as parsed
      2. the ``visual_anchors`` SQLite table, returned as a dict with
         ``bounding_box`` (x, y, width, height) and ``original_size``
         (width/height, defaulting to 1920x1080 when the column is empty)

    Args:
        anchor_id: Identifier of the anchor. It is only used to build a file
            path when it is a plain name (no path separator, not ``.``/``..``).

    Returns:
        The metadata, or ``None`` when neither source yields a result.
    """
    import sqlite3
    from contextlib import closing

    backend_dir = Path(__file__).resolve().parent.parent

    # Guard against path traversal before joining the id into a path.
    anchor_name = str(anchor_id)
    is_plain_name = (
        anchor_name not in ('', '.', '..')
        and '/' not in anchor_name
        and '\\' not in anchor_name
    )

    # 1. Old format: metadata.json next to the anchor image.
    if is_plain_name:
        meta_path = backend_dir / 'data' / 'anchor_images' / anchor_name / 'metadata.json'
        if meta_path.exists():
            try:
                with open(meta_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (OSError, ValueError) as e:
                # Unreadable or malformed JSON: report it, then try the DB.
                logger.warning("Erreur lecture métadonnées ancre %s : %s", meta_path, e)

    # 2. From the visual_anchors table.
    try:
        db_path = backend_dir / 'instance' / 'workflows.db'
        # closing() releases the connection even when the query raises.
        with closing(sqlite3.connect(str(db_path))) as conn:
            row = conn.execute(
                "SELECT bbox_x, bbox_y, bbox_width, bbox_height, screen_width, screen_height "
                "FROM visual_anchors WHERE id=?", (anchor_id,)
            ).fetchone()
        if row:
            return {
                'bounding_box': {'x': row[0], 'y': row[1], 'width': row[2], 'height': row[3]},
                'original_size': {'width': row[4] or 1920, 'height': row[5] or 1080},
            }
    except Exception as e:
        # Best-effort lookup: metadata is optional, so log instead of failing.
        logger.warning("Erreur lecture métadonnées ancre BDD %s : %s", anchor_id, e)

    return None
def _attach_anchor_target(action: dict, anchor_id) -> None:
    """Enable visual mode on *action* when the anchor image can be loaded.

    On success ``action['visual_mode']`` is set to True and
    ``action['target_spec']`` receives the base64 anchor image, the anchor id
    and, when metadata is available, the reference bounding box and original
    screen size. When the image cannot be found, the action is left untouched
    (blind mode) and a warning is logged.
    """
    anchor_b64 = _load_anchor_image_b64(anchor_id)
    if not anchor_b64:
        logger.warning(
            "Action %s : ancre '%s' introuvable, fallback blind mode",
            action.get('action_id', '?'),
            anchor_id,
        )
        return

    target_spec = {
        'anchor_image_base64': anchor_b64,
        'anchor_id': anchor_id,
    }
    # Metadata provides the reference bounding box; it is optional.
    anchor_meta = _load_anchor_metadata(anchor_id)
    if isinstance(anchor_meta, dict) and anchor_meta:
        target_spec['anchor_bbox'] = anchor_meta.get('bounding_box', {})
        target_spec['original_size'] = anchor_meta.get('original_size', {})

    action['visual_mode'] = True
    action['target_spec'] = target_spec
    logger.info(
        "Action %s : ancre '%s' chargée (%d Ko), visual_mode activé",
        action.get('action_id', '?'),
        anchor_id,
        len(anchor_b64) // 1024,
    )


@api_v3_bp.route('/execute-windows', methods=['POST'])
def execute_windows():
    """Proxy workflow actions to the streaming server for execution on Windows.

    The browser cannot reach port 5005 directly (CORS/network), so the VWB
    backend acts as a proxy.

    Each action in ``data['actions']`` is rewritten in place before being
    forwarded:
      * the VWB action type is mapped to the executor's type;
      * for anchor click actions, the anchor image is loaded from disk and
        embedded in ``target_spec`` so the Windows executor can resolve the
        position by template matching (``visual_mode``);
      * text and key parameters are lifted to the top level of the action.

    Returns:
        The streaming server's JSON response and status code; 400 when the
        request has no JSON payload; 503 when the streaming server cannot be
        reached; 500 for any other forwarding error.
    """
    import requests as req

    data = request.get_json()
    if not data:
        return jsonify({'error': 'Aucune donnée'}), 400

    # VWB action types -> Windows executor action types.
    TYPE_MAP = {
        'click_anchor': 'click',
        'double_click_anchor': 'click',
        'right_click_anchor': 'click',
        'type_text': 'type',
        'type_secret': 'type',
        'keyboard_shortcut': 'key_combo',
        'hotkey': 'key_combo',
        'scroll_to_anchor': 'scroll',
        'wait_for_anchor': 'wait',
        'visual_condition': 'wait',
    }
    # Action types driven by a visual anchor (they need visual_mode).
    _ANCHOR_CLICK_TYPES = {'click_anchor', 'double_click_anchor', 'right_click_anchor'}

    # Tolerate payloads where 'actions' is missing, null or not a list.
    actions = data.get('actions') if isinstance(data, dict) else None
    if not isinstance(actions, list):
        actions = []

    for action in actions:
        if not isinstance(action, dict):
            # Malformed entry: forward it untouched rather than crash.
            continue

        vwb_type = action.get('type', '')
        # 'parameters' may be present but null in the JSON payload.
        params = action.get('parameters') or {}

        action['type'] = TYPE_MAP.get(vwb_type, vwb_type)

        # Anchor-based actions -> inject visual_mode and the click button.
        if vwb_type in _ANCHOR_CLICK_TYPES:
            anchor_id = action.get('anchor_id')
            if anchor_id:
                _attach_anchor_target(action, anchor_id)

            if vwb_type == 'double_click_anchor':
                action['button'] = 'double'
            elif vwb_type == 'right_click_anchor':
                action['button'] = 'right'

        # type_text / type_secret -> lift the text to the action level.
        # No positioning click is forced here: per the original note, the
        # executor only clicks when x_pct > 0 and y_pct > 0, and positioning
        # is done by the preceding click_anchor action.
        if vwb_type in ('type_text', 'type_secret') and 'text' in params:
            action['text'] = params['text']

        # keyboard_shortcut / hotkey -> lift the keys to the action level.
        if vwb_type in ('keyboard_shortcut', 'hotkey') and 'keys' in params:
            action['keys'] = params['keys']

    try:
        resp = req.post(
            'http://localhost:5005/api/v1/traces/stream/replay/raw',
            json=data,
            timeout=30,  # generous: template matching can take a while
        )
        return jsonify(resp.json()), resp.status_code
    except req.ConnectionError:
        return jsonify({'error': 'Streaming server (port 5005) non disponible'}), 503
    except Exception as e:
        return jsonify({'error': str(e)}), 500