feat: premier replay E2E + mode apprentissage supervisé

Premier replay fonctionnel de bout en bout (Bloc-notes, Chrome).

Corrections critiques :
- Fix double-lancement agent (Lea.bat start /b + verrou PID)
- Sérialisation replay (threading.Lock dans poll_and_execute)
- Garde UIA bbox >50% écran (rejet conteneurs "Bureau")
- Filtre fenêtres bruit système (systray overflow)
- Auto-nettoyage replays bloqués (paused_need_help)

Cascade visuelle complète dans session_cleaner :
- UIA local (10ms) → template matching (100ms) → serveur docTR/VLM
- Nettoyage bureau pré-replay (clic "Afficher le bureau")
- Crops 80x80 + vlm_description pour chaque clic

Grounding contraint à la fenêtre active :
- Capture croppée à la fenêtre au lieu de l'écran entier
- Conversion coordonnées fenêtre → écran
- Élimine les faux positifs taskbar/systray

Mode apprentissage supervisé (SUPERVISE → capture humaine) :
- Léa passe en mode capture quand elle est perdue
- Capture mini-workflow humain (clics + frappes + combos)
- Fin par Ctrl+Shift+L ou timeout inactivité 10s
- Correction stockée dans target_memory.db via serveur

Deploy Windows complet (grounding.py, policy.py, uia_helper.py).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-13 07:42:50 +02:00
parent 816b37af98
commit 33c198b827
12 changed files with 1561 additions and 60 deletions

View File

@@ -17,6 +17,7 @@ import base64
import hashlib
import io
import os
import threading
import time
import logging
@@ -72,6 +73,12 @@ class ActionExecutorV1:
# different de celui qui utilise l'instance).
self._sct = None
self.running = True
# ── Verrou de sérialisation replay ──
# Garantit qu'UNE SEULE action de replay s'exécute à la fois.
# Sans ce lock, deux threads (polling main.py + lea_ui) peuvent
# consommer deux actions simultanément → race condition + mss
# thread-unsafe retourne des résolutions fantômes (1024x768).
self._replay_lock = threading.Lock()
# Backoff exponentiel pour le polling replay (evite de marteler le serveur)
self._poll_backoff = 1.0 # Delai actuel (secondes)
self._poll_backoff_min = 1.0 # Delai minimal (reset apres succes)
@@ -340,6 +347,25 @@ class ActionExecutorV1:
)
return None
# ── GARDE : rejeter les éléments géants (conteneurs) ──
# Un élément qui couvre >50% de l'écran est un conteneur
# (Bureau, Rechercher, liste), pas un bouton cliquable.
# Cliquer au centre d'un conteneur = clic aveugle.
elem_w = element.width()
elem_h = element.height()
screen_area = screen_width * screen_height
elem_area = elem_w * elem_h
if screen_area > 0 and elem_area / screen_area > 0.5:
logger.warning(
f"UIA REJET : '{name}' couvre {elem_area / screen_area * 100:.0f}% "
f"de l'écran ({elem_w}x{elem_h}) — conteneur, pas un élément cliquable"
)
print(
f" [UIA] REJET — '{name}' trop grand "
f"({elem_w}x{elem_h}, {elem_area / screen_area * 100:.0f}% écran)"
)
return None
cx, cy = element.center()
if screen_width <= 0 or screen_height <= 0:
return None
@@ -499,10 +525,25 @@ class ActionExecutorV1:
"visual_resolved": False,
}
# ── Délai inter-actions (anti race condition mss) ──
wait_before = action.get("wait_before", 0.5)
if wait_before > 0:
time.sleep(wait_before)
try:
monitor = self.sct.monitors[1]
width, height = monitor["width"], monitor["height"]
# ── Diagnostic résolution ──
logger.info(
f"[REPLAY] Action {action_id} ({action_type}) — "
f"écran replay: {width}x{height}, "
f"x_pct={action.get('x_pct', 0):.4f}, "
f"y_pct={action.get('y_pct', 0):.4f} "
f"→ pixel ({int(action.get('x_pct', 0) * width)}, "
f"{int(action.get('y_pct', 0) * height)})"
)
# Resolution visuelle des coordonnees si demande
x_pct = action.get("x_pct", 0.0)
y_pct = action.get("y_pct", 0.0)
@@ -526,7 +567,7 @@ class ActionExecutorV1:
)
if expected_title and expected_title != "unknown_window":
from ..window_info_crossplatform import get_active_window_info
from ..ui.messages import est_fenetre_lea
from ..ui.messages import est_fenetre_lea, est_fenetre_bruit
# Polling court pour laisser le temps à la fenêtre de
# se stabiliser (évite les faux négatifs sur transitions
@@ -544,8 +585,9 @@ class ActionExecutorV1:
time.sleep(0.3)
continue
# Si on tombe sur unknown_window → on attend aussi
if not current_title or current_title == "unknown_window":
# Bruit système (systray overflow, taskbar, etc.)
# → on attend que la vraie fenêtre reprenne le focus
if est_fenetre_bruit(current_title):
time.sleep(0.3)
continue
@@ -686,8 +728,8 @@ class ActionExecutorV1:
if action_type == "click":
# Si visual_mode est activé, le resolve DOIT réussir.
# Pas de fallback blind — on arrête le replay si la cible
# n'est pas trouvée visuellement. C'est un RPA VISUEL.
# Pas de fallback blind — Léa VOIT l'écran et CHERCHE
# l'élément. Si toute la cascade échoue → pause supervisée.
if visual_mode and not result.get("visual_resolved"):
# ── Policy : décider quoi faire quand grounding échoue ──
from .policy import PolicyEngine, Decision
@@ -709,7 +751,6 @@ class ActionExecutorV1:
)
if policy_decision.decision == Decision.RETRY:
# Re-tenter le grounding après correction (popup fermée, etc.)
resolved2 = self._resolve_target_visual(
server_url, target_spec, x_pct, y_pct, width, height
)
@@ -719,7 +760,6 @@ class ActionExecutorV1:
result["visual_resolved"] = True
print(f" [POLICY] Re-resolve OK après {policy_decision.action_taken}")
else:
# Re-resolve échoué — SUPERVISE (rendre la main)
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
@@ -746,18 +786,55 @@ class ActionExecutorV1:
)
return result
else: # SUPERVISE ou CONTINUE
else: # SUPERVISE → mode apprentissage
# Léa est perdue. Au lieu de s'arrêter, elle
# passe en mode capture et enregistre ce que
# l'humain fait (mini-workflow de correction).
try:
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
except Exception:
pass
human_actions = self._capture_human_correction(
timeout_s=120,
)
if human_actions:
# L'humain a montré un mini-workflow
result["success"] = True
result["resolution_method"] = "human_supervised"
result["warning"] = "human_supervised"
# Stocker le dernier clic comme position résolue
last_click = None
for ha in reversed(human_actions):
if ha.get("type") == "click":
last_click = ha
break
if last_click:
result["actual_position"] = {
"x_pct": last_click["x_pct"],
"y_pct": last_click["y_pct"],
}
# Envoyer toute la correction au serveur
result["correction"] = {
"actions": human_actions,
"action_count": len(human_actions),
"last_click": last_click,
}
logger.info(
f"[APPRENTISSAGE] Correction reçue : "
f"{len(human_actions)} actions — je m'en souviendrai."
)
else:
# Timeout — l'humain n'a pas répondu
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
return result
real_x = int(x_pct * width)
real_y = int(y_pct * height)
@@ -1417,15 +1494,24 @@ Example: x_pct=0.50, y_pct=0.30"""
2. Execute l'action (clic, texte, etc.)
3. POST /replay/result avec le resultat + screenshot
Args:
session_id: Identifiant de la session courante
server_url: URL de base du serveur streaming
machine_id: Identifiant de la machine (pour le replay multi-machine)
Sérialisé par _replay_lock — une seule action à la fois.
Sans ce lock, deux threads concurrents consomment deux actions
et mss retourne des résolutions fantômes (thread-unsafe).
Retourne True si une action a ete executee, False sinon.
IMPORTANT: Si une action est recue, le resultat est TOUJOURS rapporte
au serveur (meme en cas d'erreur d'execution).
"""
# Sérialisation stricte : si un autre thread exécute déjà une
# action, on abandonne ce poll immédiatement (pas de file d'attente).
if not self._replay_lock.acquire(blocking=False):
return False
try:
return self._poll_and_execute_inner(session_id, server_url, machine_id)
finally:
self._replay_lock.release()
def _poll_and_execute_inner(self, session_id: str, server_url: str, machine_id: str) -> bool:
"""Implémentation interne de poll_and_execute (protégée par _replay_lock)."""
import requests
replay_next_url = f"{server_url}/traces/stream/replay/next"
@@ -1499,11 +1585,14 @@ Example: x_pct=0.50, y_pct=0.30"""
print(f">>> ERREUR EXECUTION : {e}")
logger.error(f"Erreur execute_replay_action: {e}")
import traceback
tb_str = traceback.format_exc()
traceback.print_exc()
result = {
"action_id": action_id,
"success": False,
"error": f"Exception executor: {e}",
# Inclure le traceback complet pour diagnostiquer
# les crashes côté agent depuis les logs serveur
"error": f"{e}\n---TRACEBACK---\n{tb_str[-500:]}",
"screenshot": None,
}
@@ -1525,6 +1614,8 @@ Example: x_pct=0.50, y_pct=0.30"""
# Champs enrichis pour target_not_found (pause supervisée)
"target_description": result.get("target_description"),
"target_spec": result.get("target_spec"),
# Correction humaine (mode apprentissage supervisé)
"correction": result.get("correction"),
}
try:
resp2 = requests.post(
@@ -2007,6 +2098,159 @@ Example: x_pct=0.50, y_pct=0.30"""
logger.debug(f"Texte saisi char-by-char ({len(text)} chars)")
# =========================================================================
# Mode apprentissage — l'humain montre, Léa apprend
# =========================================================================
# Hotkey pour signaler la fin de la correction humaine
_LEARNING_DONE_HOTKEY = {Key.ctrl_l, Key.shift, KeyCode.from_char("l")}
def _capture_human_correction(self, timeout_s: float = 120.0) -> list[dict]:
"""Capturer un mini-workflow de correction humaine.
Léa est perdue — elle passe en mode capture et enregistre
TOUTES les actions de l'humain (clics, frappes, combos)
jusqu'à ce que l'humain signale qu'il a fini :
- Ctrl+Shift+L (hotkey)
- Ou timeout d'inactivité (10s sans action)
- Ou timeout global (120s)
Retourne la liste des actions capturées (peut être vide si timeout).
C'est un mini-workflow, pas juste un clic.
"""
done_event = threading.Event()
actions: list[dict] = []
last_action_time = [time.time()]
keys_pressed: set = set()
INACTIVITY_TIMEOUT = 10.0 # secondes
monitor = self.sct.monitors[1]
screen_w, screen_h = monitor["width"], monitor["height"]
def _on_click(x, y, button, pressed):
if done_event.is_set():
return False
if pressed and button.name in ("left", "right"):
action = {
"type": "click",
"x_pct": round(x / screen_w, 6),
"y_pct": round(y / screen_h, 6),
"button": button.name,
"timestamp": time.time(),
}
# UIA snapshot
try:
from .uia_helper import get_shared_helper
helper = get_shared_helper()
if helper.available:
elem = helper.query_at(int(x), int(y), with_parents=True)
if elem:
action["uia_snapshot"] = elem.to_dict()
except Exception:
pass
actions.append(action)
last_action_time[0] = time.time()
logger.info(f"[APPRENTISSAGE] Clic ({x}, {y}) bouton={button.name}")
def _on_key_press(key):
if done_event.is_set():
return False
keys_pressed.add(key)
# Vérifier hotkey Ctrl+Shift+L
if self._LEARNING_DONE_HOTKEY.issubset(keys_pressed):
logger.info("[APPRENTISSAGE] Hotkey Ctrl+Shift+L — fin de correction")
print(" [APPRENTISSAGE] Ctrl+Shift+L reçu — merci !")
done_event.set()
return False
def _on_key_release(key):
keys_pressed.discard(key)
if done_event.is_set():
return False
# Capturer les frappes texte (pas les modifiers seuls)
if hasattr(key, "char") and key.char:
actions.append({
"type": "type",
"text": key.char,
"timestamp": time.time(),
})
last_action_time[0] = time.time()
elif key == Key.enter:
actions.append({
"type": "key_combo",
"keys": ["enter"],
"timestamp": time.time(),
})
last_action_time[0] = time.time()
from pynput.mouse import Listener as MouseListener
from pynput.keyboard import Listener as KeyboardListener
mouse_listener = MouseListener(on_click=_on_click)
kbd_listener = KeyboardListener(
on_press=_on_key_press, on_release=_on_key_release,
)
mouse_listener.start()
kbd_listener.start()
logger.info(
f"[APPRENTISSAGE] Mode capture activé (timeout={timeout_s}s, "
f"inactivité={INACTIVITY_TIMEOUT}s, hotkey=Ctrl+Shift+L)"
)
print(
f" [APPRENTISSAGE] Montre-moi comment faire.\n"
f" Quand tu as fini → Ctrl+Shift+L\n"
f" (ou j'attends {INACTIVITY_TIMEOUT}s sans action)"
)
# Attendre : hotkey OU inactivité OU timeout global
start = time.time()
while not done_event.is_set():
elapsed = time.time() - start
if elapsed > timeout_s:
logger.info("[APPRENTISSAGE] Timeout global")
break
# Timeout inactivité : si l'humain a fait au moins 1 action
# et n'a rien fait depuis INACTIVITY_TIMEOUT secondes
if actions and (time.time() - last_action_time[0]) > INACTIVITY_TIMEOUT:
logger.info(
f"[APPRENTISSAGE] Inactivité {INACTIVITY_TIMEOUT}s — "
f"fin automatique ({len(actions)} actions)"
)
print(f" [APPRENTISSAGE] Pas d'action depuis {INACTIVITY_TIMEOUT}s — je reprends.")
break
time.sleep(0.2)
mouse_listener.stop()
kbd_listener.stop()
logger.info(f"[APPRENTISSAGE] {len(actions)} actions capturées")
print(f" [APPRENTISSAGE] {len(actions)} actions capturées — merci !")
return actions
def _capture_crop_at(self, x: int, y: int, size: int = 80) -> str:
"""Capturer un crop carré autour d'une position."""
try:
from PIL import Image
with mss.mss() as local_sct:
monitor = local_sct.monitors[1]
raw = local_sct.grab(monitor)
img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
half = size // 2
left = max(0, x - half)
top = max(0, y - half)
right = min(img.width, x + half)
bottom = min(img.height, y + half)
crop = img.crop((left, top, right, bottom))
buffer = io.BytesIO()
crop.save(buffer, format="JPEG", quality=85)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
except Exception:
return ""
def _click(self, pos, button_name):
"""Deplacer la souris via courbe de Bézier puis cliquer.

View File

@@ -15,6 +15,7 @@ Ref: docs/PLAN_ACTEUR_V1.md — Architecture MICRO (grounding + exécution)
"""
import base64
import io
import logging
import os
import time
@@ -126,19 +127,62 @@ class GroundingEngine:
)
t_start = time.time()
screenshot_b64 = self._executor._capture_screenshot_b64(max_width=0, quality=75)
# ── Capture contrainte à la fenêtre active ──
# Le grounding ne voit QUE la fenêtre attendue — pas la taskbar,
# pas le systray, pas les autres apps. Comme un humain qui regarde
# l'application sur laquelle il travaille.
window_rect = None
try:
from ..window_info_crossplatform import get_active_window_rect
win_info = get_active_window_rect()
if win_info and win_info.get("rect"):
r = win_info["rect"] # [left, top, right, bottom]
# Validation : fenêtre visible et pas minuscule
w = r[2] - r[0]
h = r[3] - r[1]
if w > 50 and h > 50:
window_rect = {
"left": max(0, r[0]),
"top": max(0, r[1]),
"width": min(w, screen_width),
"height": min(h, screen_height),
}
logger.info(
f"Grounding contraint à la fenêtre : "
f"{window_rect['width']}x{window_rect['height']} "
f"à ({window_rect['left']}, {window_rect['top']})"
)
except Exception as e:
logger.debug(f"Pas de window rect disponible : {e}")
screenshot_b64 = self._capture_window_or_screen(window_rect)
if not screenshot_b64:
return GroundingResult(
found=False, detail="Capture screenshot échouée",
elapsed_ms=(time.time() - t_start) * 1000,
)
# Dimensions de la zone capturée (fenêtre ou écran entier)
cap_w = window_rect["width"] if window_rect else screen_width
cap_h = window_rect["height"] if window_rect else screen_height
for strategy in strategies:
result = self._try_strategy(
strategy, server_url, screenshot_b64, target_spec,
fallback_x, fallback_y, screen_width, screen_height,
fallback_x, fallback_y, cap_w, cap_h,
)
if result.found:
# ── Conversion coords fenêtre → coords écran ──
if window_rect:
# Le grounding a retourné des coords relatives à la fenêtre
# On les convertit en coords relatives à l'écran entier
abs_x = window_rect["left"] + result.x_pct * cap_w
abs_y = window_rect["top"] + result.y_pct * cap_h
result.x_pct = abs_x / screen_width
result.y_pct = abs_y / screen_height
result.detail = f"{result.detail} [fenêtre {cap_w}x{cap_h}]"
result.elapsed_ms = (time.time() - t_start) * 1000
return result
@@ -148,6 +192,39 @@ class GroundingEngine:
elapsed_ms=(time.time() - t_start) * 1000,
)
def _capture_window_or_screen(self, window_rect: Optional[Dict]) -> str:
"""Capturer soit la fenêtre active (croppée), soit l'écran entier.
Si window_rect est fourni, capture uniquement cette zone.
Sinon, capture l'écran entier (fallback).
"""
try:
from PIL import Image
import mss as mss_lib
with mss_lib.mss() as local_sct:
if window_rect:
# Capture de la zone fenêtre uniquement
region = {
"left": window_rect["left"],
"top": window_rect["top"],
"width": window_rect["width"],
"height": window_rect["height"],
}
raw = local_sct.grab(region)
else:
# Fallback écran entier
raw = local_sct.grab(local_sct.monitors[1])
img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=75)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
except Exception as e:
logger.warning(f"Capture échouée : {e}")
# Fallback sur la méthode existante de l'executor
return self._executor._capture_screenshot_b64(max_width=0, quality=75)
def _try_strategy(
self,
strategy: str,

View File

@@ -568,6 +568,35 @@ def est_fenetre_lea(titre_fenetre: str) -> bool:
return any(re.search(motif, titre_lower) for motif in _MOTIFS_FENETRE_LEA_REGEX)
# Fenêtres parasites Windows à ignorer dans les pré-vérifications.
# Ce ne sont pas des fenêtres applicatives — c'est du bruit système
# qui prend le focus de manière imprévisible.
_FENETRES_BRUIT_SYSTEME = (
"fenêtre de dépassement de capacité",
"overflow", # version anglaise systray
"program manager",
"barre des tâches",
"task bar",
"cortana",
"action center",
"centre de notifications",
)
def est_fenetre_bruit(titre_fenetre: str) -> bool:
"""Détecter si un titre de fenêtre est du bruit système Windows.
Ces fenêtres prennent le focus de manière imprévisible (systray overflow,
taskbar, Program Manager) et ne sont jamais la cible d'une action utilisateur.
"""
if not titre_fenetre:
return True # pas de titre = bruit
titre_lower = titre_fenetre.lower().strip()
if titre_lower == "unknown_window":
return True
return any(p in titre_lower for p in _FENETRES_BRUIT_SYSTEME)
# Conservé pour rétro-compatibilité avec le code qui listait MOTIFS_FENETRE_LEA
MOTIFS_FENETRE_LEA = (
"léa",

View File

@@ -17,6 +17,7 @@ import base64
import hashlib
import io
import os
import threading
import time
import logging
@@ -72,6 +73,8 @@ class ActionExecutorV1:
# different de celui qui utilise l'instance).
self._sct = None
self.running = True
# Verrou de sérialisation — une seule action replay à la fois
self._replay_lock = threading.Lock()
# Backoff exponentiel pour le polling replay (evite de marteler le serveur)
self._poll_backoff = 1.0 # Delai actuel (secondes)
self._poll_backoff_min = 1.0 # Delai minimal (reset apres succes)
@@ -241,6 +244,107 @@ class ActionExecutorV1:
logger.warning(f"Acteur gemma4 indisponible : {e}")
return "EXECUTER"
# =========================================================================
# UIA local — résolution via lea_uia.exe (helper Rust)
# =========================================================================
def _resolve_via_uia_local(
self, uia_target: dict, screen_width: int, screen_height: int,
):
"""Résoudre une cible via UIA local (lea_uia.exe).
Le plan contient un uia_target (nom, control_type, parent_path).
On appelle le helper Rust qui interroge UIAutomationCore.dll et
retourne les coordonnées pixel-perfect de l'élément.
STRICT : si l'élément trouvé n'appartient pas à la bonne fenêtre
parente (comparaison du parent_path), on REFUSE.
Retourne (x_pct, y_pct) si trouvé ET validé, None sinon.
"""
try:
from .uia_helper import get_shared_helper
helper = get_shared_helper()
if not helper.available:
return None
name = uia_target.get("name", "")
control_type = uia_target.get("control_type", "") or None
automation_id = uia_target.get("automation_id", "") or None
expected_parent_path = uia_target.get("parent_path", []) or []
if not name:
return None
element = helper.find_by_name(
name=name,
control_type=control_type,
automation_id=automation_id,
timeout_ms=1500,
)
if element is None or not element.is_clickable():
logger.debug(f"UIA: '{name}' non trouvé ou non cliquable")
return None
# ── VÉRIFICATION STRICTE du parent_path ──
if expected_parent_path:
expected_root = None
for p in expected_parent_path:
if p.get("control_type", "").lower() in ("fenêtre", "window"):
expected_root = p.get("name", "").strip()
break
if expected_root:
found_root = None
for p in element.parent_path:
if p.get("control_type", "").lower() in ("fenêtre", "window"):
found_root = p.get("name", "").strip()
break
if found_root and expected_root != found_root:
if (expected_root.lower() not in found_root.lower()
and found_root.lower() not in expected_root.lower()):
logger.warning(
f"UIA REJET : '{name}' trouvé dans '{found_root}' "
f"mais attendu dans '{expected_root}'"
)
print(
f" [UIA] REJET — '{name}' trouvé dans mauvaise fenêtre "
f"({found_root}{expected_root})"
)
return None
# ── GARDE : rejeter les éléments géants (conteneurs) ──
elem_w = element.width()
elem_h = element.height()
screen_area = screen_width * screen_height
elem_area = elem_w * elem_h
if screen_area > 0 and elem_area / screen_area > 0.5:
logger.warning(
f"UIA REJET : '{name}' couvre {elem_area / screen_area * 100:.0f}% "
f"de l'écran ({elem_w}x{elem_h}) — conteneur, pas un élément cliquable"
)
print(
f" [UIA] REJET — '{name}' trop grand "
f"({elem_w}x{elem_h}, {elem_area / screen_area * 100:.0f}% écran)"
)
return None
cx, cy = element.center()
if screen_width <= 0 or screen_height <= 0:
return None
x_pct = cx / screen_width
y_pct = cy / screen_height
if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0):
return None
return (x_pct, y_pct)
except Exception as e:
logger.debug(f"UIA local resolve erreur : {e}")
return None
# =========================================================================
# Observer — pré-analyse écran avant chaque action
# =========================================================================
@@ -385,6 +489,11 @@ class ActionExecutorV1:
"visual_resolved": False,
}
# ── Délai inter-actions (anti race condition mss) ──
wait_before = action.get("wait_before", 0.5)
if wait_before > 0:
time.sleep(wait_before)
try:
monitor = self.sct.monitors[1]
width, height = monitor["width"], monitor["height"]
@@ -393,6 +502,14 @@ class ActionExecutorV1:
x_pct = action.get("x_pct", 0.0)
y_pct = action.get("y_pct", 0.0)
# ── Diagnostic résolution ──
logger.info(
f"[REPLAY] Action {action_id} ({action_type}) — "
f"écran replay: {width}x{height}, "
f"x_pct={x_pct:.4f}, y_pct={y_pct:.4f} "
f"→ pixel ({int(x_pct * width)}, {int(y_pct * height)})"
)
# Extraire le nom de l'application depuis un titre de fenêtre
def _app_name(title):
for sep in [" ", " - ", ""]:
@@ -477,8 +594,27 @@ class ActionExecutorV1:
return result
# EXECUTER → continuer normalement
if visual_mode and target_spec and server_url:
# ── GROUNDING : localisation pure via GroundingEngine ──
# ── UIA local : résolution rapide via lea_uia.exe ──
uia_resolved = False
if visual_mode and target_spec and action_type == "click":
resolve_order = target_spec.get("resolve_order", [])
uia_target = target_spec.get("uia_target")
if resolve_order and resolve_order[0] == "uia" and uia_target:
uia_coords = self._resolve_via_uia_local(uia_target, width, height)
if uia_coords:
x_pct, y_pct = uia_coords
result["visual_resolved"] = True
result["resolution_method"] = "uia_local"
result["resolution_score"] = 0.95
uia_resolved = True
print(f" [UIA] résolu en local: ({x_pct:.4f}, {y_pct:.4f})")
logger.info(
f"UIA local OK : {uia_target.get('name', '?')} "
f"→ ({x_pct:.4f}, {y_pct:.4f})"
)
if not uia_resolved and visual_mode and target_spec and server_url:
# ── GROUNDING : localisation pure via GroundingEngine (fallback) ──
from .grounding import GroundingEngine
grounding = GroundingEngine(self)
grounding_result = grounding.locate(
@@ -510,8 +646,8 @@ class ActionExecutorV1:
if action_type == "click":
# Si visual_mode est activé, le resolve DOIT réussir.
# Pas de fallback blind — on arrête le replay si la cible
# n'est pas trouvée visuellement. C'est un RPA VISUEL.
# Pas de fallback blind — Léa VOIT l'écran et CHERCHE
# l'élément. Si toute la cascade échoue → pause supervisée.
if visual_mode and not result.get("visual_resolved"):
# ── Policy : décider quoi faire quand grounding échoue ──
from .policy import PolicyEngine, Decision
@@ -533,7 +669,6 @@ class ActionExecutorV1:
)
if policy_decision.decision == Decision.RETRY:
# Re-tenter le grounding après correction (popup fermée, etc.)
resolved2 = self._resolve_target_visual(
server_url, target_spec, x_pct, y_pct, width, height
)
@@ -543,7 +678,6 @@ class ActionExecutorV1:
result["visual_resolved"] = True
print(f" [POLICY] Re-resolve OK après {policy_decision.action_taken}")
else:
# Re-resolve échoué — SUPERVISE (rendre la main)
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
@@ -1200,20 +1334,17 @@ Example: x_pct=0.50, y_pct=0.30"""
def poll_and_execute(self, session_id: str, server_url: str, machine_id: str = "default") -> bool:
"""
Poll le serveur pour recuperer et executer la prochaine action.
1. GET /replay/next pour recuperer l'action
2. Execute l'action (clic, texte, etc.)
3. POST /replay/result avec le resultat + screenshot
Args:
session_id: Identifiant de la session courante
server_url: URL de base du serveur streaming
machine_id: Identifiant de la machine (pour le replay multi-machine)
Retourne True si une action a ete executee, False sinon.
IMPORTANT: Si une action est recue, le resultat est TOUJOURS rapporte
au serveur (meme en cas d'erreur d'execution).
Sérialisé par _replay_lock — une seule action à la fois.
"""
if not self._replay_lock.acquire(blocking=False):
return False
try:
return self._poll_and_execute_inner(session_id, server_url, machine_id)
finally:
self._replay_lock.release()
def _poll_and_execute_inner(self, session_id: str, server_url: str, machine_id: str) -> bool:
"""Implémentation interne de poll_and_execute (protégée par _replay_lock)."""
import requests
replay_next_url = f"{server_url}/traces/stream/replay/next"

View File

@@ -0,0 +1,214 @@
# agent_v1/core/grounding.py
"""
Module Grounding — localisation pure d'éléments UI sur l'écran.
Responsabilité unique : "Trouve l'élément X sur l'écran et retourne ses coordonnées."
Ne prend AUCUNE décision. Si l'élément n'est pas trouvé → retourne NOT_FOUND.
Stratégies disponibles (cascade configurable) :
1. Serveur SomEngine + VLM (GPU distant)
2. Template matching local (CPU, ~10ms)
3. VLM local direct (CPU/GPU local)
Séparé de Policy (qui décide quoi faire quand grounding échoue).
Ref: docs/PLAN_ACTEUR_V1.md — Architecture MICRO (grounding + exécution)
"""
import base64
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class GroundingResult:
"""Résultat d'une tentative de localisation visuelle."""
found: bool # L'élément a été trouvé
x_pct: float = 0.0 # Position X en % (0.0-1.0)
y_pct: float = 0.0 # Position Y en % (0.0-1.0)
method: str = "" # Méthode utilisée (server_som, anchor_template, vlm_direct...)
score: float = 0.0 # Confiance (0.0-1.0)
elapsed_ms: float = 0.0 # Temps de résolution
detail: str = "" # Info supplémentaire (label trouvé, raison échec)
raw: Optional[Dict] = None # Données brutes du resolver (pour debug)
def to_dict(self) -> Dict[str, Any]:
return {
"found": self.found,
"x_pct": self.x_pct,
"y_pct": self.y_pct,
"method": self.method,
"score": round(self.score, 3),
"elapsed_ms": round(self.elapsed_ms, 1),
"detail": self.detail,
}
# Résultat singleton pour "pas trouvé"
NOT_FOUND = GroundingResult(found=False, detail="Aucune méthode n'a trouvé l'élément")
class GroundingEngine:
"""Moteur de localisation visuelle d'éléments UI.
Encapsule la cascade de résolution (serveur → template → VLM local)
avec une interface unifiée. Ne prend aucune décision — c'est le rôle
de PolicyEngine.
Usage :
engine = GroundingEngine(executor)
result = engine.locate(screenshot_b64, target_spec, screen_w, screen_h)
if result.found:
click(result.x_pct, result.y_pct)
"""
def __init__(self, executor):
"""
Args:
executor: ActionExecutorV1 — fournit les méthodes de résolution existantes.
"""
self._executor = executor
def locate(
self,
server_url: str,
target_spec: Dict[str, Any],
fallback_x: float,
fallback_y: float,
screen_width: int,
screen_height: int,
strategies: Optional[List[str]] = None,
) -> GroundingResult:
"""Localiser un élément UI sur l'écran.
Exécute la cascade de stratégies dans l'ordre et retourne
dès qu'une stratégie trouve l'élément.
Args:
server_url: URL du serveur (SomEngine + VLM GPU)
target_spec: Spécification de la cible (by_text, anchor, vlm_description...)
fallback_x, fallback_y: Coordonnées de fallback (enregistrement)
screen_width, screen_height: Résolution écran
strategies: Liste ordonnée de stratégies à essayer.
Par défaut : ["server", "template", "vlm_local"]
Returns:
GroundingResult avec found=True et coordonnées, ou NOT_FOUND
"""
if strategies is None:
strategies = ["server", "template", "vlm_local"]
# ── Apprentissage : réordonner les stratégies selon l'historique ──
# Si le Learning sait quelle méthode marche pour cette cible,
# la mettre en premier. C'est la boucle d'apprentissage.
learned = target_spec.get("_learned_strategy", "")
if learned:
strategy_map = {
"som_text_match": "server",
"grounding_vlm": "server",
"server_som": "server",
"anchor_template": "template",
"template_matching": "template",
"hybrid_text_direct": "vlm_local",
"hybrid_vlm_text": "vlm_local",
"vlm_direct": "vlm_local",
}
preferred = strategy_map.get(learned, "")
if preferred and preferred in strategies:
strategies = [preferred] + [s for s in strategies if s != preferred]
logger.info(
f"Grounding: stratégie réordonnée par l'apprentissage → "
f"{strategies} (learned={learned})"
)
t_start = time.time()
screenshot_b64 = self._executor._capture_screenshot_b64(max_width=0, quality=75)
if not screenshot_b64:
return GroundingResult(
found=False, detail="Capture screenshot échouée",
elapsed_ms=(time.time() - t_start) * 1000,
)
for strategy in strategies:
result = self._try_strategy(
strategy, server_url, screenshot_b64, target_spec,
fallback_x, fallback_y, screen_width, screen_height,
)
if result.found:
result.elapsed_ms = (time.time() - t_start) * 1000
return result
return GroundingResult(
found=False,
detail=f"Toutes les stratégies ont échoué ({', '.join(strategies)})",
elapsed_ms=(time.time() - t_start) * 1000,
)
def _try_strategy(
self,
strategy: str,
server_url: str,
screenshot_b64: str,
target_spec: Dict[str, Any],
fallback_x: float,
fallback_y: float,
screen_width: int,
screen_height: int,
) -> GroundingResult:
"""Essayer une stratégie de grounding unique."""
if strategy == "server" and server_url:
raw = self._executor._server_resolve_target(
server_url, screenshot_b64, target_spec,
fallback_x, fallback_y, screen_width, screen_height,
)
if raw and raw.get("resolved"):
return GroundingResult(
found=True,
x_pct=raw["x_pct"],
y_pct=raw["y_pct"],
method=raw.get("method", "server"),
score=raw.get("score", 0.0),
detail=raw.get("matched_element", {}).get("label", ""),
raw=raw,
)
elif strategy == "template":
anchor_b64 = target_spec.get("anchor_image_base64", "")
if anchor_b64:
raw = self._executor._template_match_anchor(
screenshot_b64, anchor_b64, screen_width, screen_height,
)
if raw and raw.get("resolved"):
return GroundingResult(
found=True,
x_pct=raw["x_pct"],
y_pct=raw["y_pct"],
method="anchor_template",
score=raw.get("score", 0.0),
raw=raw,
)
elif strategy == "vlm_local":
by_text = target_spec.get("by_text", "")
vlm_desc = target_spec.get("vlm_description", "")
if vlm_desc or by_text:
raw = self._executor._hybrid_vlm_resolve(
screenshot_b64, target_spec, screen_width, screen_height,
)
if raw and raw.get("resolved"):
return GroundingResult(
found=True,
x_pct=raw["x_pct"],
y_pct=raw["y_pct"],
method=raw.get("method", "vlm_local"),
score=raw.get("score", 0.0),
detail=raw.get("matched_element", {}).get("label", ""),
raw=raw,
)
return GroundingResult(found=False, method=strategy, detail=f"{strategy}: pas trouvé")

View File

@@ -0,0 +1,152 @@
# agent_v1/core/policy.py
"""
Module Policy — décisions intelligentes quand le grounding échoue.
Responsabilité unique : "Le Grounding dit NOT_FOUND. Que fait-on ?"
Ne localise AUCUN élément — c'est le rôle du Grounding.
Décisions possibles :
- RETRY : re-tenter le grounding (après popup fermée, par exemple)
- SKIP : l'action n'est plus nécessaire (état déjà atteint)
- ABORT : arrêter le workflow (état incohérent)
- SUPERVISE : rendre la main à l'utilisateur
Séparé de Grounding (qui localise les éléments).
Ref: docs/PLAN_ACTEUR_V1.md — Architecture MÉSO (acteur intelligent)
"""
import logging
import os
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class Decision(Enum):
"""Décisions possibles quand le grounding échoue."""
RETRY = "retry" # Re-tenter (après correction : popup fermée, navigation...)
SKIP = "skip" # Action inutile (état déjà atteint)
ABORT = "abort" # Arrêter le workflow (état incohérent)
SUPERVISE = "supervise" # Rendre la main à l'utilisateur (Léa dit "je bloque")
CONTINUE = "continue" # Continuer malgré l'échec (action non critique)
@dataclass
class PolicyDecision:
"""Résultat d'une décision Policy."""
decision: Decision
reason: str # Explication de la décision
action_taken: str = "" # Action corrective effectuée (ex: "popup fermée")
elapsed_ms: float = 0.0
def to_dict(self) -> Dict[str, Any]:
return {
"decision": self.decision.value,
"reason": self.reason,
"action_taken": self.action_taken,
"elapsed_ms": round(self.elapsed_ms, 1),
}
class PolicyEngine:
"""Moteur de décision quand le grounding échoue.
Cascade de décision :
1. Popup détectée ? → fermer et RETRY
2. Acteur gemma4 → SKIP / ABORT / SUPERVISE
3. Fallback → SUPERVISE (rendre la main)
Usage :
policy = PolicyEngine(executor)
decision = policy.decide(action, target_spec, grounding_result)
if decision.decision == Decision.RETRY:
# re-tenter le grounding
elif decision.decision == Decision.SKIP:
# marquer comme réussi, passer à la suite
"""
def __init__(self, executor):
self._executor = executor
def decide(
self,
action: Dict[str, Any],
target_spec: Dict[str, Any],
retry_count: int = 0,
max_retries: int = 1,
) -> PolicyDecision:
"""Décider quoi faire quand le grounding a échoué.
Cascade :
1. Si c'est le premier essai → tenter de fermer une popup → RETRY
2. Si retry déjà fait → demander à l'acteur gemma4
3. Selon gemma4 : SKIP, ABORT, ou SUPERVISE
Args:
action: L'action qui a échoué
target_spec: La cible non trouvée
retry_count: Nombre de retries déjà faits
max_retries: Maximum de retries autorisés
"""
t_start = time.time()
# ── Étape 1 : Tentative de fermeture popup (premier essai) ──
if retry_count == 0:
popup_handled = self._try_close_popup()
if popup_handled:
return PolicyDecision(
decision=Decision.RETRY,
reason="Popup détectée et fermée, re-tentative",
action_taken="popup_closed",
elapsed_ms=(time.time() - t_start) * 1000,
)
# ── Étape 2 : Max retries atteint → acteur gemma4 ──
if retry_count >= max_retries:
actor_decision = self._ask_actor(action, target_spec)
if actor_decision == "PASSER":
return PolicyDecision(
decision=Decision.SKIP,
reason="Acteur gemma4 : l'état est déjà atteint",
elapsed_ms=(time.time() - t_start) * 1000,
)
elif actor_decision == "STOPPER":
return PolicyDecision(
decision=Decision.ABORT,
reason="Acteur gemma4 : état incohérent, arrêt",
elapsed_ms=(time.time() - t_start) * 1000,
)
else:
# EXECUTER ou inconnu → pause supervisée
return PolicyDecision(
decision=Decision.SUPERVISE,
reason=f"Acteur gemma4 : {actor_decision}, pause supervisée",
elapsed_ms=(time.time() - t_start) * 1000,
)
# ── Étape 3 : Encore des retries disponibles → RETRY ──
return PolicyDecision(
decision=Decision.RETRY,
reason=f"Retry {retry_count + 1}/{max_retries}",
elapsed_ms=(time.time() - t_start) * 1000,
)
def _try_close_popup(self) -> bool:
"""Tenter de fermer une popup via le handler VLM existant."""
try:
return self._executor._handle_popup_vlm()
except Exception as e:
logger.debug(f"Policy: popup handler échoué : {e}")
return False
def _ask_actor(self, action: Dict, target_spec: Dict) -> str:
"""Demander à gemma4 de décider (PASSER/EXECUTER/STOPPER)."""
try:
return self._executor._actor_decide(action, target_spec)
except Exception as e:
logger.debug(f"Policy: acteur gemma4 échoué : {e}")
return "EXECUTER" # Fallback → supervisé

View File

@@ -0,0 +1,294 @@
# core/workflow/uia_helper.py
"""
UIAHelper — Wrapper Python pour lea_uia.exe (helper Rust UI Automation).
Expose une API Python simple pour interroger UIA via le binaire Rust.
Communique via subprocess + stdin/stdout JSON.
Pourquoi un helper Rust ?
- 5-10x plus rapide que pywinauto (10-20ms vs 50-200ms)
- Binaire standalone ~500 Ko, aucune dépendance runtime
- Pas de problèmes de threading COM en Python
- Crash-safe (le crash du helper n'affecte pas l'agent Python)
Architecture :
Python executor
↓ subprocess.run
lea_uia.exe query --x 812 --y 436
↓ UIA API Windows
JSON response
↓ stdout
Python executor parse JSON
Si lea_uia.exe n'est pas disponible (Linux, binaire absent, crash) :
toutes les méthodes retournent None → fallback vision automatique.
"""
import json
import logging
import os
import platform
import subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Timeout par défaut pour les appels UIA (en secondes)
_DEFAULT_TIMEOUT = 5.0
# Masquer la fenêtre console lors du spawn de lea_uia.exe sur Windows.
# Sans ce flag, chaque appel (à chaque clic utilisateur pendant
# l'enregistrement) fait apparaître une fenêtre cmd noire brièvement
# visible à l'écran → ralentit la souris et pollue les screenshots
# capturés (le VLM peut "voir" le chemin lea_uia.exe comme texte cliqué).
#
# La valeur 0x08000000 correspond à CREATE_NO_WINDOW défini dans
# l'API Windows. Sur Linux/Mac, la valeur est 0 et `creationflags`
# est ignoré. getattr() gère le cas où Python expose déjà la constante
# sur Windows.
if platform.system() == "Windows":
_SUBPROCESS_CREATION_FLAGS = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000)
else:
_SUBPROCESS_CREATION_FLAGS = 0
@dataclass
class UiaElement:
"""Représentation Python d'un élément UIA."""
name: str = ""
control_type: str = ""
class_name: str = ""
automation_id: str = ""
bounding_rect: Tuple[int, int, int, int] = (0, 0, 0, 0)
is_enabled: bool = False
is_offscreen: bool = True
parent_path: List[Dict[str, str]] = field(default_factory=list)
process_name: str = ""
def center(self) -> Tuple[int, int]:
"""Retourner le centre du rectangle (pixels)."""
x1, y1, x2, y2 = self.bounding_rect
return ((x1 + x2) // 2, (y1 + y2) // 2)
def width(self) -> int:
return self.bounding_rect[2] - self.bounding_rect[0]
def height(self) -> int:
return self.bounding_rect[3] - self.bounding_rect[1]
def is_clickable(self) -> bool:
"""Peut-on cliquer dessus ?"""
return (
self.is_enabled
and not self.is_offscreen
and self.width() > 0
and self.height() > 0
)
def path_signature(self) -> str:
"""Signature du chemin parent (pour retrouver l'élément)."""
parts = [f"{p['control_type']}[{p['name']}]" for p in self.parent_path if p.get("name")]
parts.append(f"{self.control_type}[{self.name}]")
return " > ".join(parts)
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"control_type": self.control_type,
"class_name": self.class_name,
"automation_id": self.automation_id,
"bounding_rect": list(self.bounding_rect),
"is_enabled": self.is_enabled,
"is_offscreen": self.is_offscreen,
"parent_path": self.parent_path,
"process_name": self.process_name,
}
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "UiaElement":
rect = d.get("bounding_rect", [0, 0, 0, 0])
if isinstance(rect, list) and len(rect) >= 4:
rect = tuple(rect[:4])
else:
rect = (0, 0, 0, 0)
return cls(
name=d.get("name", ""),
control_type=d.get("control_type", ""),
class_name=d.get("class_name", ""),
automation_id=d.get("automation_id", ""),
bounding_rect=rect,
is_enabled=d.get("is_enabled", False),
is_offscreen=d.get("is_offscreen", True),
parent_path=d.get("parent_path", []),
process_name=d.get("process_name", ""),
)
class UIAHelper:
"""Wrapper Python pour lea_uia.exe."""
def __init__(self, helper_path: str = "", timeout: float = _DEFAULT_TIMEOUT):
self._helper_path = helper_path or self._find_helper()
self._timeout = timeout
self._available = self._check_available()
def _find_helper(self) -> str:
"""Trouver lea_uia.exe dans les emplacements standards."""
candidates = [
r"C:\Lea\helpers\lea_uia.exe",
os.path.join(os.path.dirname(__file__), "..", "..",
"agent_rust", "lea_uia", "target",
"x86_64-pc-windows-gnu", "release", "lea_uia.exe"),
"./helpers/lea_uia.exe",
"lea_uia.exe",
]
for path in candidates:
if os.path.isfile(path):
return os.path.abspath(path)
return ""
def _check_available(self) -> bool:
"""Vérifier que le helper est utilisable (Windows + binaire + health OK)."""
if platform.system() != "Windows":
logger.debug("UIAHelper: Linux/Mac — helper désactivé")
return False
if not self._helper_path:
logger.debug("UIAHelper: lea_uia.exe introuvable")
return False
if not os.path.isfile(self._helper_path):
logger.debug(f"UIAHelper: chemin invalide {self._helper_path}")
return False
return True
@property
def available(self) -> bool:
return self._available
@property
def helper_path(self) -> str:
return self._helper_path
def _run(self, args: List[str]) -> Optional[Dict[str, Any]]:
"""Exécuter lea_uia.exe avec les arguments et parser le JSON."""
if not self._available:
return None
try:
result = subprocess.run(
[self._helper_path] + args,
capture_output=True,
text=True,
timeout=self._timeout,
encoding="utf-8",
errors="replace",
creationflags=_SUBPROCESS_CREATION_FLAGS,
)
if result.returncode != 0:
logger.debug(
f"UIAHelper: exit code {result.returncode}, "
f"stderr: {result.stderr[:200]}"
)
return None
output = result.stdout.strip()
if not output:
return None
return json.loads(output)
except subprocess.TimeoutExpired:
logger.debug(f"UIAHelper: timeout ({self._timeout}s) sur {args}")
return None
except json.JSONDecodeError as e:
logger.debug(f"UIAHelper: JSON invalide — {e}")
return None
except Exception as e:
logger.debug(f"UIAHelper: erreur {e}")
return None
def health(self) -> bool:
"""Vérifier que UIA répond."""
data = self._run(["health"])
return data is not None and data.get("status") == "ok"
def query_at(
self,
x: int,
y: int,
with_parents: bool = True,
) -> Optional[UiaElement]:
"""Récupérer l'élément UIA à une position écran.
Args:
x, y: Coordonnées pixel absolues
with_parents: Inclure la hiérarchie des parents
Returns:
UiaElement si trouvé, None sinon (pas d'élément ou UIA indispo)
"""
args = ["query", "--x", str(x), "--y", str(y)]
if not with_parents:
args.append("--with-parents=false")
data = self._run(args)
if not data or data.get("status") != "ok":
return None
elem_data = data.get("element")
if not elem_data:
return None
return UiaElement.from_dict(elem_data)
def find_by_name(
self,
name: str,
control_type: Optional[str] = None,
automation_id: Optional[str] = None,
window: Optional[str] = None,
timeout_ms: int = 2000,
) -> Optional[UiaElement]:
"""Rechercher un élément par son nom (+ filtres optionnels).
Args:
name: Nom exact de l'élément
control_type: Type de contrôle (Button, Edit, MenuItem...)
automation_id: ID d'automation
window: Restreindre à une fenêtre spécifique
timeout_ms: Timeout de recherche en millisecondes
"""
args = ["find", "--name", name, "--timeout-ms", str(timeout_ms)]
if control_type:
args.extend(["--control-type", control_type])
if automation_id:
args.extend(["--automation-id", automation_id])
if window:
args.extend(["--window", window])
data = self._run(args)
if not data or data.get("status") != "ok":
return None
elem_data = data.get("element")
if not elem_data:
return None
return UiaElement.from_dict(elem_data)
def capture_focused(self, max_depth: int = 3) -> Optional[UiaElement]:
"""Capturer l'élément ayant le focus + son contexte."""
data = self._run(["capture", "--max-depth", str(max_depth)])
if not data or data.get("status") != "ok":
return None
elem_data = data.get("element")
if not elem_data:
return None
return UiaElement.from_dict(elem_data)
# Instance globale partagée (singleton léger)
_SHARED_HELPER: Optional[UIAHelper] = None
def get_shared_helper() -> UIAHelper:
"""Retourner une instance partagée de UIAHelper."""
global _SHARED_HELPER
if _SHARED_HELPER is None:
_SHARED_HELPER = UIAHelper()
return _SHARED_HELPER

View File

@@ -1,12 +1,97 @@
# run_agent_v1.py
import sys
import os
import atexit
# Ajout du répertoire courant au PYTHONPATH pour permettre les imports de modules
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.append(current_dir)
# ---------------------------------------------------------------
# Verrou PID — empêche le lancement de plusieurs instances
# Même si Lea.bat est double-cliqué ou lancé deux fois,
# un seul agent tourne à la fois (defense-in-depth).
# ---------------------------------------------------------------
LOCK_FILE = os.path.join(current_dir, "lea_agent.lock")
def _pid_is_alive(pid: int) -> bool:
"""Vérifie si un processus avec ce PID existe encore (Windows + Unix)."""
if sys.platform == "win32":
try:
import ctypes
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
if handle:
kernel32.CloseHandle(handle)
return True
return False
except Exception:
# Fallback : tasklist
try:
import subprocess
result = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}", "/NH"],
capture_output=True, text=True, timeout=5,
)
return str(pid) in result.stdout
except Exception:
return False
else:
# Unix/Linux — os.kill(pid, 0) ne tue pas le process
try:
os.kill(pid, 0)
return True
except (OSError, ProcessLookupError):
return False
def _acquire_lock() -> bool:
"""Tente d'acquérir le verrou PID. Retourne False si une autre instance tourne."""
my_pid = os.getpid()
# Lire le PID existant
if os.path.isfile(LOCK_FILE):
try:
with open(LOCK_FILE, "r", encoding="utf-8") as f:
old_pid = int(f.read().strip())
# Le PID dans le lock est-il encore vivant ?
if old_pid != my_pid and _pid_is_alive(old_pid):
return False # Une autre instance tourne déjà
except (ValueError, OSError):
pass # Fichier corrompu — on l'écrase
# Écrire notre PID
try:
with open(LOCK_FILE, "w", encoding="utf-8") as f:
f.write(str(my_pid))
except OSError:
pass # Pas bloquant — on continue sans lock
return True
def _release_lock():
"""Supprime le fichier lock au shutdown."""
try:
if os.path.isfile(LOCK_FILE):
with open(LOCK_FILE, "r", encoding="utf-8") as f:
stored_pid = int(f.read().strip())
# Ne supprimer que si c'est bien NOTRE lock
if stored_pid == os.getpid():
os.remove(LOCK_FILE)
except (ValueError, OSError):
pass
# Vérification du lock AVANT toute initialisation lourde
if not _acquire_lock():
# Une autre instance de Léa tourne déjà — on quitte silencieusement
sys.exit(0)
atexit.register(_release_lock)
# Charger config.txt et .env comme variables d'environnement
# (équivalent du `set` dans Lea.bat, mais fonctionne aussi sans le .bat)
for config_file in ("config.txt", ".env"):
@@ -32,7 +117,7 @@ logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logging.info("=== Agent V1 démarrage — config chargée ===")
logging.info("=== Agent V1 démarrage — config chargée (PID %d) ===", os.getpid())
logging.info("RPA_SERVER_URL=%s", os.environ.get("RPA_SERVER_URL", "(non défini)"))
logging.info("RPA_SERVER_HOST=%s", os.environ.get("RPA_SERVER_HOST", "(non défini)"))
logging.info("RPA_API_TOKEN=%s", os.environ.get("RPA_API_TOKEN", "(non défini)")[:8] + "...")

View File

@@ -488,6 +488,8 @@ class ReplayResultReport(BaseModel):
# Champs enrichis pour target_not_found (pause supervisée)
target_description: Optional[str] = None # Description humaine de la cible
target_spec: Optional[Dict[str, Any]] = None # Spec complete de la cible
# Correction humaine (mode apprentissage supervisé)
correction: Optional[Dict[str, Any]] = None # {x_pct, y_pct, uia_snapshot, crop_b64}
class ErrorCallbackConfig(BaseModel):
@@ -1883,6 +1885,26 @@ async def start_raw_replay(request: RawReplayRequest):
resolved_machine_id = target_machine_id or (session_obj.machine_id if session_obj else "default")
with _replay_lock:
# ── Nettoyage : annuler les replays bloqués pour cette machine ──
# Un replay en paused_need_help bloque tous les suivants.
# Quand on lance un nouveau replay, les anciens sont obsolètes.
stale_ids = [
rid for rid, state in _replay_states.items()
if state.get("machine_id") == resolved_machine_id
and state["status"] in ("paused_need_help", "running")
]
for rid in stale_ids:
old_state = _replay_states[rid]
old_sid = old_state.get("session_id", "")
old_state["status"] = "cancelled"
# Vider la queue associée
if old_sid in _replay_queues:
_replay_queues.pop(old_sid, None)
logger.info(
f"Replay {rid} annulé (remplacé par {replay_id}) — "
f"était {old_state.get('completed_actions', 0)}/{old_state.get('total_actions', 0)}"
)
_replay_queues[session_id] = list(actions)
_replay_states[replay_id] = _create_replay_state(
replay_id=replay_id,
@@ -3032,6 +3054,26 @@ async def report_action_result(report: ReplayResultReport):
except Exception as e:
logger.debug(f"Learning: échec enregistrement: {e}")
# === Correction humaine (mode apprentissage supervisé) ===
# L'humain a montré à Léa où cliquer. On stocke cette correction
# dans target_memory pour que la prochaine fois, Léa sache toute seule.
if report.correction and original_action:
try:
corr = report.correction
target_spec = original_action.get("target_spec", {})
logger.info(
f"[APPRENTISSAGE] Correction humaine reçue : "
f"({corr.get('x_pct', 0):.4f}, {corr.get('y_pct', 0):.4f}) "
f"pour '{target_spec.get('by_text', '?')}'"
)
_replay_learner.record_human_correction(
session_id=session_id,
action=original_action,
correction=corr,
)
except Exception as e:
logger.warning(f"Learning: échec stockage correction humaine: {e}")
# === Audit Trail : traçabilité complète pour conformité hospitalière ===
try:
_action = original_action or {"action_id": action_id, "type": "unknown"}

View File

@@ -175,6 +175,55 @@ class ReplayLearner:
self.record(outcome)
def record_human_correction(
self,
session_id: str,
action: Dict[str, Any],
correction: Dict[str, Any],
) -> None:
"""Enregistrer une correction humaine (mode apprentissage supervisé).
L'humain a montré à Léa où cliquer. On stocke cette correction
dans target_memory.db pour que la prochaine fois, Léa sache.
"""
target_spec = action.get("target_spec", {})
by_text = target_spec.get("by_text", "")
window_title = target_spec.get("window_title", "")
x_pct = correction.get("x_pct", 0.0)
y_pct = correction.get("y_pct", 0.0)
# Enregistrer dans le JSONL d'apprentissage
outcome = ActionOutcome(
session_id=session_id,
action_id=action.get("action_id", ""),
action_type="click",
target_description=by_text,
window_title=window_title,
resolution_method="human_supervised",
resolution_score=1.0, # Confiance maximale — l'humain a montré
success=True,
)
self.record(outcome)
# Stocker dans target_memory.db pour le lookup futur
try:
from .replay_memory import get_target_memory_store
store = get_target_memory_store()
if store:
store.record_success(
screen_signature="human_correction",
target_spec=target_spec,
resolved_position={"x_pct": x_pct, "y_pct": y_pct},
method="human_supervised",
score=1.0,
)
logger.info(
f"[APPRENTISSAGE] Correction stockée dans target_memory : "
f"'{by_text}' → ({x_pct:.4f}, {y_pct:.4f})"
)
except Exception as e:
logger.warning(f"Learning: échec stockage target_memory: {e}")
def query_similar(
self,
target_description: str = "",

View File

@@ -51,10 +51,14 @@ echo Pour arreter Lea : clic droit sur l'icone ^> "Quitter Lea"
echo Vous pouvez fermer cette fenetre.
echo.
.venv\Scripts\pythonw.exe run_agent_v1.py
start "" /b .venv\Scripts\pythonw.exe run_agent_v1.py
:: Attendre 3s puis verifier que Lea tourne
timeout /t 3 >nul
tasklist /FI "IMAGENAME eq pythonw.exe" /NH 2>nul | findstr /I "pythonw" >nul
if errorlevel 1 (
echo.
echo Lea a rencontre un probleme au demarrage.
echo Lea n'a pas demarre correctement.
echo Tentative avec affichage des erreurs...
echo.
.venv\Scripts\python.exe run_agent_v1.py

View File

@@ -875,17 +875,174 @@ def _find_session_dir(machine_id: str, session_id: str) -> Optional[Path]:
return None
def _load_crop_as_base64(session_dir: Path, screenshot_id: str) -> str:
"""Charger un crop screenshot et le retourner en base64.
Le crop (80x80 autour du clic) sert d'ancre pour le template matching —
le GroundingEngine compare cette vignette a l'ecran actuel via OpenCV.
"""
if not screenshot_id:
return ""
crop_path = session_dir / "shots" / f"{screenshot_id}_crop.png"
if not crop_path.is_file():
return ""
try:
import base64
data = crop_path.read_bytes()
return base64.b64encode(data).decode("ascii")
except Exception:
return ""
def _build_vlm_description(
uia_snapshot: Dict[str, Any], window_info: Dict[str, Any],
) -> str:
"""Construire une description naturelle pour le VLM.
Le VLM recoit cette phrase + le screenshot actuel et doit localiser
l'element decrit. Plus la description est precise, meilleur le grounding.
"""
name = uia_snapshot.get("name", "")
control_type = uia_snapshot.get("control_type", "")
window_title = window_info.get("title", "") if window_info else ""
parts = []
if control_type:
parts.append(f"le {control_type}")
if name:
parts.append(f"'{name}'")
if window_title and window_title != "unknown_window":
parts.append(f"dans la fenetre '{window_title}'")
if parts:
return " ".join(parts)
return ""
def _build_full_target_spec(
event: Dict[str, Any], session_dir: Path,
) -> Dict[str, Any]:
"""Construire un target_spec complet pour la cascade de resolution visuelle.
Exploite TOUTES les donnees capturees pendant l'enregistrement :
- uia_snapshot → resolution UIA locale (lea_uia.exe, 10-20ms)
- crop screenshot → template matching OpenCV (~100ms)
- nom UIA + window_title → OCR docTR + VLM grounding (1-5s)
La cascade : UIA → template → serveur (docTR+VLM) → VLM local.
Si tout echoue → pause supervisee (pas de clic aveugle).
"""
uia_snapshot = event.get("uia_snapshot", {})
window_info = event.get("window", {})
vision_info = event.get("vision_info", {})
screenshot_id = event.get("screenshot_id", "")
name = uia_snapshot.get("name", "") if uia_snapshot else ""
control_type = uia_snapshot.get("control_type", "") if uia_snapshot else ""
automation_id = uia_snapshot.get("automation_id", "") if uia_snapshot else ""
parent_path = uia_snapshot.get("parent_path", []) if uia_snapshot else []
window_title = window_info.get("title", "") if window_info else ""
# Cascade de resolution — UIA d'abord (rapide), puis vision
resolve_order = []
# UIA : disponible si on a un nom ou automation_id
has_uia = bool(name or automation_id)
if has_uia:
resolve_order.append("uia")
# Template matching : disponible si on a un crop
anchor_b64 = _load_crop_as_base64(session_dir, screenshot_id)
if anchor_b64:
resolve_order.append("template")
# Serveur (docTR OCR + SomEngine + VLM) : toujours en fallback
resolve_order.append("server")
# VLM local : dernier recours
resolve_order.append("vlm_local")
if not resolve_order:
return {}
target_spec: Dict[str, Any] = {
"resolve_order": resolve_order,
"window_title": window_title,
}
# UIA target
if has_uia:
target_spec["uia_target"] = {
"name": name,
"control_type": control_type,
"automation_id": automation_id,
"parent_path": parent_path,
}
# Anchor pour template matching
if anchor_b64:
target_spec["anchor_image_base64"] = anchor_b64
# Texte pour OCR (docTR sur le serveur)
if name:
target_spec["by_text"] = name
# Description VLM
vlm_desc = _build_vlm_description(uia_snapshot or {}, window_info or {})
if vlm_desc:
target_spec["vlm_description"] = vlm_desc
return target_spec
def _build_desktop_cleanup_actions(screen_w: int, screen_h: int) -> List[Dict[str, Any]]:
"""Construire les actions de nettoyage bureau AVANT le replay.
Sur Windows 11, un clic sur l'extreme droite de la barre des taches
(le pixel invisible 'Afficher le bureau') minimise toutes les fenetres.
C'est exactement ce qu'un humain ferait avant de commencer un travail :
repartir d'un bureau propre.
100% visuel — pas de raccourci clavier injecte (cf feedback_100pct_visual).
"""
# Le bouton 'Afficher le bureau' est au pixel tout en bas a droite
# de la taskbar. Sur Win11, c'est une fine bande cliquable.
x_pct = round((screen_w - 2) / screen_w, 6) # avant-dernier pixel
y_pct = round((screen_h - 2) / screen_h, 6) # idem vertical
return [
{
"action_id": f"act_setup_desktop_{uuid.uuid4().hex[:6]}",
"type": "click",
"x_pct": x_pct,
"y_pct": y_pct,
"button": "left",
"visual_mode": False, # position fixe, pas besoin de grounding
"wait_before": 0.3,
"_setup_action": True, # marqueur pour le distinguer des vrais clics
},
{
"action_id": f"act_setup_wait_{uuid.uuid4().hex[:6]}",
"type": "wait",
"duration_ms": 1000,
"wait_before": 0,
"_setup_action": True,
},
]
def _simple_build_replay(events: List[Dict[str, Any]], session_dir: Path) -> List[Dict[str, Any]]:
"""Construire un replay simplifie sans dependre de stream_processor.
"""Construire un replay visuel depuis les evenements bruts.
Convertit les evenements bruts en actions normalisees simples :
- mouse_click -> action click (coordonnees en pixels)
- text_input / type -> action type
- key_combo / key_press -> action key_combo
Chaque clic est enrichi avec un target_spec complet qui alimente
la cascade de resolution du GroundingEngine :
UIA local (10ms) → template matching (100ms) → serveur docTR/VLM (2-5s)
C'est un fallback pour quand build_replay_from_raw_events n'est pas disponible.
Les coordonnees ne sont PAS converties en pourcentages (le serveur les accepte
aussi en pixels).
Les coordonnees x_pct/y_pct sont incluses comme hint de derniere chance.
Lea ne clique pas en aveugle — elle VOIT l'ecran et CHERCHE l'element.
Le replay commence par un nettoyage du bureau (clic 'Afficher le bureau')
pour partir d'un etat propre — exactement comme un humain.
"""
actions: List[Dict[str, Any]] = []
click_count = 0
@@ -900,6 +1057,9 @@ def _simple_build_replay(events: List[Dict[str, Any]], session_dir: Path) -> Lis
screen_w, screen_h = int(res[0]), int(res[1])
break
# ── Étape 0 : nettoyer le bureau ──
actions.extend(_build_desktop_cleanup_actions(screen_w, screen_h))
for ev in events:
inner = ev.get("event", {})
etype = inner.get("type", "")
@@ -913,15 +1073,35 @@ def _simple_build_replay(events: List[Dict[str, Any]], session_dir: Path) -> Lis
pos = inner.get("pos", [0, 0])
click_count += 1
x_pct = round(pos[0] / screen_w, 6) if screen_w else 0.0
y_pct = round(pos[1] / screen_h, 6) if screen_h else 0.0
action = {
"action_id": action_id,
"type": "click",
"x_pct": round(pos[0] / screen_w, 6) if screen_w else 0.0,
"y_pct": round(pos[1] / screen_h, 6) if screen_h else 0.0,
"x_pct": x_pct,
"y_pct": y_pct,
"button": inner.get("button", "left"),
"visual_mode": False, # pas d'enrichissement → coords brutes
"wait_before": 0.5,
}
# Enrichir avec la cascade visuelle complete
target_spec = _build_full_target_spec(inner, session_dir)
if target_spec:
action["visual_mode"] = True
action["target_spec"] = target_spec
uia_name = inner.get("uia_snapshot", {}).get("name", "?")
methods = target_spec.get("resolve_order", [])
logger.info(
"Action %s enrichie [%s] : '%s' (%s)",
action_id, "+".join(methods), uia_name,
inner.get("uia_snapshot", {}).get("control_type", "?"),
)
else:
# Pas de donnee visuelle du tout → coords brutes en dernier recours
action["visual_mode"] = False
logger.warning("Action %s : aucune donnee visuelle, coords brutes", action_id)
actions.append(action)
elif etype in ("text_input", "type"):