17 Commits

Author SHA1 Message Date
Dom
203dc00d53 fix: UIA compare les noms d'app au lieu des titres complets
"Fichier" dans "*,Ceci est un test – Bloc-notes" était rejeté
parce que le titre attendu était "test.txt – Bloc-notes".
Maintenant la comparaison extrait le nom d'app (Bloc-notes)
et accepte le match si c'est la même application.

Résout : "Ajouter un nouvel onglet" bloqué quand un fichier
différent est ouvert dans Bloc-notes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 10:27:08 +02:00
Dom
e9a028134a feat: blocs conditionnels — skip automatique des dialogues absents
Le session_cleaner détecte les dialogues système (Enregistrer sous,
Ouvrir, Confirmer, etc.) et marque les actions correspondantes comme
conditionnelles. Au replay, si le dialogue n'apparaît pas (ex: Ctrl+S
sauve silencieusement car le fichier existe), les actions du dialogue
sont skippées automatiquement.

Détection basée sur des patterns de noms de dialogues Windows FR/EN.
Testé : seul le clic dans "Enregistrer sous" est conditionnel,
les actions Bloc-notes/Rechercher/systray restent normales.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 10:20:00 +02:00
Dom
01bba7bc6c feat: wrong_window déclenche le mode apprentissage au lieu de bloquer
Quand la fenêtre attendue ne correspond pas (ex: Ctrl+S a sauvé sans
dialogue "Enregistrer sous"), Léa passe en mode capture au lieu de
retourner paused_need_help. Si l'humain ne fait rien pendant 10s,
l'action est skippée (l'état est considéré déjà atteint).

4 déclencheurs apprentissage maintenant couverts :
- retry_failed : grounding + retry échouent
- no_screen_change : clic sans effet visible
- wrong_window : fenêtre attendue absente
- SUPERVISE direct : Policy décide de demander

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 09:27:01 +02:00
Dom
d5285de99c feat: mode apprentissage — retry échoué + écran inchangé déclenchent la capture humaine
Trois chemins vers le mode apprentissage supervisé :
1. Grounding échoue → Policy RETRY → retry échoue → capture humaine
2. Clic visuel sans effet (écran inchangé 3s) → capture humaine
3. Policy SUPERVISE direct → capture humaine

La capture enregistre un mini-workflow complet (clics + frappes + combos)
jusqu'à Ctrl+Shift+L ou 10s d'inactivité. Correction envoyée au serveur.

Testé E2E : workflow Chrome avec résultats Google dynamiques +
bandeau cookies — Léa demande l'aide, capture, reprend.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 08:33:57 +02:00
Dom
33c198b827 feat: premier replay E2E + mode apprentissage supervisé
Premier replay fonctionnel de bout en bout (Bloc-notes, Chrome).

Corrections critiques :
- Fix double-lancement agent (Lea.bat start /b + verrou PID)
- Sérialisation replay (threading.Lock dans poll_and_execute)
- Garde UIA bbox >50% écran (rejet conteneurs "Bureau")
- Filtre fenêtres bruit système (systray overflow)
- Auto-nettoyage replays bloqués (paused_need_help)

Cascade visuelle complète dans session_cleaner :
- UIA local (10ms) → template matching (100ms) → serveur docTR/VLM
- Nettoyage bureau pré-replay (clic "Afficher le bureau")
- Crops 80x80 + vlm_description pour chaque clic

Grounding contraint à la fenêtre active :
- Capture croppée à la fenêtre au lieu de l'écran entier
- Conversion coordonnées fenêtre → écran
- Élimine les faux positifs taskbar/systray

Mode apprentissage supervisé (SUPERVISE → capture humaine) :
- Léa passe en mode capture quand elle est perdue
- Capture mini-workflow humain (clics + frappes + combos)
- Fin par Ctrl+Shift+L ou timeout inactivité 10s
- Correction stockée dans target_memory.db via serveur

Deploy Windows complet (grounding.py, policy.py, uia_helper.py).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 07:42:50 +02:00
Dom
816b37af98 fix: session_cleaner utilise le fallback simple exclusivement
build_replay_from_raw_events transforme les events (réordonne, injecte
du setup "ouvrir l'app", fusionne les actions, ajoute des waits) ce qui
décale les clics par rapport à l'enregistrement original. Le texte était
saisi dans le mauvais champ parce que les actions n'étaient plus en 1:1
avec la session.

Le fallback _simple_build_replay reproduit les events tels quels en
coords brutes — exactement ce qu'on veut pour "nettoyer et rejouer".
Le session_cleaner l'utilise maintenant exclusivement.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 16:29:07 +02:00
Dom
d82aad984f fix: session_cleaner force visual_mode=False sur les clics
Contournement temporaire du crash agent "cannot unpack non-iterable
NoneType object" qui se produit quand l'agent Windows tente une
résolution visuelle (visual_mode=True) sur les actions replay.

Les actions construites par build_replay_from_raw_events gardent
leurs coordonnées enrichies (x_pct, y_pct calculés depuis la
session) mais sont envoyées avec visual_mode=False pour que l'agent
clique aux coords brutes sans passer par le grounding.

C'est un compromis temporaire : moins intelligent (pas de résolution
adaptative) mais fonctionnel (les clics arrivent aux bonnes coords).
Le mode visuel sera réactivé quand le bug agent sera diagnostiqué
et corrigé (le traceback n'est pas visible côté serveur, le
redéploiement de l'agent avec debug n'a pas pris effet).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 16:13:43 +02:00
Dom
057c37131f fix: session_cleaner fallback — x_pct/y_pct + visual_mode=False
Deux bugs dans _simple_build_replay :

1. Mauvais noms de champs : x_percent/y_percent au lieu de x_pct/y_pct
   attendus par l'agent executor. Et valeurs en 0-100 au lieu de 0-1.
   Résultat : l'agent recevait x_pct=None → crash "cannot unpack
   non-iterable NoneType object".

2. Pas de visual_mode=False explicite. Sans enrichissement
   (target_spec vide, pas d'anchor), l'agent tentait une résolution
   visuelle sur du vide → crash.

Aussi : la condition de fallback empêchait le déclenchement quand
build_replay_from_raw_events crashait (error_message non vide bloquait
la branche). Corrigé : le fallback se déclenche sur `not replay_actions`
(couvre None, liste vide, et crash du build principal).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 15:51:40 +02:00
Dom
9bcce3fc68 feat: session_cleaner — outil leger de nettoyage de sessions avant replay
Petit serveur Flask standalone (tools/session_cleaner.py) qui permet de :
- Lister les sessions enregistrees recentes
- Visualiser chaque session avec ses screenshots (crop + full)
- Marquer les clics parasites a supprimer (auto-detection des toasts,
  clics droit, fenetres Lea/systray, derniers 3 evenements)
- Re-construire un replay nettoye et l'injecter dans la queue via
  POST /api/v1/traces/stream/replay/raw

Option A du rapport audit VWB : "Le besoin reel est supprimer 3 clics
parasites et relancer — c'est 30 secondes d'UX, pas un Visual Workflow
Builder."

Port : 5006
Dependencies : Flask (deja dans le venv), aucune nouvelle

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 11:35:31 +02:00
Dom
f96f6322ec chore: nettoyage code mort — suppression _a_trier/, archives/, .bak, scaffold vide
Supprime ~8.2 Go de fichiers parasites qui polluent les grep, consomment
des tokens, et ajoutent du bruit au repo :

- _a_trier/ (561 Mo) — scripts legacy, backups, sessions logs, démos
- archives/ (21 Mo) — copie figée code décembre 2024 (déjà dans git history)
- visual_workflow_builder/_a_trier/ (7.6 Go) — backups VWB legacy + anciens frontends
- web_dashboard/app.py.bak_20260304_2225 — fichier .bak oublié
- agent_v1/ (top-level) — scaffold vide jamais alimenté
- core/detection/ui_detector_old.py.bak — .bak traqué par erreur

Retire aussi du tracking git :
- 2 fichiers __pycache__ traqués par erreur dans VWB backend

Met à jour .gitignore pour prévenir la récurrence :
- *.bak, *.bak_*, *.orig, *.old
- _a_trier/, archives/

Tout ce contenu reste récupérable via git history (tag pre-cleanup-phase1-20260410).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 11:35:31 +02:00
Dom
02ee2d7b5b fix: Fenêtre incorrecte strict → pause supervisée pour apprentissage
Symétrie avec le fix 7cc03f6f1 (no_screen_change strict → paused_need_help).

Avant : si l'agent détecte en pré-vérification que la fenêtre active
n'est pas celle attendue, l'erreur retombait dans la branche retry+stop
legacy → 3 retries inutiles puis status=error et queue vidée.

C'est une violation de feedback_failure_is_learning.md : un échec Léa
n'est jamais un "stop avec error", c'est un moment pédagogique.

Maintenant :
  1. L'agent envoie warning="wrong_window" dans le résultat (en plus
     de l'error textuel existant). Ajouté aux 2 chemins :
     - pré-vérif (expected_window_before mismatch, executor.py ~587)
     - post-vérif strict (expected_window_title timeout, executor.py ~820)
  2. Le serveur détecte warning="wrong_window" AVANT la branche
     retry+stop legacy → redirection vers paused_need_help
  3. pause_message explicite : "Je m'attendais à voir la bonne fenêtre
     mais je vois autre chose. Peux-tu vérifier que l'application est
     au premier plan ?"
  4. Queue intacte (l'action reste en tête, prête à être relancée)
  5. log_replay_failure pour l'apprentissage futur

Cause fréquente identifiée : les popups de Léa elle-même (notifications,
fenêtre de chat) volent le focus Windows pendant le replay → l'app cible
perd le premier plan → pré-vérif détecte le mismatch. Bug UX séparé à
traiter (Léa ne devrait pas prendre le focus pendant un replay actif).

Appliqué aux 2 copies de l'agent (dev + deploy).

Tests : 56 E2E + Phase0 passent, 0 régression.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 10:41:29 +02:00
Dom
47993e2ee9 chore: ajouter replay_failure_logger.py au tracking git
Ce fichier existe sur disque depuis le 4 avril mais n'a jamais été ajouté
à git. Il est importé par api_stream.py (ligne 29) — un fresh clone sans
ce fichier ne peut pas démarrer le serveur streaming.

Découvert par le project-quality-guardian lors de l'audit global du
11 avril (item C1, priorité P0 bloquant absolu).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 10:35:51 +02:00
Dom
7cc03f6f10 fix: no_screen_change strict → pause supervisée pour apprentissage
Rectification de la branche C introduite dans a21f1ea9f.

## Ce qui était faux

a21f1ea9f faisait :
  strict + no_screen_change → retry × 3 → status=error → queue vidée

C'est le réflexe d'un RPA classique qui se casse la figure quand ça
rate. Ce n'est PAS la philosophie Léa.

Dom m'a rappelé que j'avais oublié ma propre vision documentée dans
project_lea_apprentissage_plan.md et feedback_not_a_click_box.md :
*"Quand elle dit qu'elle n'a pas trouvé X, elle demande montre-moi.
C'est à ce moment qu'il faudrait passer en mode apprentissage."*

## Ce qui est correct maintenant

  strict + no_screen_change
    → status = "paused_need_help"
    → failed_action stocké (target, screenshot, method, score, reason)
    → pause_message demandant l'intervention humaine
    → queue intacte (l'action reste en tête, prête à être relancée)
    → log_replay_failure pour l'apprentissage futur
    → l'agent reçoit replay_paused=True dans /replay/next et s'arrête
    → l'humain corrige physiquement sur la machine cible
    → le replay reprend via /replay/{replay_id}/resume

Redirection vers le mécanisme paused_need_help qui existe déjà pour le
cas target_not_found. Zéro nouveau code de pause, juste une 2ème entrée
dans ce mécanisme.

Le comportement legacy (success_strict=False) reste inchangé : on
log un warning et on continue, comportement tolérant pour les actions
non-critiques.

## Lesson apprises

1. Toujours relire les fichiers mémoire pertinents AVANT d'implémenter
   une branche de gestion d'erreur (nouvelle règle dans
   feedback_reread_before_code.md)
2. Un échec Léa n'est jamais un "stop avec error" — c'est un moment
   pédagogique (nouvelle règle dans feedback_failure_is_learning.md)
3. Ne pas s'auto-presser quand Dom n'a jamais demandé d'aller vite

## Tests

- 56 tests E2E + Phase0 passent, 0 régression
- Comportement vérifié par inspection du code : pause_message formé
  correctement, queue préservée, log_replay_failure appelé

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 09:27:45 +02:00
Dom
a21f1ea9fa feat: garde qualité résolution (B) + no_screen_change strict (C)
Deux garde-fous qui ferment des trous identifiés lors du test de replay
chirurgical du 11 avril 2026 sur sess_20260411T084629_2d588e.

## B — Garde qualité en sortie de cascade (_validate_resolution_quality)

Couche de validation ajoutée en sortie du handler /resolve_target, après
que la cascade (_resolve_target_sync) a produit son meilleur candidat.
Single point of insertion, n'altère pas la cascade existante.

Deux checks :

  1. Seuil de score minimum par méthode (_RESOLUTION_MIN_SCORES)
     - hybrid_text_direct ≥ 0.80
     - som_anchor_match / som_text_match ≥ 0.75
     - template_matching ≥ 0.85
     - vlm_* / grounding ≥ 0.60
     - memory_* : pas de seuil (confiance cristallisée)
     - v4_uia_local / uia ≥ 0.90

  2. Garde de proximité contre coords enregistrées
     Si fallback_x/y_pct sont significatifs (pas placeholder 0.5/0.5 ni
     0.0/0.0), rejette si drift > 20% de l'écran dans un axe.
     Reproduit un faux positif vu en production : SoM a trouvé
     "Enregistrer" à (0.505, 0.770) alors que l'enregistrement était à
     (0.093, 0.356) — écart de 0.41.

Quand un check rejette : retourne resolved=False avec method=
"rejected_low_score_*" ou "rejected_drift_*" et reason détaillée.
L'action passe alors par le chemin "visual_resolve_failed" côté agent
→ Policy → pause supervisée ou retry selon contexte.

7 tests unitaires inline validés (score bas, drift, mémoire qui passe
toujours, placeholders V4 qui skip la garde drift, etc.).

## C — no_screen_change devient un échec strict en mode strict

Avant : si un clic retourne warning='no_screen_change' (écran inchangé
après action), le replay loggait un warning et CONTINUAIT à l'action
suivante. Trop indulgent pour les workflows critiques.

Maintenant : la branche no_screen_change consulte le flag
success_strict de l'action courante.

  - success_strict=True : traité comme vrai échec
      → retry si retry_count < MAX_RETRIES_PER_ACTION
      → stop définitif sinon (status=error, queue vidée, callback)

  - success_strict=False (legacy) : comportement inchangé, on continue

Prérequis : _create_replay_state copie maintenant success_strict,
expected_window_before, expected_window_title, intention dans la
version slim de actions stockée dans replay_state. Nécessaire pour
lire le flag depuis current_action_index dans /replay/result.

## Tests

- 7 tests unitaires inline sur _validate_resolution_quality
- 56 tests E2E + Phase0 passent, zéro régression
- Instrumentation [REPLAY] reste pleinement fonctionnelle

## Limites non traitées ici (explicites)

- La latence de 14s entre deux clics (pre-analyze + cascade + agent
  polling) reste inchangée. Les menus déroulants Windows peuvent encore
  se refermer avant le 2ème clic. Piste A du plan, à traiter séparément.
- L'intégration d'OS-Atlas-Base-7B comme grounder spécialisé reste
  dans les cartons (recommandation du rapport état de l'art).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 09:11:41 +02:00
Dom
9188bd7df1 fix: masquer la fenêtre console lors du spawn lea_uia.exe sur Windows
Ajoute creationflags=CREATE_NO_WINDOW (0x08000000) au subprocess.run()
qui appelle lea_uia.exe dans UIAHelper._run(). Sans ce flag, Windows
ouvre brièvement une fenêtre cmd noire à CHAQUE appel — et le captor
appelle UIA à chaque clic utilisateur pendant l'enregistrement.

Symptômes rapportés par Dom :
- Flash de fenêtre terminal à chaque clic (visible à l'œil)
- Ralentissement de la souris pendant les enregistrements
- Pollution des données d'apprentissage : le VLM de post-analyse
  "voit" la fenêtre cmd et l'enregistre comme élément cliqué
  (log serveur : "gemma4 a lu l'élément : 'C:\\Lea\\helpers\\lea_uia.exe'")

Implémentation portable :
- Flag calculé au niveau module : 0x08000000 sur Windows, 0 sur Linux/Mac
- getattr(subprocess, "CREATE_NO_WINDOW", ...) pour gérer l'absence de
  la constante sur Linux
- creationflags=0 est un no-op sur Linux, safe

Appliqué aux 2 copies synchronisées :
- agent_v0/agent_v1/core/uia_helper.py (source active pour l'agent)
- core/workflow/uia_helper.py (copie identique)

85 tests in silico OK (29 UIA + 56 E2E/Phase0). Le vrai test c'est
Dom qui refait un enregistrement et vérifie qu'il n'y a plus de
flash de terminal.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 22:18:11 +02:00
Dom
f82753debe chore: instrumentation [REPLAY] pour diagnostic chaîne replay
Ajoute 6 points de log structurés homogénéisés avec le préfixe [REPLAY]
aux endroits clés de la chaîne de replay, pour permettre de suivre
précisément ce qui se passe pendant un test humain et diagnostiquer
les points de rupture sans déduire à l'aveugle.

Points de log :
1. DISPATCH          — /replay/next envoie une action (expected_before/after,
                       resolve_order, has_uia, has_anchor, by_text, strict)
2. RESOLVE_ENTRY     — _resolve_target_sync reçoit la demande (window_title,
                       uia_target, anchor, strict_mode)
3. RESOLVE_EXIT      — résolution terminée (method, coords, score, from_memory)
4. RESOLVE_EXCEPTION — crash rare dans la résolution
5. REPORT            — /replay/result reçoit le rapport agent (success, error,
                       warning, resolution_method, actual_position)
6. VERIFY            — décision finale post-vérification (agent_success,
                       ver_verified, sem_verified, final_success)

Usage : journalctl --user -u rpa-streaming -f | grep REPLAY

Aucune modif de logique, uniquement des logger.info() aux points de
décision critiques. 56 tests E2E + Phase0 restent verts.

Ces logs sont là pour stabiliser la chaîne après les modifications
robustesse du matin (strict control, UIA strict, filtre UIA-aware)
qui ont cassé les replays réels de Dom et ne se voient pas dans les
tests automatisés in silico.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 22:07:56 +02:00
Dom
b92cb9db03 feat: Phase 1 apprentissage — greffe TargetMemoryStore sur V4
Greffe minimale du mécanisme d'apprentissage persistant (Fiche #18,
target_memory_store.py) sur le pipeline streaming V4 sans toucher à V3.

Architecture (docs/PLAN_APPRENTISSAGE_LEA.md) :
- Lookup mémoire AVANT la cascade résolution coûteuse OCR/template/VLM
  dans _resolve_target_sync → hit = <10ms, miss = overhead zéro
- Record APRÈS validation post-condition (title_match strict)
  dans /replay/result → 2 succès → cristallisation par répétition
- Single source of truth : l'agent remplit report.actual_position avec
  les coords effectivement cliquées, le serveur les lit directement.
  Pas de cache intermédiaire (option C du plan).

Signature écran V4 : sha256(normalize(window_title))[:16]. Robuste aux
données variables, faux positifs rattrapés par le post-cond qui
décrémente la fiabilité via record_failure().

Fichiers :
- agent_v0/server_v1/replay_memory.py : nouveau wrapper 316 lignes
  exposant compute_screen_sig/memory_lookup/record_success/failure,
  lazy-init du store, normalisation texte stable, garde sanity coords
- agent_v0/server_v1/resolve_engine.py : lookup mémoire en tête de
  _resolve_target_sync (30 lignes)
- agent_v0/server_v1/replay_engine.py : _create_replay_state stocke
  une copie slim des actions (sans anchor base64) pour retrouver le
  target_spec par current_action_index
- agent_v0/server_v1/api_stream.py : 4 callers passent actions=...,
  record success/failure dans /replay/result lit actual_position
  du rapport (click-only), correction du commentaire Pydantic
- agent_v0/agent_v1/core/executor.py : remplit result["actual_position"]
  après self._click(), transmis dans le report de poll_and_execute

Tests : 56 E2E + Phase0 passent, zéro régression. Cycle Phase 1 validé
en simulation : miss → record → miss → record → HIT au 3ème passage.

Le deploy copy executor.py a une divergence pré-existante de 1302
lignes non committées — traité séparément lors du cleanup prochain.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 21:08:14 +02:00
24 changed files with 5418 additions and 850 deletions

8
.gitignore vendored
View File

@@ -75,3 +75,11 @@ htmlcov/
# === Backups ===
*_backup_*
backups/
*.bak
*.bak_*
*.orig
*.old
# === Legacy / Triage ===
_a_trier/
archives/

View File

@@ -17,6 +17,7 @@ import base64
import hashlib
import io
import os
import threading
import time
import logging
@@ -72,6 +73,12 @@ class ActionExecutorV1:
# different de celui qui utilise l'instance).
self._sct = None
self.running = True
# ── Verrou de sérialisation replay ──
# Garantit qu'UNE SEULE action de replay s'exécute à la fois.
# Sans ce lock, deux threads (polling main.py + lea_ui) peuvent
# consommer deux actions simultanément → race condition + mss
# thread-unsafe retourne des résolutions fantômes (1024x768).
self._replay_lock = threading.Lock()
# Backoff exponentiel pour le polling replay (evite de marteler le serveur)
self._poll_backoff = 1.0 # Delai actuel (secondes)
self._poll_backoff_min = 1.0 # Delai minimal (reset apres succes)
@@ -327,19 +334,50 @@ class ActionExecutorV1:
break
if found_root and expected_root != found_root:
# Match souple : une sous-partie commune (ex: "Bloc-notes")
if (expected_root.lower() not in found_root.lower()
and found_root.lower() not in expected_root.lower()):
# Match souple : même app (ex: "Bloc-notes")
# Le titre peut changer (fichier différent) mais
# l'app est la même → "Fichier" est au bon endroit.
def _app_from(t):
for s in [" ", " - ", ""]:
if s in t:
return t.split(s)[-1].strip().lower()
return t.strip().lower()
same_app = _app_from(expected_root) == _app_from(found_root)
substring_match = (
expected_root.lower() in found_root.lower()
or found_root.lower() in expected_root.lower()
)
if not same_app and not substring_match:
logger.warning(
f"UIA REJET : '{name}' trouvé dans '{found_root}' "
f"mais attendu dans '{expected_root}'"
)
print(
f" [UIA] REJET — '{name}' trouvé dans mauvaise fenêtre "
f"({found_root}{expected_root})"
f" [UIA] REJET — '{name}' dans mauvaise app "
f"({_app_from(found_root)}{_app_from(expected_root)})"
)
return None
# ── GARDE : rejeter les éléments géants (conteneurs) ──
# Un élément qui couvre >50% de l'écran est un conteneur
# (Bureau, Rechercher, liste), pas un bouton cliquable.
# Cliquer au centre d'un conteneur = clic aveugle.
elem_w = element.width()
elem_h = element.height()
screen_area = screen_width * screen_height
elem_area = elem_w * elem_h
if screen_area > 0 and elem_area / screen_area > 0.5:
logger.warning(
f"UIA REJET : '{name}' couvre {elem_area / screen_area * 100:.0f}% "
f"de l'écran ({elem_w}x{elem_h}) — conteneur, pas un élément cliquable"
)
print(
f" [UIA] REJET — '{name}' trop grand "
f"({elem_w}x{elem_h}, {elem_area / screen_area * 100:.0f}% écran)"
)
return None
cx, cy = element.center()
if screen_width <= 0 or screen_height <= 0:
return None
@@ -499,10 +537,63 @@ class ActionExecutorV1:
"visual_resolved": False,
}
# ── Bloc conditionnel : skip si le dialogue n'est pas apparu ──
# Les actions marquées conditional_on_window ne s'exécutent que
# si la fenêtre attendue est effectivement présente. Sinon → skip.
# Ex: Ctrl+S a sauvé silencieusement → pas de "Enregistrer sous"
# → les clics dans le dialogue sont skippés automatiquement.
cond_window = action.get("conditional_on_window")
if cond_window:
try:
from ..window_info_crossplatform import get_active_window_info
current_info = get_active_window_info()
current_title = current_info.get("title", "")
# Comparaison souple (sous-chaîne)
cond_lower = cond_window.lower()
current_lower = current_title.lower() if current_title else ""
match = (
cond_lower in current_lower
or current_lower in cond_lower
)
if not match:
logger.info(
f"[CONDITIONNEL] Skip action {action_id}"
f"dialogue '{cond_window}' absent "
f"(fenêtre actuelle: '{current_title}')"
)
print(
f" [SKIP] Dialogue '{cond_window}' absent → action skippée"
)
result["success"] = True
result["warning"] = "conditional_skipped"
return result
else:
logger.info(
f"[CONDITIONNEL] Dialogue '{cond_window}' présent → exécution"
)
except Exception as e:
logger.debug(f"Vérif conditionnelle échouée : {e}")
# ── Délai inter-actions (anti race condition mss) ──
wait_before = action.get("wait_before", 0.5)
if wait_before > 0:
time.sleep(wait_before)
try:
monitor = self.sct.monitors[1]
width, height = monitor["width"], monitor["height"]
# ── Diagnostic résolution ──
logger.info(
f"[REPLAY] Action {action_id} ({action_type}) — "
f"écran replay: {width}x{height}, "
f"x_pct={action.get('x_pct', 0):.4f}, "
f"y_pct={action.get('y_pct', 0):.4f} "
f"→ pixel ({int(action.get('x_pct', 0) * width)}, "
f"{int(action.get('y_pct', 0) * height)})"
)
# Resolution visuelle des coordonnees si demande
x_pct = action.get("x_pct", 0.0)
y_pct = action.get("y_pct", 0.0)
@@ -526,7 +617,7 @@ class ActionExecutorV1:
)
if expected_title and expected_title != "unknown_window":
from ..window_info_crossplatform import get_active_window_info
from ..ui.messages import est_fenetre_lea
from ..ui.messages import est_fenetre_lea, est_fenetre_bruit
# Polling court pour laisser le temps à la fenêtre de
# se stabiliser (évite les faux négatifs sur transitions
@@ -544,8 +635,9 @@ class ActionExecutorV1:
time.sleep(0.3)
continue
# Si on tombe sur unknown_window → on attend aussi
if not current_title or current_title == "unknown_window":
# Bruit système (systray overflow, taskbar, etc.)
# → on attend que la vraie fenêtre reprenne le focus
if est_fenetre_bruit(current_title):
time.sleep(0.3)
continue
@@ -578,13 +670,54 @@ class ActionExecutorV1:
f"[LEA] Fenêtre incorrecte : attendu '{expected_title}', "
f"actuel '{current_title}'"
)
print(f" [PRÉ-VÉRIF] STOP — fenêtre '{current_title}' ≠ attendu '{expected_title}'")
print(
f" [PRÉ-VÉRIF] Fenêtre '{current_title}'"
f"attendu '{expected_title}' → mode apprentissage"
)
try:
self.notifier.replay_wrong_window(current_title, expected_title)
except Exception:
pass
result["success"] = False
result["error"] = f"Fenêtre incorrecte: '{current_title}' (attendu: '{expected_title}')"
# Mode apprentissage : la fenêtre attendue n'est
# pas là. Soit l'action précédente a changé l'état
# (ex: Ctrl+S a sauvé sans dialogue), soit l'app
# est dans un état différent. L'humain montre.
human_actions = self._capture_human_correction(
timeout_s=120,
)
if human_actions:
result["success"] = True
result["resolution_method"] = "human_supervised"
result["warning"] = "human_supervised_wrong_window"
last_click = None
for ha in reversed(human_actions):
if ha.get("type") == "click":
last_click = ha
break
if last_click:
result["actual_position"] = {
"x_pct": last_click["x_pct"],
"y_pct": last_click["y_pct"],
}
result["correction"] = {
"actions": human_actions,
"action_count": len(human_actions),
"last_click": last_click,
"trigger": "wrong_window",
"expected_window": expected_title,
"actual_window": current_title,
}
else:
# Timeout ou pas d'action → skipper cette action
# L'état est peut-être déjà correct (ex: Ctrl+S
# a sauvé sans dialogue → action de dialogue inutile)
result["success"] = True
result["warning"] = "wrong_window_skipped"
logger.info(
f"[LEA] Wrong window sans correction → skip "
f"(l'état est peut-être déjà atteint)"
)
return result
else:
logger.info(f"[LEA] Pré-vérif OK : '{current_title}'")
@@ -685,8 +818,8 @@ class ActionExecutorV1:
if action_type == "click":
# Si visual_mode est activé, le resolve DOIT réussir.
# Pas de fallback blind — on arrête le replay si la cible
# n'est pas trouvée visuellement. C'est un RPA VISUEL.
# Pas de fallback blind — Léa VOIT l'écran et CHERCHE
# l'élément. Si toute la cascade échoue → pause supervisée.
if visual_mode and not result.get("visual_resolved"):
# ── Policy : décider quoi faire quand grounding échoue ──
from .policy import PolicyEngine, Decision
@@ -708,7 +841,6 @@ class ActionExecutorV1:
)
if policy_decision.decision == Decision.RETRY:
# Re-tenter le grounding après correction (popup fermée, etc.)
resolved2 = self._resolve_target_visual(
server_url, target_spec, x_pct, y_pct, width, height
)
@@ -718,17 +850,48 @@ class ActionExecutorV1:
result["visual_resolved"] = True
print(f" [POLICY] Re-resolve OK après {policy_decision.action_taken}")
else:
# Re-resolve échoué — SUPERVISE (rendre la main)
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
# Retry échoué → mode apprentissage
# Léa a tout essayé (UIA, template, VLM, retry)
# et ne trouve toujours pas. L'humain doit montrer.
print(f" [POLICY] Retry échoué → mode apprentissage")
try:
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
except Exception:
pass
human_actions = self._capture_human_correction(
timeout_s=120,
)
if human_actions:
result["success"] = True
result["resolution_method"] = "human_supervised"
result["warning"] = "human_supervised_after_retry_failed"
last_click = None
for ha in reversed(human_actions):
if ha.get("type") == "click":
last_click = ha
break
if last_click:
result["actual_position"] = {
"x_pct": last_click["x_pct"],
"y_pct": last_click["y_pct"],
}
result["correction"] = {
"actions": human_actions,
"action_count": len(human_actions),
"last_click": last_click,
"trigger": "retry_failed",
}
else:
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
return result
elif policy_decision.decision == Decision.SKIP:
@@ -745,18 +908,55 @@ class ActionExecutorV1:
)
return result
else: # SUPERVISE ou CONTINUE
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
else: # SUPERVISE → mode apprentissage
# Léa est perdue. Au lieu de s'arrêter, elle
# passe en mode capture et enregistre ce que
# l'humain fait (mini-workflow de correction).
try:
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
except Exception:
pass
human_actions = self._capture_human_correction(
timeout_s=120,
)
return result
if human_actions:
# L'humain a montré un mini-workflow
result["success"] = True
result["resolution_method"] = "human_supervised"
result["warning"] = "human_supervised"
# Stocker le dernier clic comme position résolue
last_click = None
for ha in reversed(human_actions):
if ha.get("type") == "click":
last_click = ha
break
if last_click:
result["actual_position"] = {
"x_pct": last_click["x_pct"],
"y_pct": last_click["y_pct"],
}
# Envoyer toute la correction au serveur
result["correction"] = {
"actions": human_actions,
"action_count": len(human_actions),
"last_click": last_click,
}
logger.info(
f"[APPRENTISSAGE] Correction reçue : "
f"{len(human_actions)} actions — je m'en souviendrai."
)
else:
# Timeout — l'humain n'a pas répondu
result["success"] = False
result["error"] = "target_not_found"
result["target_description"] = target_desc
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
real_x = int(x_pct * width)
real_y = int(y_pct * height)
@@ -767,6 +967,15 @@ class ActionExecutorV1:
f"({real_x}, {real_y}) sur ({width}x{height}), bouton={button}"
)
self._click((real_x, real_y), button)
# Phase 1 apprentissage : exposer les coordonnées RÉSOLUES
# utilisées pour le clic. Le serveur (/replay/result) les lit
# directement comme source de vérité pour la mémoire.
# On donne des percentages car la mémoire est indépendante
# de la résolution écran du client.
result["actual_position"] = {
"x_pct": float(x_pct),
"y_pct": float(y_pct),
}
logger.info(
f"Replay click [{mode}] : ({x_pct:.3f}, {y_pct:.3f}) -> "
f"({real_x}, {real_y}) sur ({width}x{height})"
@@ -809,6 +1018,7 @@ class ActionExecutorV1:
f"Post-vérif échouée : fenêtre '{post_title}' "
f"au lieu de '{expected_after}'"
)
result["warning"] = "wrong_window"
print(
f" [POST-VÉRIF] STOP STRICT — l'étape ne s'est "
f"pas déroulée comme prévu, arrêt du replay"
@@ -916,33 +1126,66 @@ class ActionExecutorV1:
hash_before, timeout_ms=3000
)
if not screen_changed:
# ── Recovery : tenter un rollback si l'action n'a pas eu d'effet ──
from .recovery import RecoveryEngine
recovery = RecoveryEngine(self)
recovery_result = recovery.attempt(
failed_action=action,
critic_detail="L'écran n'a pas changé après l'action",
)
if recovery_result.success:
print(f" [RECOVERY] {recovery_result.detail}")
result["recovery"] = recovery_result.to_dict()
result["success"] = False
result["warning"] = "no_screen_change"
result["error"] = "Ecran inchange apres l'action"
print(
f" [ECHEC] Ecran inchange apres {action_type}"
f"l'action n'a pas eu d'effet visible"
)
logger.warning(
f"[LEA] Écran inchangé après {action_type} "
f"(action_id={action_id}) — pas d'effet visible"
)
# Notifier l'utilisateur en français naturel (niveau ATTENTION)
try:
self.notifier.replay_no_screen_change(action_type)
except Exception:
pass
# ── Mode apprentissage : clic sans effet = mauvais clic ──
# Si l'action était un clic visuel, l'écran inchangé prouve
# que le grounding a cliqué au mauvais endroit. Au lieu de
# passer silencieusement à la suite, Léa demande à l'humain.
if action_type == "click" and visual_mode:
print(
f" [ECHEC] Clic sans effet — "
f"je demande de l'aide"
)
try:
self.notifier.replay_no_screen_change(action_type)
except Exception:
pass
human_actions = self._capture_human_correction(
timeout_s=120,
)
if human_actions:
result["success"] = True
result["resolution_method"] = "human_supervised"
result["warning"] = "human_supervised_after_no_change"
last_click = None
for ha in reversed(human_actions):
if ha.get("type") == "click":
last_click = ha
break
if last_click:
result["actual_position"] = {
"x_pct": last_click["x_pct"],
"y_pct": last_click["y_pct"],
}
result["correction"] = {
"actions": human_actions,
"action_count": len(human_actions),
"last_click": last_click,
"trigger": "no_screen_change",
}
else:
# Timeout — l'humain n'a pas répondu
result["success"] = False
result["warning"] = "no_screen_change"
result["error"] = "Ecran inchange apres l'action"
else:
# Actions non-visuelles : comportement existant
result["success"] = False
result["warning"] = "no_screen_change"
result["error"] = "Ecran inchange apres l'action"
print(
f" [ECHEC] Ecran inchange apres {action_type}"
f"l'action n'a pas eu d'effet visible"
)
try:
self.notifier.replay_no_screen_change(action_type)
except Exception:
pass
else:
print(f" [OK] Changement d'ecran detecte apres {action_type}")
else:
@@ -1406,15 +1649,24 @@ Example: x_pct=0.50, y_pct=0.30"""
2. Execute l'action (clic, texte, etc.)
3. POST /replay/result avec le resultat + screenshot
Args:
session_id: Identifiant de la session courante
server_url: URL de base du serveur streaming
machine_id: Identifiant de la machine (pour le replay multi-machine)
Sérialisé par _replay_lock — une seule action à la fois.
Sans ce lock, deux threads concurrents consomment deux actions
et mss retourne des résolutions fantômes (thread-unsafe).
Retourne True si une action a ete executee, False sinon.
IMPORTANT: Si une action est recue, le resultat est TOUJOURS rapporte
au serveur (meme en cas d'erreur d'execution).
"""
# Sérialisation stricte : si un autre thread exécute déjà une
# action, on abandonne ce poll immédiatement (pas de file d'attente).
if not self._replay_lock.acquire(blocking=False):
return False
try:
return self._poll_and_execute_inner(session_id, server_url, machine_id)
finally:
self._replay_lock.release()
def _poll_and_execute_inner(self, session_id: str, server_url: str, machine_id: str) -> bool:
"""Implémentation interne de poll_and_execute (protégée par _replay_lock)."""
import requests
replay_next_url = f"{server_url}/traces/stream/replay/next"
@@ -1488,11 +1740,14 @@ Example: x_pct=0.50, y_pct=0.30"""
print(f">>> ERREUR EXECUTION : {e}")
logger.error(f"Erreur execute_replay_action: {e}")
import traceback
tb_str = traceback.format_exc()
traceback.print_exc()
result = {
"action_id": action_id,
"success": False,
"error": f"Exception executor: {e}",
# Inclure le traceback complet pour diagnostiquer
# les crashes côté agent depuis les logs serveur
"error": f"{e}\n---TRACEBACK---\n{tb_str[-500:]}",
"screenshot": None,
}
@@ -1509,9 +1764,13 @@ Example: x_pct=0.50, y_pct=0.30"""
"resolution_method": result.get("resolution_method"),
"resolution_score": result.get("resolution_score"),
"resolution_elapsed_ms": result.get("resolution_elapsed_ms"),
# Coordonnées RÉSOLUES effectivement cliquées (Phase 1 apprentissage)
"actual_position": result.get("actual_position"),
# Champs enrichis pour target_not_found (pause supervisée)
"target_description": result.get("target_description"),
"target_spec": result.get("target_spec"),
# Correction humaine (mode apprentissage supervisé)
"correction": result.get("correction"),
}
try:
resp2 = requests.post(
@@ -1994,6 +2253,159 @@ Example: x_pct=0.50, y_pct=0.30"""
logger.debug(f"Texte saisi char-by-char ({len(text)} chars)")
# =========================================================================
# Mode apprentissage — l'humain montre, Léa apprend
# =========================================================================
# Hotkey pour signaler la fin de la correction humaine
_LEARNING_DONE_HOTKEY = {Key.ctrl_l, Key.shift, KeyCode.from_char("l")}
def _capture_human_correction(self, timeout_s: float = 120.0) -> list[dict]:
"""Capturer un mini-workflow de correction humaine.
Léa est perdue — elle passe en mode capture et enregistre
TOUTES les actions de l'humain (clics, frappes, combos)
jusqu'à ce que l'humain signale qu'il a fini :
- Ctrl+Shift+L (hotkey)
- Ou timeout d'inactivité (10s sans action)
- Ou timeout global (120s)
Retourne la liste des actions capturées (peut être vide si timeout).
C'est un mini-workflow, pas juste un clic.
"""
done_event = threading.Event()
actions: list[dict] = []
last_action_time = [time.time()]
keys_pressed: set = set()
INACTIVITY_TIMEOUT = 10.0 # secondes
monitor = self.sct.monitors[1]
screen_w, screen_h = monitor["width"], monitor["height"]
def _on_click(x, y, button, pressed):
if done_event.is_set():
return False
if pressed and button.name in ("left", "right"):
action = {
"type": "click",
"x_pct": round(x / screen_w, 6),
"y_pct": round(y / screen_h, 6),
"button": button.name,
"timestamp": time.time(),
}
# UIA snapshot
try:
from .uia_helper import get_shared_helper
helper = get_shared_helper()
if helper.available:
elem = helper.query_at(int(x), int(y), with_parents=True)
if elem:
action["uia_snapshot"] = elem.to_dict()
except Exception:
pass
actions.append(action)
last_action_time[0] = time.time()
logger.info(f"[APPRENTISSAGE] Clic ({x}, {y}) bouton={button.name}")
def _on_key_press(key):
if done_event.is_set():
return False
keys_pressed.add(key)
# Vérifier hotkey Ctrl+Shift+L
if self._LEARNING_DONE_HOTKEY.issubset(keys_pressed):
logger.info("[APPRENTISSAGE] Hotkey Ctrl+Shift+L — fin de correction")
print(" [APPRENTISSAGE] Ctrl+Shift+L reçu — merci !")
done_event.set()
return False
def _on_key_release(key):
keys_pressed.discard(key)
if done_event.is_set():
return False
# Capturer les frappes texte (pas les modifiers seuls)
if hasattr(key, "char") and key.char:
actions.append({
"type": "type",
"text": key.char,
"timestamp": time.time(),
})
last_action_time[0] = time.time()
elif key == Key.enter:
actions.append({
"type": "key_combo",
"keys": ["enter"],
"timestamp": time.time(),
})
last_action_time[0] = time.time()
from pynput.mouse import Listener as MouseListener
from pynput.keyboard import Listener as KeyboardListener
mouse_listener = MouseListener(on_click=_on_click)
kbd_listener = KeyboardListener(
on_press=_on_key_press, on_release=_on_key_release,
)
mouse_listener.start()
kbd_listener.start()
logger.info(
f"[APPRENTISSAGE] Mode capture activé (timeout={timeout_s}s, "
f"inactivité={INACTIVITY_TIMEOUT}s, hotkey=Ctrl+Shift+L)"
)
print(
f" [APPRENTISSAGE] Montre-moi comment faire.\n"
f" Quand tu as fini → Ctrl+Shift+L\n"
f" (ou j'attends {INACTIVITY_TIMEOUT}s sans action)"
)
# Attendre : hotkey OU inactivité OU timeout global
start = time.time()
while not done_event.is_set():
elapsed = time.time() - start
if elapsed > timeout_s:
logger.info("[APPRENTISSAGE] Timeout global")
break
# Timeout inactivité : si l'humain a fait au moins 1 action
# et n'a rien fait depuis INACTIVITY_TIMEOUT secondes
if actions and (time.time() - last_action_time[0]) > INACTIVITY_TIMEOUT:
logger.info(
f"[APPRENTISSAGE] Inactivité {INACTIVITY_TIMEOUT}s — "
f"fin automatique ({len(actions)} actions)"
)
print(f" [APPRENTISSAGE] Pas d'action depuis {INACTIVITY_TIMEOUT}s — je reprends.")
break
time.sleep(0.2)
mouse_listener.stop()
kbd_listener.stop()
logger.info(f"[APPRENTISSAGE] {len(actions)} actions capturées")
print(f" [APPRENTISSAGE] {len(actions)} actions capturées — merci !")
return actions
def _capture_crop_at(self, x: int, y: int, size: int = 80) -> str:
"""Capturer un crop carré autour d'une position."""
try:
from PIL import Image
with mss.mss() as local_sct:
monitor = local_sct.monitors[1]
raw = local_sct.grab(monitor)
img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
half = size // 2
left = max(0, x - half)
top = max(0, y - half)
right = min(img.width, x + half)
bottom = min(img.height, y + half)
crop = img.crop((left, top, right, bottom))
buffer = io.BytesIO()
crop.save(buffer, format="JPEG", quality=85)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
except Exception:
return ""
def _click(self, pos, button_name):
"""Deplacer la souris via courbe de Bézier puis cliquer.

View File

@@ -15,6 +15,7 @@ Ref: docs/PLAN_ACTEUR_V1.md — Architecture MICRO (grounding + exécution)
"""
import base64
import io
import logging
import os
import time
@@ -126,19 +127,62 @@ class GroundingEngine:
)
t_start = time.time()
screenshot_b64 = self._executor._capture_screenshot_b64(max_width=0, quality=75)
# ── Capture contrainte à la fenêtre active ──
# Le grounding ne voit QUE la fenêtre attendue — pas la taskbar,
# pas le systray, pas les autres apps. Comme un humain qui regarde
# l'application sur laquelle il travaille.
window_rect = None
try:
from ..window_info_crossplatform import get_active_window_rect
win_info = get_active_window_rect()
if win_info and win_info.get("rect"):
r = win_info["rect"] # [left, top, right, bottom]
# Validation : fenêtre visible et pas minuscule
w = r[2] - r[0]
h = r[3] - r[1]
if w > 50 and h > 50:
window_rect = {
"left": max(0, r[0]),
"top": max(0, r[1]),
"width": min(w, screen_width),
"height": min(h, screen_height),
}
logger.info(
f"Grounding contraint à la fenêtre : "
f"{window_rect['width']}x{window_rect['height']} "
f"à ({window_rect['left']}, {window_rect['top']})"
)
except Exception as e:
logger.debug(f"Pas de window rect disponible : {e}")
screenshot_b64 = self._capture_window_or_screen(window_rect)
if not screenshot_b64:
return GroundingResult(
found=False, detail="Capture screenshot échouée",
elapsed_ms=(time.time() - t_start) * 1000,
)
# Dimensions de la zone capturée (fenêtre ou écran entier)
cap_w = window_rect["width"] if window_rect else screen_width
cap_h = window_rect["height"] if window_rect else screen_height
for strategy in strategies:
result = self._try_strategy(
strategy, server_url, screenshot_b64, target_spec,
fallback_x, fallback_y, screen_width, screen_height,
fallback_x, fallback_y, cap_w, cap_h,
)
if result.found:
# ── Conversion coords fenêtre → coords écran ──
if window_rect:
# Le grounding a retourné des coords relatives à la fenêtre
# On les convertit en coords relatives à l'écran entier
abs_x = window_rect["left"] + result.x_pct * cap_w
abs_y = window_rect["top"] + result.y_pct * cap_h
result.x_pct = abs_x / screen_width
result.y_pct = abs_y / screen_height
result.detail = f"{result.detail} [fenêtre {cap_w}x{cap_h}]"
result.elapsed_ms = (time.time() - t_start) * 1000
return result
@@ -148,6 +192,39 @@ class GroundingEngine:
elapsed_ms=(time.time() - t_start) * 1000,
)
def _capture_window_or_screen(self, window_rect: Optional[Dict]) -> str:
"""Capturer soit la fenêtre active (croppée), soit l'écran entier.
Si window_rect est fourni, capture uniquement cette zone.
Sinon, capture l'écran entier (fallback).
"""
try:
from PIL import Image
import mss as mss_lib
with mss_lib.mss() as local_sct:
if window_rect:
# Capture de la zone fenêtre uniquement
region = {
"left": window_rect["left"],
"top": window_rect["top"],
"width": window_rect["width"],
"height": window_rect["height"],
}
raw = local_sct.grab(region)
else:
# Fallback écran entier
raw = local_sct.grab(local_sct.monitors[1])
img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=75)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
except Exception as e:
logger.warning(f"Capture échouée : {e}")
# Fallback sur la méthode existante de l'executor
return self._executor._capture_screenshot_b64(max_width=0, quality=75)
def _try_strategy(
self,
strategy: str,

View File

@@ -37,6 +37,21 @@ logger = logging.getLogger(__name__)
# Timeout par défaut pour les appels UIA (en secondes)
_DEFAULT_TIMEOUT = 5.0
# Masquer la fenêtre console lors du spawn de lea_uia.exe sur Windows.
# Sans ce flag, chaque appel (à chaque clic utilisateur pendant
# l'enregistrement) fait apparaître une fenêtre cmd noire brièvement
# visible à l'écran → ralentit la souris et pollue les screenshots
# capturés (le VLM peut "voir" le chemin lea_uia.exe comme texte cliqué).
#
# La valeur 0x08000000 correspond à CREATE_NO_WINDOW défini dans
# l'API Windows. Sur Linux/Mac, la valeur est 0 et `creationflags`
# est ignoré. getattr() gère le cas où Python expose déjà la constante
# sur Windows.
if platform.system() == "Windows":
_SUBPROCESS_CREATION_FLAGS = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000)
else:
_SUBPROCESS_CREATION_FLAGS = 0
@dataclass
class UiaElement:
@@ -166,6 +181,7 @@ class UIAHelper:
timeout=self._timeout,
encoding="utf-8",
errors="replace",
creationflags=_SUBPROCESS_CREATION_FLAGS,
)
if result.returncode != 0:
logger.debug(

View File

@@ -568,6 +568,35 @@ def est_fenetre_lea(titre_fenetre: str) -> bool:
return any(re.search(motif, titre_lower) for motif in _MOTIFS_FENETRE_LEA_REGEX)
# Fenêtres parasites Windows à ignorer dans les pré-vérifications.
# Ce ne sont pas des fenêtres applicatives — c'est du bruit système
# qui prend le focus de manière imprévisible.
_FENETRES_BRUIT_SYSTEME = (
"fenêtre de dépassement de capacité",
"overflow", # version anglaise systray
"program manager",
"barre des tâches",
"task bar",
"cortana",
"action center",
"centre de notifications",
)
def est_fenetre_bruit(titre_fenetre: str) -> bool:
"""Détecter si un titre de fenêtre est du bruit système Windows.
Ces fenêtres prennent le focus de manière imprévisible (systray overflow,
taskbar, Program Manager) et ne sont jamais la cible d'une action utilisateur.
"""
if not titre_fenetre:
return True # pas de titre = bruit
titre_lower = titre_fenetre.lower().strip()
if titre_lower == "unknown_window":
return True
return any(p in titre_lower for p in _FENETRES_BRUIT_SYSTEME)
# Conservé pour rétro-compatibilité avec le code qui listait MOTIFS_FENETRE_LEA
MOTIFS_FENETRE_LEA = (
"léa",

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,214 @@
# agent_v1/core/grounding.py
"""
Module Grounding — localisation pure d'éléments UI sur l'écran.
Responsabilité unique : "Trouve l'élément X sur l'écran et retourne ses coordonnées."
Ne prend AUCUNE décision. Si l'élément n'est pas trouvé → retourne NOT_FOUND.
Stratégies disponibles (cascade configurable) :
1. Serveur SomEngine + VLM (GPU distant)
2. Template matching local (CPU, ~10ms)
3. VLM local direct (CPU/GPU local)
Séparé de Policy (qui décide quoi faire quand grounding échoue).
Ref: docs/PLAN_ACTEUR_V1.md — Architecture MICRO (grounding + exécution)
"""
import base64
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class GroundingResult:
"""Résultat d'une tentative de localisation visuelle."""
found: bool # L'élément a été trouvé
x_pct: float = 0.0 # Position X en % (0.0-1.0)
y_pct: float = 0.0 # Position Y en % (0.0-1.0)
method: str = "" # Méthode utilisée (server_som, anchor_template, vlm_direct...)
score: float = 0.0 # Confiance (0.0-1.0)
elapsed_ms: float = 0.0 # Temps de résolution
detail: str = "" # Info supplémentaire (label trouvé, raison échec)
raw: Optional[Dict] = None # Données brutes du resolver (pour debug)
def to_dict(self) -> Dict[str, Any]:
return {
"found": self.found,
"x_pct": self.x_pct,
"y_pct": self.y_pct,
"method": self.method,
"score": round(self.score, 3),
"elapsed_ms": round(self.elapsed_ms, 1),
"detail": self.detail,
}
# Résultat singleton pour "pas trouvé"
NOT_FOUND = GroundingResult(found=False, detail="Aucune méthode n'a trouvé l'élément")
class GroundingEngine:
"""Moteur de localisation visuelle d'éléments UI.
Encapsule la cascade de résolution (serveur → template → VLM local)
avec une interface unifiée. Ne prend aucune décision — c'est le rôle
de PolicyEngine.
Usage :
engine = GroundingEngine(executor)
result = engine.locate(screenshot_b64, target_spec, screen_w, screen_h)
if result.found:
click(result.x_pct, result.y_pct)
"""
def __init__(self, executor):
"""
Args:
executor: ActionExecutorV1 — fournit les méthodes de résolution existantes.
"""
self._executor = executor
def locate(
self,
server_url: str,
target_spec: Dict[str, Any],
fallback_x: float,
fallback_y: float,
screen_width: int,
screen_height: int,
strategies: Optional[List[str]] = None,
) -> GroundingResult:
"""Localiser un élément UI sur l'écran.
Exécute la cascade de stratégies dans l'ordre et retourne
dès qu'une stratégie trouve l'élément.
Args:
server_url: URL du serveur (SomEngine + VLM GPU)
target_spec: Spécification de la cible (by_text, anchor, vlm_description...)
fallback_x, fallback_y: Coordonnées de fallback (enregistrement)
screen_width, screen_height: Résolution écran
strategies: Liste ordonnée de stratégies à essayer.
Par défaut : ["server", "template", "vlm_local"]
Returns:
GroundingResult avec found=True et coordonnées, ou NOT_FOUND
"""
if strategies is None:
strategies = ["server", "template", "vlm_local"]
# ── Apprentissage : réordonner les stratégies selon l'historique ──
# Si le Learning sait quelle méthode marche pour cette cible,
# la mettre en premier. C'est la boucle d'apprentissage.
learned = target_spec.get("_learned_strategy", "")
if learned:
strategy_map = {
"som_text_match": "server",
"grounding_vlm": "server",
"server_som": "server",
"anchor_template": "template",
"template_matching": "template",
"hybrid_text_direct": "vlm_local",
"hybrid_vlm_text": "vlm_local",
"vlm_direct": "vlm_local",
}
preferred = strategy_map.get(learned, "")
if preferred and preferred in strategies:
strategies = [preferred] + [s for s in strategies if s != preferred]
logger.info(
f"Grounding: stratégie réordonnée par l'apprentissage → "
f"{strategies} (learned={learned})"
)
t_start = time.time()
screenshot_b64 = self._executor._capture_screenshot_b64(max_width=0, quality=75)
if not screenshot_b64:
return GroundingResult(
found=False, detail="Capture screenshot échouée",
elapsed_ms=(time.time() - t_start) * 1000,
)
for strategy in strategies:
result = self._try_strategy(
strategy, server_url, screenshot_b64, target_spec,
fallback_x, fallback_y, screen_width, screen_height,
)
if result.found:
result.elapsed_ms = (time.time() - t_start) * 1000
return result
return GroundingResult(
found=False,
detail=f"Toutes les stratégies ont échoué ({', '.join(strategies)})",
elapsed_ms=(time.time() - t_start) * 1000,
)
def _try_strategy(
self,
strategy: str,
server_url: str,
screenshot_b64: str,
target_spec: Dict[str, Any],
fallback_x: float,
fallback_y: float,
screen_width: int,
screen_height: int,
) -> GroundingResult:
"""Essayer une stratégie de grounding unique."""
if strategy == "server" and server_url:
raw = self._executor._server_resolve_target(
server_url, screenshot_b64, target_spec,
fallback_x, fallback_y, screen_width, screen_height,
)
if raw and raw.get("resolved"):
return GroundingResult(
found=True,
x_pct=raw["x_pct"],
y_pct=raw["y_pct"],
method=raw.get("method", "server"),
score=raw.get("score", 0.0),
detail=raw.get("matched_element", {}).get("label", ""),
raw=raw,
)
elif strategy == "template":
anchor_b64 = target_spec.get("anchor_image_base64", "")
if anchor_b64:
raw = self._executor._template_match_anchor(
screenshot_b64, anchor_b64, screen_width, screen_height,
)
if raw and raw.get("resolved"):
return GroundingResult(
found=True,
x_pct=raw["x_pct"],
y_pct=raw["y_pct"],
method="anchor_template",
score=raw.get("score", 0.0),
raw=raw,
)
elif strategy == "vlm_local":
by_text = target_spec.get("by_text", "")
vlm_desc = target_spec.get("vlm_description", "")
if vlm_desc or by_text:
raw = self._executor._hybrid_vlm_resolve(
screenshot_b64, target_spec, screen_width, screen_height,
)
if raw and raw.get("resolved"):
return GroundingResult(
found=True,
x_pct=raw["x_pct"],
y_pct=raw["y_pct"],
method=raw.get("method", "vlm_local"),
score=raw.get("score", 0.0),
detail=raw.get("matched_element", {}).get("label", ""),
raw=raw,
)
return GroundingResult(found=False, method=strategy, detail=f"{strategy}: pas trouvé")

View File

@@ -0,0 +1,152 @@
# agent_v1/core/policy.py
"""
Module Policy — décisions intelligentes quand le grounding échoue.
Responsabilité unique : "Le Grounding dit NOT_FOUND. Que fait-on ?"
Ne localise AUCUN élément — c'est le rôle du Grounding.
Décisions possibles :
- RETRY : re-tenter le grounding (après popup fermée, par exemple)
- SKIP : l'action n'est plus nécessaire (état déjà atteint)
- ABORT : arrêter le workflow (état incohérent)
- SUPERVISE : rendre la main à l'utilisateur
Séparé de Grounding (qui localise les éléments).
Ref: docs/PLAN_ACTEUR_V1.md — Architecture MÉSO (acteur intelligent)
"""
import logging
import os
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class Decision(Enum):
"""Décisions possibles quand le grounding échoue."""
RETRY = "retry" # Re-tenter (après correction : popup fermée, navigation...)
SKIP = "skip" # Action inutile (état déjà atteint)
ABORT = "abort" # Arrêter le workflow (état incohérent)
SUPERVISE = "supervise" # Rendre la main à l'utilisateur (Léa dit "je bloque")
CONTINUE = "continue" # Continuer malgré l'échec (action non critique)
@dataclass
class PolicyDecision:
"""Résultat d'une décision Policy."""
decision: Decision
reason: str # Explication de la décision
action_taken: str = "" # Action corrective effectuée (ex: "popup fermée")
elapsed_ms: float = 0.0
def to_dict(self) -> Dict[str, Any]:
return {
"decision": self.decision.value,
"reason": self.reason,
"action_taken": self.action_taken,
"elapsed_ms": round(self.elapsed_ms, 1),
}
class PolicyEngine:
"""Moteur de décision quand le grounding échoue.
Cascade de décision :
1. Popup détectée ? → fermer et RETRY
2. Acteur gemma4 → SKIP / ABORT / SUPERVISE
3. Fallback → SUPERVISE (rendre la main)
Usage :
policy = PolicyEngine(executor)
decision = policy.decide(action, target_spec, grounding_result)
if decision.decision == Decision.RETRY:
# re-tenter le grounding
elif decision.decision == Decision.SKIP:
# marquer comme réussi, passer à la suite
"""
def __init__(self, executor):
self._executor = executor
def decide(
self,
action: Dict[str, Any],
target_spec: Dict[str, Any],
retry_count: int = 0,
max_retries: int = 1,
) -> PolicyDecision:
"""Décider quoi faire quand le grounding a échoué.
Cascade :
1. Si c'est le premier essai → tenter de fermer une popup → RETRY
2. Si retry déjà fait → demander à l'acteur gemma4
3. Selon gemma4 : SKIP, ABORT, ou SUPERVISE
Args:
action: L'action qui a échoué
target_spec: La cible non trouvée
retry_count: Nombre de retries déjà faits
max_retries: Maximum de retries autorisés
"""
t_start = time.time()
# ── Étape 1 : Tentative de fermeture popup (premier essai) ──
if retry_count == 0:
popup_handled = self._try_close_popup()
if popup_handled:
return PolicyDecision(
decision=Decision.RETRY,
reason="Popup détectée et fermée, re-tentative",
action_taken="popup_closed",
elapsed_ms=(time.time() - t_start) * 1000,
)
# ── Étape 2 : Max retries atteint → acteur gemma4 ──
if retry_count >= max_retries:
actor_decision = self._ask_actor(action, target_spec)
if actor_decision == "PASSER":
return PolicyDecision(
decision=Decision.SKIP,
reason="Acteur gemma4 : l'état est déjà atteint",
elapsed_ms=(time.time() - t_start) * 1000,
)
elif actor_decision == "STOPPER":
return PolicyDecision(
decision=Decision.ABORT,
reason="Acteur gemma4 : état incohérent, arrêt",
elapsed_ms=(time.time() - t_start) * 1000,
)
else:
# EXECUTER ou inconnu → pause supervisée
return PolicyDecision(
decision=Decision.SUPERVISE,
reason=f"Acteur gemma4 : {actor_decision}, pause supervisée",
elapsed_ms=(time.time() - t_start) * 1000,
)
# ── Étape 3 : Encore des retries disponibles → RETRY ──
return PolicyDecision(
decision=Decision.RETRY,
reason=f"Retry {retry_count + 1}/{max_retries}",
elapsed_ms=(time.time() - t_start) * 1000,
)
def _try_close_popup(self) -> bool:
"""Tenter de fermer une popup via le handler VLM existant."""
try:
return self._executor._handle_popup_vlm()
except Exception as e:
logger.debug(f"Policy: popup handler échoué : {e}")
return False
def _ask_actor(self, action: Dict, target_spec: Dict) -> str:
"""Demander à gemma4 de décider (PASSER/EXECUTER/STOPPER)."""
try:
return self._executor._actor_decide(action, target_spec)
except Exception as e:
logger.debug(f"Policy: acteur gemma4 échoué : {e}")
return "EXECUTER" # Fallback → supervisé

View File

@@ -0,0 +1,294 @@
# core/workflow/uia_helper.py
"""
UIAHelper — Wrapper Python pour lea_uia.exe (helper Rust UI Automation).
Expose une API Python simple pour interroger UIA via le binaire Rust.
Communique via subprocess + stdin/stdout JSON.
Pourquoi un helper Rust ?
- 5-10x plus rapide que pywinauto (10-20ms vs 50-200ms)
- Binaire standalone ~500 Ko, aucune dépendance runtime
- Pas de problèmes de threading COM en Python
- Crash-safe (le crash du helper n'affecte pas l'agent Python)
Architecture :
Python executor
↓ subprocess.run
lea_uia.exe query --x 812 --y 436
↓ UIA API Windows
JSON response
↓ stdout
Python executor parse JSON
Si lea_uia.exe n'est pas disponible (Linux, binaire absent, crash) :
toutes les méthodes retournent None → fallback vision automatique.
"""
import json
import logging
import os
import platform
import subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Timeout par défaut pour les appels UIA (en secondes)
_DEFAULT_TIMEOUT = 5.0
# Masquer la fenêtre console lors du spawn de lea_uia.exe sur Windows.
# Sans ce flag, chaque appel (à chaque clic utilisateur pendant
# l'enregistrement) fait apparaître une fenêtre cmd noire brièvement
# visible à l'écran → ralentit la souris et pollue les screenshots
# capturés (le VLM peut "voir" le chemin lea_uia.exe comme texte cliqué).
#
# La valeur 0x08000000 correspond à CREATE_NO_WINDOW défini dans
# l'API Windows. Sur Linux/Mac, la valeur est 0 et `creationflags`
# est ignoré. getattr() gère le cas où Python expose déjà la constante
# sur Windows.
if platform.system() == "Windows":
_SUBPROCESS_CREATION_FLAGS = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000)
else:
_SUBPROCESS_CREATION_FLAGS = 0
@dataclass
class UiaElement:
"""Représentation Python d'un élément UIA."""
name: str = ""
control_type: str = ""
class_name: str = ""
automation_id: str = ""
bounding_rect: Tuple[int, int, int, int] = (0, 0, 0, 0)
is_enabled: bool = False
is_offscreen: bool = True
parent_path: List[Dict[str, str]] = field(default_factory=list)
process_name: str = ""
def center(self) -> Tuple[int, int]:
"""Retourner le centre du rectangle (pixels)."""
x1, y1, x2, y2 = self.bounding_rect
return ((x1 + x2) // 2, (y1 + y2) // 2)
def width(self) -> int:
return self.bounding_rect[2] - self.bounding_rect[0]
def height(self) -> int:
return self.bounding_rect[3] - self.bounding_rect[1]
def is_clickable(self) -> bool:
"""Peut-on cliquer dessus ?"""
return (
self.is_enabled
and not self.is_offscreen
and self.width() > 0
and self.height() > 0
)
def path_signature(self) -> str:
"""Signature du chemin parent (pour retrouver l'élément)."""
parts = [f"{p['control_type']}[{p['name']}]" for p in self.parent_path if p.get("name")]
parts.append(f"{self.control_type}[{self.name}]")
return " > ".join(parts)
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"control_type": self.control_type,
"class_name": self.class_name,
"automation_id": self.automation_id,
"bounding_rect": list(self.bounding_rect),
"is_enabled": self.is_enabled,
"is_offscreen": self.is_offscreen,
"parent_path": self.parent_path,
"process_name": self.process_name,
}
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "UiaElement":
rect = d.get("bounding_rect", [0, 0, 0, 0])
if isinstance(rect, list) and len(rect) >= 4:
rect = tuple(rect[:4])
else:
rect = (0, 0, 0, 0)
return cls(
name=d.get("name", ""),
control_type=d.get("control_type", ""),
class_name=d.get("class_name", ""),
automation_id=d.get("automation_id", ""),
bounding_rect=rect,
is_enabled=d.get("is_enabled", False),
is_offscreen=d.get("is_offscreen", True),
parent_path=d.get("parent_path", []),
process_name=d.get("process_name", ""),
)
class UIAHelper:
"""Wrapper Python pour lea_uia.exe."""
def __init__(self, helper_path: str = "", timeout: float = _DEFAULT_TIMEOUT):
self._helper_path = helper_path or self._find_helper()
self._timeout = timeout
self._available = self._check_available()
def _find_helper(self) -> str:
"""Trouver lea_uia.exe dans les emplacements standards."""
candidates = [
r"C:\Lea\helpers\lea_uia.exe",
os.path.join(os.path.dirname(__file__), "..", "..",
"agent_rust", "lea_uia", "target",
"x86_64-pc-windows-gnu", "release", "lea_uia.exe"),
"./helpers/lea_uia.exe",
"lea_uia.exe",
]
for path in candidates:
if os.path.isfile(path):
return os.path.abspath(path)
return ""
def _check_available(self) -> bool:
"""Vérifier que le helper est utilisable (Windows + binaire + health OK)."""
if platform.system() != "Windows":
logger.debug("UIAHelper: Linux/Mac — helper désactivé")
return False
if not self._helper_path:
logger.debug("UIAHelper: lea_uia.exe introuvable")
return False
if not os.path.isfile(self._helper_path):
logger.debug(f"UIAHelper: chemin invalide {self._helper_path}")
return False
return True
@property
def available(self) -> bool:
return self._available
@property
def helper_path(self) -> str:
return self._helper_path
def _run(self, args: List[str]) -> Optional[Dict[str, Any]]:
"""Exécuter lea_uia.exe avec les arguments et parser le JSON."""
if not self._available:
return None
try:
result = subprocess.run(
[self._helper_path] + args,
capture_output=True,
text=True,
timeout=self._timeout,
encoding="utf-8",
errors="replace",
creationflags=_SUBPROCESS_CREATION_FLAGS,
)
if result.returncode != 0:
logger.debug(
f"UIAHelper: exit code {result.returncode}, "
f"stderr: {result.stderr[:200]}"
)
return None
output = result.stdout.strip()
if not output:
return None
return json.loads(output)
except subprocess.TimeoutExpired:
logger.debug(f"UIAHelper: timeout ({self._timeout}s) sur {args}")
return None
except json.JSONDecodeError as e:
logger.debug(f"UIAHelper: JSON invalide — {e}")
return None
except Exception as e:
logger.debug(f"UIAHelper: erreur {e}")
return None
def health(self) -> bool:
"""Vérifier que UIA répond."""
data = self._run(["health"])
return data is not None and data.get("status") == "ok"
def query_at(
self,
x: int,
y: int,
with_parents: bool = True,
) -> Optional[UiaElement]:
"""Récupérer l'élément UIA à une position écran.
Args:
x, y: Coordonnées pixel absolues
with_parents: Inclure la hiérarchie des parents
Returns:
UiaElement si trouvé, None sinon (pas d'élément ou UIA indispo)
"""
args = ["query", "--x", str(x), "--y", str(y)]
if not with_parents:
args.append("--with-parents=false")
data = self._run(args)
if not data or data.get("status") != "ok":
return None
elem_data = data.get("element")
if not elem_data:
return None
return UiaElement.from_dict(elem_data)
def find_by_name(
self,
name: str,
control_type: Optional[str] = None,
automation_id: Optional[str] = None,
window: Optional[str] = None,
timeout_ms: int = 2000,
) -> Optional[UiaElement]:
"""Rechercher un élément par son nom (+ filtres optionnels).
Args:
name: Nom exact de l'élément
control_type: Type de contrôle (Button, Edit, MenuItem...)
automation_id: ID d'automation
window: Restreindre à une fenêtre spécifique
timeout_ms: Timeout de recherche en millisecondes
"""
args = ["find", "--name", name, "--timeout-ms", str(timeout_ms)]
if control_type:
args.extend(["--control-type", control_type])
if automation_id:
args.extend(["--automation-id", automation_id])
if window:
args.extend(["--window", window])
data = self._run(args)
if not data or data.get("status") != "ok":
return None
elem_data = data.get("element")
if not elem_data:
return None
return UiaElement.from_dict(elem_data)
def capture_focused(self, max_depth: int = 3) -> Optional[UiaElement]:
"""Capturer l'élément ayant le focus + son contexte."""
data = self._run(["capture", "--max-depth", str(max_depth)])
if not data or data.get("status") != "ok":
return None
elem_data = data.get("element")
if not elem_data:
return None
return UiaElement.from_dict(elem_data)
# Instance globale partagée (singleton léger)
_SHARED_HELPER: Optional[UIAHelper] = None
def get_shared_helper() -> UIAHelper:
"""Retourner une instance partagée de UIAHelper."""
global _SHARED_HELPER
if _SHARED_HELPER is None:
_SHARED_HELPER = UIAHelper()
return _SHARED_HELPER

View File

@@ -1,12 +1,97 @@
# run_agent_v1.py
import sys
import os
import atexit
# Ajout du répertoire courant au PYTHONPATH pour permettre les imports de modules
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.append(current_dir)
# ---------------------------------------------------------------
# Verrou PID — empêche le lancement de plusieurs instances
# Même si Lea.bat est double-cliqué ou lancé deux fois,
# un seul agent tourne à la fois (defense-in-depth).
# ---------------------------------------------------------------
LOCK_FILE = os.path.join(current_dir, "lea_agent.lock")
def _pid_is_alive(pid: int) -> bool:
"""Vérifie si un processus avec ce PID existe encore (Windows + Unix)."""
if sys.platform == "win32":
try:
import ctypes
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
if handle:
kernel32.CloseHandle(handle)
return True
return False
except Exception:
# Fallback : tasklist
try:
import subprocess
result = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}", "/NH"],
capture_output=True, text=True, timeout=5,
)
return str(pid) in result.stdout
except Exception:
return False
else:
# Unix/Linux — os.kill(pid, 0) ne tue pas le process
try:
os.kill(pid, 0)
return True
except (OSError, ProcessLookupError):
return False
def _acquire_lock() -> bool:
"""Tente d'acquérir le verrou PID. Retourne False si une autre instance tourne."""
my_pid = os.getpid()
# Lire le PID existant
if os.path.isfile(LOCK_FILE):
try:
with open(LOCK_FILE, "r", encoding="utf-8") as f:
old_pid = int(f.read().strip())
# Le PID dans le lock est-il encore vivant ?
if old_pid != my_pid and _pid_is_alive(old_pid):
return False # Une autre instance tourne déjà
except (ValueError, OSError):
pass # Fichier corrompu — on l'écrase
# Écrire notre PID
try:
with open(LOCK_FILE, "w", encoding="utf-8") as f:
f.write(str(my_pid))
except OSError:
pass # Pas bloquant — on continue sans lock
return True
def _release_lock():
"""Supprime le fichier lock au shutdown."""
try:
if os.path.isfile(LOCK_FILE):
with open(LOCK_FILE, "r", encoding="utf-8") as f:
stored_pid = int(f.read().strip())
# Ne supprimer que si c'est bien NOTRE lock
if stored_pid == os.getpid():
os.remove(LOCK_FILE)
except (ValueError, OSError):
pass
# Vérification du lock AVANT toute initialisation lourde
if not _acquire_lock():
# Une autre instance de Léa tourne déjà — on quitte silencieusement
sys.exit(0)
atexit.register(_release_lock)
# Charger config.txt et .env comme variables d'environnement
# (équivalent du `set` dans Lea.bat, mais fonctionne aussi sans le .bat)
for config_file in ("config.txt", ".env"):
@@ -32,7 +117,7 @@ logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logging.info("=== Agent V1 démarrage — config chargée ===")
logging.info("=== Agent V1 démarrage — config chargée (PID %d) ===", os.getpid())
logging.info("RPA_SERVER_URL=%s", os.environ.get("RPA_SERVER_URL", "(non défini)"))
logging.info("RPA_SERVER_HOST=%s", os.environ.get("RPA_SERVER_HOST", "(non défini)"))
logging.info("RPA_API_TOKEN=%s", os.environ.get("RPA_API_TOKEN", "(non défini)")[:8] + "...")

View File

@@ -480,7 +480,7 @@ class ReplayResultReport(BaseModel):
screenshot: Optional[str] = None # Chemin ou base64 du screenshot post-action
screenshot_after: Optional[str] = None # Chemin ou base64 du screenshot APRES l'action
screenshot_before: Optional[str] = None # Screenshot AVANT l'action (pour le Critic)
actual_position: Optional[Dict[str, float]] = None # {"x": px, "y": py} position réelle du clic
actual_position: Optional[Dict[str, float]] = None # {"x_pct": float, "y_pct": float} coords résolues effectivement cliquées
# Métriques de résolution visuelle
resolution_method: Optional[str] = None # som_text_match, som_vlm, vlm_quick_find, etc.
resolution_score: Optional[float] = None
@@ -488,6 +488,8 @@ class ReplayResultReport(BaseModel):
# Champs enrichis pour target_not_found (pause supervisée)
target_description: Optional[str] = None # Description humaine de la cible
target_spec: Optional[Dict[str, Any]] = None # Spec complete de la cible
# Correction humaine (mode apprentissage supervisé)
correction: Optional[Dict[str, Any]] = None # {x_pct, y_pct, uia_snapshot, crop_b64}
class ErrorCallbackConfig(BaseModel):
@@ -1796,6 +1798,7 @@ async def start_replay(request: ReplayRequest):
total_actions=len(actions),
params=params,
machine_id=resolved_machine_id,
actions=actions,
)
# Enregistrer le mapping machine -> session pour le replay ciblé
if resolved_machine_id and resolved_machine_id != "default":
@@ -1882,6 +1885,26 @@ async def start_raw_replay(request: RawReplayRequest):
resolved_machine_id = target_machine_id or (session_obj.machine_id if session_obj else "default")
with _replay_lock:
# ── Nettoyage : annuler les replays bloqués pour cette machine ──
# Un replay en paused_need_help bloque tous les suivants.
# Quand on lance un nouveau replay, les anciens sont obsolètes.
stale_ids = [
rid for rid, state in _replay_states.items()
if state.get("machine_id") == resolved_machine_id
and state["status"] in ("paused_need_help", "running")
]
for rid in stale_ids:
old_state = _replay_states[rid]
old_sid = old_state.get("session_id", "")
old_state["status"] = "cancelled"
# Vider la queue associée
if old_sid in _replay_queues:
_replay_queues.pop(old_sid, None)
logger.info(
f"Replay {rid} annulé (remplacé par {replay_id}) — "
f"était {old_state.get('completed_actions', 0)}/{old_state.get('total_actions', 0)}"
)
_replay_queues[session_id] = list(actions)
_replay_states[replay_id] = _create_replay_state(
replay_id=replay_id,
@@ -1890,6 +1913,7 @@ async def start_raw_replay(request: RawReplayRequest):
total_actions=len(actions),
params={},
machine_id=resolved_machine_id,
actions=actions,
)
# Enregistrer le mapping machine -> session pour le replay ciblé
if resolved_machine_id and resolved_machine_id != "default":
@@ -2089,6 +2113,7 @@ async def replay_from_session(
total_actions=len(actions),
params={},
machine_id=machine_id,
actions=actions,
)
# Enregistrer le mapping machine -> session pour le replay ciblé
if machine_id and machine_id != "default":
@@ -2345,6 +2370,7 @@ async def launch_replay_from_plan(request: PlanReplayRequest):
total_actions=len(validated),
params=dict(plan.variables or {}),
machine_id=resolved_machine_id,
actions=validated,
)
if resolved_machine_id and resolved_machine_id != "default":
_machine_replay_target[resolved_machine_id] = target_session_id
@@ -2804,10 +2830,34 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
"replay_id": "",
}
# [REPLAY] log structuré pour suivre une action à travers toute la chaîne
# Grep facile : journalctl --user -u rpa-streaming -f | grep REPLAY
_rid = owning_replay.get("replay_id", "?") if owning_replay else "?"
_tspec = action.get("target_spec") or {}
_expected_before = (
action.get("expected_window_before", "")
or _tspec.get("window_title", "")
)
_expected_after = action.get("expected_window_title", "")
_resolve_order = _tspec.get("resolve_order") or []
_by_text = _tspec.get("by_text", "")
_vlm_desc = _tspec.get("vlm_description", "")
_has_uia = bool(_tspec.get("uia_target"))
_has_anchor = bool(_tspec.get("anchor_image_base64"))
_precheck_sim = (
f" precheck_sim={precheck_result['similarity']:.3f}"
if precheck_result else ""
)
_intent_log = (action.get("intention", "") or "")[:50]
logger.info(
f"Action envoyée à {session_id} (machine={machine_id}) : "
f"{action.get('type')} (id={action.get('action_id')})"
f"{' [precheck OK sim=' + str(precheck_result['similarity']) + ']' if precheck_result else ''}"
f"[REPLAY] DISPATCH replay={_rid} session={session_id} machine={machine_id} "
f"action_id={action.get('action_id')} type={action.get('type')} "
f"intent='{_intent_log}' "
f"expected_before='{_expected_before}' expected_after='{_expected_after}' "
f"resolve_order={_resolve_order} has_uia={_has_uia} has_anchor={_has_anchor} "
f"by_text='{_by_text[:40]}' vlm_desc='{_vlm_desc[:40]}' "
f"strict={bool(action.get('success_strict'))}"
f"{_precheck_sim}"
)
response: Dict[str, Any] = {
@@ -2837,6 +2887,19 @@ async def report_action_result(report: ReplayResultReport):
session_id = report.session_id
action_id = report.action_id
# [REPLAY] log structuré d'arrivée du rapport agent
_pos_log = report.actual_position or {}
_x_log = _pos_log.get("x_pct", "?") if isinstance(_pos_log, dict) else "?"
_y_log = _pos_log.get("y_pct", "?") if isinstance(_pos_log, dict) else "?"
logger.info(
f"[REPLAY] REPORT action_id={action_id} session={session_id} "
f"success={report.success} error='{(report.error or '')[:80]}' "
f"warning='{report.warning or ''}' "
f"resolution_method='{report.resolution_method or '?'}' "
f"resolution_score={report.resolution_score or 0} "
f"actual_position=({_x_log}, {_y_log})"
)
# Trouver le replay correspondant à cette session
with _replay_lock:
replay_state = None
@@ -2945,6 +3008,24 @@ async def report_action_result(report: ReplayResultReport):
with _replay_lock:
replay_state["_last_screenshot_before"] = screenshot_after
# [REPLAY] log structuré de la décision de vérification
_ver_verified = verification.verified if verification else None
_ver_detail = verification.detail[:100] if verification and verification.detail else ""
_ver_sem = verification.semantic_verified if verification else None
_ver_sem_detail = (
verification.semantic_detail[:100]
if verification and hasattr(verification, "semantic_detail") and verification.semantic_detail
else ""
)
_final_success = report.success and (verification is None or verification.verified)
logger.info(
f"[REPLAY] VERIFY action_id={action_id} session={session_id} "
f"agent_success={report.success} "
f"ver_verified={_ver_verified} ver_detail='{_ver_detail}' "
f"sem_verified={_ver_sem} sem_detail='{_ver_sem_detail}' "
f"final_success={_final_success}"
)
# === Enregistrer le résultat ===
with _replay_lock:
result_entry = {
@@ -2973,6 +3054,26 @@ async def report_action_result(report: ReplayResultReport):
except Exception as e:
logger.debug(f"Learning: échec enregistrement: {e}")
# === Correction humaine (mode apprentissage supervisé) ===
# L'humain a montré à Léa où cliquer. On stocke cette correction
# dans target_memory pour que la prochaine fois, Léa sache toute seule.
if report.correction and original_action:
try:
corr = report.correction
target_spec = original_action.get("target_spec", {})
logger.info(
f"[APPRENTISSAGE] Correction humaine reçue : "
f"({corr.get('x_pct', 0):.4f}, {corr.get('y_pct', 0):.4f}) "
f"pour '{target_spec.get('by_text', '?')}'"
)
_replay_learner.record_human_correction(
session_id=session_id,
action=original_action,
correction=corr,
)
except Exception as e:
logger.warning(f"Learning: échec stockage correction humaine: {e}")
# === Audit Trail : traçabilité complète pour conformité hospitalière ===
try:
_action = original_action or {"action_id": action_id, "type": "unknown"}
@@ -3023,6 +3124,69 @@ async def report_action_result(report: ReplayResultReport):
except Exception as e:
logger.debug(f"Audit Trail: échec enregistrement: {e}")
# === Apprentissage persistant (Phase 1 plan Léa — Fiche #18) ===
# Single source of truth : l'agent remplit `report.actual_position`
# avec les coordonnées percentages qu'il a effectivement cliquées
# (après résolution visuelle). Le serveur les lit directement — pas
# de cache intermédiaire entre /resolve_target et /replay/result.
#
# On lit aussi le `target_spec` de l'action courante depuis
# `replay_state["actions"]`, qui contient la copie slim stockée au
# démarrage du replay (cf. _create_replay_state).
#
# Garde stricte : on ne mémorise que les clics (type == "click").
# On traite cette branche AVANT d'incrémenter current_action_index.
try:
from .replay_memory import memory_record_success, memory_record_failure
_idx = replay_state.get("current_action_index", 0)
_actions_meta = replay_state.get("actions", [])
if 0 <= _idx < len(_actions_meta):
_current = _actions_meta[_idx] or {}
if _current.get("type") == "click":
_mem_target_spec = _current.get("target_spec") or {}
_mem_window_title = (
_mem_target_spec.get("window_title", "")
or _mem_target_spec.get("expected_window_before", "")
)
if _mem_window_title:
_mem_success = (
report.success and (verification is None or verification.verified)
)
if _mem_success:
# Lire les coordonnées RÉSOLUES directement depuis
# le rapport de l'agent. Format attendu :
# actual_position = {"x_pct": float, "y_pct": float}
_pos = report.actual_position or {}
_x_pct = _pos.get("x_pct") if isinstance(_pos, dict) else None
_y_pct = _pos.get("y_pct") if isinstance(_pos, dict) else None
if _x_pct is not None and _y_pct is not None:
memory_record_success(
window_title=_mem_window_title,
target_spec=_mem_target_spec,
x_pct=float(_x_pct),
y_pct=float(_y_pct),
method=(report.resolution_method or "v4_unknown"),
confidence=float(report.resolution_score or 0.9),
)
else:
logger.debug(
"memory_record skipped: actual_position absent "
"ou sans x_pct/y_pct (agent pas à jour ?)"
)
else:
memory_record_failure(
window_title=_mem_window_title,
target_spec=_mem_target_spec,
error_message=(
report.error or report.warning or "post_cond_failed"
),
)
except Exception as _mem_exc:
logger.debug("Memory record skipped : %s", _mem_exc)
with _replay_lock:
# === Logique de retry / success / failure ===
if report.success and (verification is None or verification.verified):
@@ -3048,17 +3212,156 @@ async def report_action_result(report: ReplayResultReport):
replay_state["completed_actions"] += 1
replay_state["current_action_index"] += 1
elif not report.success and agent_warning == "wrong_window":
# L'agent a détecté en pré-vérification que la fenêtre active
# n'est pas celle attendue. Même philosophie que no_screen_change :
# un échec est un moment pédagogique, pas un stop.
#
# Causes fréquentes : Léa elle-même a pris le focus (popups de
# notification/chat), l'app cible s'est fermée, une popup système
# est apparue, l'écran a changé entre deux actions.
#
# On redirige vers paused_need_help pour que l'humain intervienne.
_tspec_ww = (original_action or {}).get("target_spec") or report.target_spec or {}
_intent_ww = ""
_idx_ww = replay_state.get("current_action_index", 0)
_actions_ww = replay_state.get("actions", [])
if 0 <= _idx_ww < len(_actions_ww):
_intent_ww = str((_actions_ww[_idx_ww] or {}).get("intention", "") or "")
_target_desc_ww = (
_intent_ww
or _tspec_ww.get("by_text", "")
or _tspec_ww.get("vlm_description", "")[:80]
or "cette action"
)
replay_state["status"] = "paused_need_help"
replay_state["failed_action"] = {
"action_id": action_id,
"type": (original_action or {}).get("type", "unknown"),
"target_description": _target_desc_ww,
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": _tspec_ww,
"reason": "wrong_window",
"error_detail": report.error or "",
}
replay_state["pause_message"] = (
f"Je m'attendais à voir la bonne fenêtre mais je vois autre "
f"chose. Peux-tu vérifier que l'application est au premier "
f"plan ? ({report.error or ''})"
)
error_entry = {
"action_id": action_id,
"error": report.error or "wrong_window",
"retry_count": retry_count,
"timestamp": time.time(),
}
replay_state["error_log"].append(error_entry)
logger.warning(
f"Replay PAUSE supervisée (wrong_window) : {action_id} "
f"{report.error or 'fenêtre incorrecte'} — en attente "
f"d'intervention humaine"
)
try:
log_replay_failure(
replay_id=replay_state["replay_id"],
action_id=action_id,
target_spec=_tspec_ww,
screenshot_b64=screenshot_after or report.screenshot,
error="wrong_window",
extra={"error_detail": report.error or "", "intent": _intent_ww},
)
except Exception as _log_exc:
logger.debug("log_replay_failure skip: %s", _log_exc)
elif not report.success and agent_warning == "no_screen_change":
# L'action a été exécutée mais l'écran n'a pas changé.
# PAS de retry — loguer l'échec et continuer vers l'action suivante.
# C'est plus honnête que "success" et évite les retries en boucle.
replay_state["unverified_actions"] += 1
replay_state["completed_actions"] += 1
replay_state["current_action_index"] += 1
logger.warning(
f"Action {action_id} : écran inchangé (no_screen_change) — "
f"action sans effet visible, on continue"
)
#
# Philosophie Léa (feedback_failure_is_learning.md) : un échec
# n'est jamais un stop avec error — c'est un **moment pédagogique**.
# Léa demande à l'humain de montrer ce qu'elle aurait dû faire.
#
# Comportement legacy (success_strict=False) : loguer l'échec
# et continuer vers l'action suivante. Justifié pour les
# workflows tolérants où un clic "sans effet" peut être normal
# (ex: cliquer sur une case déjà cochée).
#
# Comportement strict (success_strict=True) : écran inchangé =
# "je n'ai pas su faire". On redirige vers le mécanisme de pause
# supervisée existant (paused_need_help) pour que Léa demande à
# l'humain de montrer. Pas de retry automatique, pas de stop —
# on laisse la queue intacte et on attend l'intervention.
_is_strict = False
_intent_strict = ""
_idx_strict = replay_state.get("current_action_index", 0)
_actions_meta_strict = replay_state.get("actions", [])
if 0 <= _idx_strict < len(_actions_meta_strict):
_current_strict = _actions_meta_strict[_idx_strict] or {}
_is_strict = bool(_current_strict.get("success_strict", False))
_intent_strict = str(_current_strict.get("intention", "") or "")
if _is_strict:
# Apprentissage supervisé : pause, demande d'intervention
_tspec = (original_action or {}).get("target_spec") or report.target_spec or {}
_target_desc = (
_intent_strict
or _tspec.get("by_text", "")
or _tspec.get("vlm_description", "")[:80]
or "cette action"
)
replay_state["status"] = "paused_need_help"
replay_state["failed_action"] = {
"action_id": action_id,
"type": (original_action or {}).get("type", "unknown"),
"target_description": _target_desc,
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": _tspec,
"reason": "no_screen_change_strict",
"resolution_method": report.resolution_method or "",
"resolution_score": report.resolution_score or 0,
}
replay_state["pause_message"] = (
f"Mon clic sur '{_target_desc}' n'a produit aucun effet. "
f"Peux-tu me montrer où je devais cliquer ?"
)
error_entry = {
"action_id": action_id,
"error": f"no_screen_change_strict: {_target_desc}",
"retry_count": retry_count,
"timestamp": time.time(),
}
replay_state["error_log"].append(error_entry)
logger.warning(
f"Replay PAUSE supervisée (apprentissage) : {action_id} "
f"écran inchangé sur '{_target_desc}' — en attente "
f"d'intervention humaine"
)
# Logger l'échec pour l'apprentissage futur
try:
log_replay_failure(
replay_id=replay_state["replay_id"],
action_id=action_id,
target_spec=_tspec,
screenshot_b64=screenshot_after or report.screenshot,
error="no_screen_change_strict",
extra={
"target_description": _target_desc,
"resolution_method": report.resolution_method or "",
"resolution_score": report.resolution_score or 0,
"actions_completed": replay_state["completed_actions"],
},
)
except Exception as _log_exc:
logger.debug("log_replay_failure skip: %s", _log_exc)
else:
# Legacy (non-strict) : on continue, comportement historique
replay_state["unverified_actions"] += 1
replay_state["completed_actions"] += 1
replay_state["current_action_index"] += 1
logger.warning(
f"Action {action_id} : écran inchangé (no_screen_change) — "
f"action sans effet visible, on continue (non-strict)"
)
elif not report.success and (report.error or "") == "target_not_found":
# Cible non trouvée visuellement — PAUSE supervisée, PAS d'erreur fatale.
@@ -3390,6 +3693,7 @@ from .resolve_engine import (
_get_som_engine_api,
_resolve_by_som,
_resolve_target_sync,
_validate_resolution_quality,
_fuzzy_match,
_fallback_response,
_pre_analyze_screen_sync,
@@ -3448,9 +3752,29 @@ async def resolve_target(request: ResolveTargetRequest):
request.strict_mode,
processor,
)
# Validation qualité en sortie de cascade : seuil de score + garde
# de proximité contre les coords enregistrées. Single point of
# insertion, n'altère pas la cascade existante.
result = _validate_resolution_quality(
result,
request.fallback_x_pct,
request.fallback_y_pct,
)
# [REPLAY] log structuré de sortie résolution (après validation)
logger.info(
f"[REPLAY] RESOLVE_EXIT session={request.session_id} "
f"resolved={result.get('resolved', False) if result else False} "
f"method='{result.get('method', '?') if result else 'none'}' "
f"coords=({result.get('x_pct', 0):.4f}, {result.get('y_pct', 0):.4f}) "
f"score={result.get('score', 0) if result else 0} "
f"from_memory={bool(result.get('from_memory', False)) if result else False} "
f"reason='{result.get('reason', '') if result else ''}'"
)
return result
except Exception as e:
logger.error(f"Résolution visuelle échouée: {e}")
logger.error(f"[REPLAY] RESOLVE_EXCEPTION session={request.session_id} error={e}")
return _fallback_response(request, "analysis_error", str(e))
finally:
import os

View File

@@ -1147,8 +1147,45 @@ def _create_replay_state(
total_actions: int,
params: Optional[Dict[str, Any]] = None,
machine_id: Optional[str] = None,
actions: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Créer un état de replay enrichi avec les champs de suivi d'erreur."""
"""Créer un état de replay enrichi avec les champs de suivi d'erreur.
Args:
actions: Liste des actions du replay. Une copie slim (sans anchors
base64) est stockée pour permettre à `/replay/result` de
retrouver le `target_spec` de l'action courante — nécessaire
pour l'apprentissage mémoire (Phase 1 plan Léa).
"""
# Copie slim des actions : on strip les anchor_image_base64 pour ne
# pas gonfler la mémoire (anchors peuvent faire 50-200 KB chacun).
# On conserve les champs utilisés par :
# - la Phase 1 apprentissage (target_spec pour memory_record_success)
# - le contrôle strict (success_strict, expected_window_*)
# - les logs/audit (intention, action_id, type, coords)
actions_slim: List[Dict[str, Any]] = []
if actions:
for a in actions:
a_copy = {
"action_id": a.get("action_id"),
"type": a.get("type"),
"x_pct": a.get("x_pct"),
"y_pct": a.get("y_pct"),
# Contrôle strict des étapes (Dom, matin 10 avril 2026)
"success_strict": a.get("success_strict", False),
"expected_window_before": a.get("expected_window_before", ""),
"expected_window_title": a.get("expected_window_title", ""),
# Contexte métier utile pour logs et apprentissage
"intention": a.get("intention", ""),
}
ts = a.get("target_spec")
if isinstance(ts, dict):
a_copy["target_spec"] = {
k: v for k, v in ts.items()
if k not in ("anchor_image_base64",)
}
actions_slim.append(a_copy)
return {
"replay_id": replay_id,
"workflow_id": workflow_id,
@@ -1161,6 +1198,7 @@ def _create_replay_state(
"current_action_index": 0,
"params": params or {},
"results": [], # Historique des résultats action par action
"actions": actions_slim, # Copie slim pour lookup par index (Phase 1 mémoire)
# Champs enrichis pour le suivi d'erreur (#7)
"retried_actions": 0,
"unverified_actions": 0,

View File

@@ -0,0 +1,143 @@
# agent_v0/server_v1/replay_failure_logger.py
"""
Logger des echecs de replay pour l'apprentissage futur.
Chaque echec de resolution visuelle (target_not_found) est sauvegarde dans un
fichier JSONL par session, avec le screenshot de ce que l'agent voit au moment
de l'echec. Ces donnees alimentent le learning loop : re-entrainement des
embeddings, ajustement des seuils, enrichissement des target_spec.
Structure :
data/training/replay_failures/{replay_id}/failures.jsonl
data/training/replay_failures/{replay_id}/screenshots/{action_id}.jpg
"""
import base64
import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger("replay_failure_logger")
# Repertoire racine des echecs de replay
_FAILURES_BASE_DIR = Path("data/training/replay_failures")
# Lock pour les ecritures concurrentes
_write_lock = threading.Lock()
def log_replay_failure(
replay_id: str,
action_id: str,
target_spec: Optional[Dict[str, Any]],
screenshot_b64: Optional[str],
resolution_attempts: Optional[List[Dict[str, Any]]] = None,
error: str = "target_not_found",
extra: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
"""Sauvegarder un echec de replay pour l'apprentissage futur.
Args:
replay_id: Identifiant du replay en cours
action_id: Identifiant de l'action echouee
target_spec: Specification de la cible recherchee
screenshot_b64: Screenshot JPEG base64 de ce que l'agent voit
resolution_attempts: Liste des tentatives de resolution (methode, score, etc.)
error: Type d'erreur (defaut: "target_not_found")
extra: Champs supplementaires a stocker
Returns:
Chemin du fichier JSONL cree, ou None en cas d'erreur.
"""
try:
# Creer le repertoire de la session
session_dir = _FAILURES_BASE_DIR / replay_id
session_dir.mkdir(parents=True, exist_ok=True)
# Sauvegarder le screenshot si fourni
screenshot_path = None
if screenshot_b64:
screenshots_dir = session_dir / "screenshots"
screenshots_dir.mkdir(exist_ok=True)
screenshot_path = str(screenshots_dir / f"{action_id}.jpg")
try:
img_bytes = base64.b64decode(screenshot_b64)
with open(screenshot_path, "wb") as f:
f.write(img_bytes)
except Exception as e:
logger.warning(f"Impossible de sauvegarder le screenshot : {e}")
screenshot_path = None
# Construire l'entree JSONL
entry = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
"replay_id": replay_id,
"action_id": action_id,
"target_spec": _sanitize_target_spec(target_spec) if target_spec else None,
"screenshot_path": screenshot_path,
"resolution_attempts": resolution_attempts or [],
"error": error,
}
if extra:
entry.update(extra)
# Ecrire dans le fichier JSONL (thread-safe)
jsonl_path = session_dir / "failures.jsonl"
with _write_lock:
with open(jsonl_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
logger.info(
f"Echec replay loggue : replay={replay_id} action={action_id} "
f"error={error} -> {jsonl_path}"
)
return str(jsonl_path)
except Exception as e:
logger.error(f"Impossible de logger l'echec replay : {e}")
return None
def _sanitize_target_spec(target_spec: Dict[str, Any]) -> Dict[str, Any]:
"""Nettoyer le target_spec pour le stockage (retirer les images base64 volumineuses)."""
cleaned = {}
for key, value in target_spec.items():
# Ne pas stocker les images base64 (trop volumineux pour le JSONL)
if key.endswith("_base64") or key.endswith("_b64"):
cleaned[key] = f"<{len(str(value))} chars>" if value else None
else:
cleaned[key] = value
return cleaned
def get_failure_count(replay_id: str) -> int:
"""Compter le nombre d'echecs pour un replay donne."""
jsonl_path = _FAILURES_BASE_DIR / replay_id / "failures.jsonl"
if not jsonl_path.exists():
return 0
try:
with open(jsonl_path, "r", encoding="utf-8") as f:
return sum(1 for _ in f)
except Exception:
return 0
def get_failures(replay_id: str) -> List[Dict[str, Any]]:
"""Lire tous les echecs pour un replay donne."""
jsonl_path = _FAILURES_BASE_DIR / replay_id / "failures.jsonl"
if not jsonl_path.exists():
return []
failures = []
try:
with open(jsonl_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
failures.append(json.loads(line))
except Exception as e:
logger.warning(f"Erreur lecture echecs replay {replay_id} : {e}")
return failures

View File

@@ -175,6 +175,55 @@ class ReplayLearner:
self.record(outcome)
def record_human_correction(
self,
session_id: str,
action: Dict[str, Any],
correction: Dict[str, Any],
) -> None:
"""Enregistrer une correction humaine (mode apprentissage supervisé).
L'humain a montré à Léa où cliquer. On stocke cette correction
dans target_memory.db pour que la prochaine fois, Léa sache.
"""
target_spec = action.get("target_spec", {})
by_text = target_spec.get("by_text", "")
window_title = target_spec.get("window_title", "")
x_pct = correction.get("x_pct", 0.0)
y_pct = correction.get("y_pct", 0.0)
# Enregistrer dans le JSONL d'apprentissage
outcome = ActionOutcome(
session_id=session_id,
action_id=action.get("action_id", ""),
action_type="click",
target_description=by_text,
window_title=window_title,
resolution_method="human_supervised",
resolution_score=1.0, # Confiance maximale — l'humain a montré
success=True,
)
self.record(outcome)
# Stocker dans target_memory.db pour le lookup futur
try:
from .replay_memory import get_target_memory_store
store = get_target_memory_store()
if store:
store.record_success(
screen_signature="human_correction",
target_spec=target_spec,
resolved_position={"x_pct": x_pct, "y_pct": y_pct},
method="human_supervised",
score=1.0,
)
logger.info(
f"[APPRENTISSAGE] Correction stockée dans target_memory : "
f"'{by_text}' → ({x_pct:.4f}, {y_pct:.4f})"
)
except Exception as e:
logger.warning(f"Learning: échec stockage target_memory: {e}")
def query_similar(
self,
target_description: str = "",

View File

@@ -0,0 +1,316 @@
# agent_v0/server_v1/replay_memory.py
"""
replay_memory — Greffe de TargetMemoryStore (Fiche #18) sur le pipeline V4.
Phase 1 du plan apprentissage Léa (docs/PLAN_APPRENTISSAGE_LEA.md).
Le runtime V4 appelle :
- `memory_lookup()` AVANT la cascade coûteuse (OCR/template/VLM)
- `memory_record_success()` APRÈS validation post-condition (`title_match` strict)
- `memory_record_failure()` sur les échecs
Fingerprint léger V4 : les coordonnées clic (x_pct, y_pct) sont stockées dans
les deux premières valeurs de `TargetFingerprint.bbox`, et la méthode de
résolution ayant réussi dans le champ `etype`.
Signature d'écran V4 : `sha256(normalize(window_title))[:16]`. Simple et
robuste aux données variables car les titres de fenêtre restent stables.
Les faux positifs (même titre, écrans différents) sont rattrapés par la
post-condition qui décrémentera la fiabilité via `record_failure()`.
Critère de fiabilité : 2 succès minimum et < 30% d'échecs pour déclencher
un hit (paramètres de `TargetMemoryStore.lookup`). C'est exactement la
cristallisation par répétition que l'on veut — Léa est un stagiaire qui
apprend de l'observation.
Auteur : Dom, Alice — avril 2026
"""
from __future__ import annotations
import hashlib
import logging
import os
import unicodedata
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Singleton du store persistant
# =========================================================================
_MEMORY_SINGLETON: Optional[Any] = None
_MEMORY_DISABLED = False
def get_memory_store():
"""Retourne le `TargetMemoryStore` partagé, ou None si indisponible.
Lazy-init : le store n'est créé qu'au premier appel, ce qui évite
d'importer `core.learning.target_memory_store` à l'import du module
(et donc d'éviter les effets de bord sur le démarrage du serveur).
"""
global _MEMORY_SINGLETON, _MEMORY_DISABLED
if _MEMORY_DISABLED:
return None
if _MEMORY_SINGLETON is not None:
return _MEMORY_SINGLETON
try:
from core.learning.target_memory_store import TargetMemoryStore
base_path = os.environ.get("RPA_LEARNING_DIR", "data/learning")
_MEMORY_SINGLETON = TargetMemoryStore(base_path=base_path)
logger.info(
"replay_memory: TargetMemoryStore initialisé (base=%s)", base_path,
)
return _MEMORY_SINGLETON
except Exception as exc:
logger.warning(
"replay_memory: TargetMemoryStore indisponible (%s) — "
"l'apprentissage persistant est désactivé", exc,
)
_MEMORY_DISABLED = True
return None
# =========================================================================
# Normalisation de texte et hash
# =========================================================================
def _norm_text(s: str) -> str:
"""Normalise un texte pour un hash stable (accents, casse, NBSP, espaces)."""
if not s:
return ""
s = s.replace("\u00A0", " ").strip().lower()
s = unicodedata.normalize("NFKD", s)
s = "".join(ch for ch in s if not unicodedata.combining(ch))
return " ".join(s.split())
def compute_screen_sig(window_title: str) -> str:
"""Calcule la signature d'écran V4 à partir du titre de fenêtre.
Le `window_title` est strict depuis la phase "controle des étapes"
(post-condition `title_match` obligatoire). C'est notre clé naturelle.
"""
norm = _norm_text(window_title)
if not norm:
return ""
return hashlib.sha256(norm.encode("utf-8")).hexdigest()[:16]
class _TargetSpecLike:
"""Adaptateur dict → objet pour `TargetMemoryStore._hash_target_spec()`.
Le hash interne de TargetMemoryStore utilise `getattr(spec, "by_role", ...)`
qui ne fonctionne pas avec un dict brut. On expose les attributs nécessaires.
On intègre aussi `resolve_order` et `vlm_description` dans `context_hints`
pour qu'ils entrent dans le hash — deux actions avec le même `by_text`
mais un `resolve_order` différent doivent avoir des hashes distincts.
"""
__slots__ = ("by_role", "by_text", "by_position", "context_hints")
def __init__(self, d: Dict[str, Any]):
self.by_role = d.get("by_role", "") or ""
self.by_text = d.get("by_text", "") or ""
self.by_position = d.get("by_position")
hints = dict(d.get("context_hints") or {})
resolve_order = d.get("resolve_order")
if resolve_order:
hints["_resolve_order"] = "|".join(resolve_order) if isinstance(
resolve_order, list
) else str(resolve_order)
if d.get("vlm_description"):
hints["_vlm_desc"] = str(d["vlm_description"])
if d.get("anchor_hint"):
hints["_anchor_hint"] = str(d["anchor_hint"])
self.context_hints = hints
# =========================================================================
# Lookup — consulté AVANT la cascade coûteuse
# =========================================================================
def memory_lookup(
window_title: str,
target_spec: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
"""Cherche une résolution apprise pour cette cible sur cet écran.
Returns:
Dict compatible avec le format de sortie de `_resolve_target_sync`
(resolved, method, x_pct, y_pct, score, ...) si une entrée fiable
est trouvée. None sinon.
"""
store = get_memory_store()
if store is None:
return None
screen_sig = compute_screen_sig(window_title)
if not screen_sig:
return None
try:
spec_shim = _TargetSpecLike(target_spec)
fp = store.lookup(screen_sig, spec_shim)
except Exception as exc:
logger.debug("memory_lookup: erreur lookup (%s)", exc)
return None
if fp is None:
return None
# Fingerprint léger : bbox = (x_pct, y_pct, 0, 0)
try:
x_pct = float(fp.bbox[0])
y_pct = float(fp.bbox[1])
except (TypeError, IndexError, ValueError):
logger.debug("memory_lookup: fingerprint bbox invalide")
return None
# Sanity check : les pourcentages doivent être dans [0, 1]
if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0):
logger.warning(
"memory_lookup: coords invalides (%.3f, %.3f) pour sig=%s"
"entrée ignorée",
x_pct, y_pct, screen_sig,
)
return None
method = fp.etype or "memory"
confidence = float(getattr(fp, "confidence", 0.9) or 0.9)
logger.info(
"memory_lookup HIT : sig=%s method=%s coords=(%.4f, %.4f) conf=%.2f "
"target='%s'",
screen_sig, method, x_pct, y_pct, confidence,
(target_spec.get("by_text") or "")[:60],
)
return {
"resolved": True,
"method": f"memory_{method}",
"x_pct": x_pct,
"y_pct": y_pct,
"score": confidence,
"from_memory": True,
"screen_sig": screen_sig,
}
# =========================================================================
# Record — appelé APRÈS validation post-condition
# =========================================================================
def memory_record_success(
window_title: str,
target_spec: Dict[str, Any],
x_pct: float,
y_pct: float,
method: str,
confidence: float = 0.9,
) -> bool:
"""Enregistre une résolution réussie dans la mémoire persistante.
À appeler APRÈS validation de la post-condition (`title_match` strict).
"""
store = get_memory_store()
if store is None:
return False
screen_sig = compute_screen_sig(window_title)
if not screen_sig:
return False
# Sanity check : coordonnées dans [0, 1]
try:
x_pct = float(x_pct)
y_pct = float(y_pct)
except (TypeError, ValueError):
logger.debug("memory_record_success: coords non numériques, skip")
return False
if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0):
logger.debug(
"memory_record_success: coords hors [0,1] (%.3f, %.3f), skip",
x_pct, y_pct,
)
return False
try:
from core.learning.target_memory_store import TargetFingerprint
method_clean = method or "v4_unknown"
fingerprint = TargetFingerprint(
element_id=f"v4_{method_clean}",
bbox=(x_pct, y_pct, 0.0, 0.0),
role=target_spec.get("by_role", "") or None,
etype=method_clean,
label=(target_spec.get("by_text") or "")[:200] or None,
confidence=float(confidence),
)
spec_shim = _TargetSpecLike(target_spec)
store.record_success(
screen_signature=screen_sig,
target_spec=spec_shim,
fingerprint=fingerprint,
strategy_used=method_clean,
confidence=float(confidence),
)
logger.info(
"memory_record_success: sig=%s method=%s coords=(%.4f, %.4f) "
"target='%s'",
screen_sig, method_clean, x_pct, y_pct,
(target_spec.get("by_text") or "")[:60],
)
return True
except Exception as exc:
logger.warning("memory_record_success: échec (%s)", exc)
return False
def memory_record_failure(
window_title: str,
target_spec: Dict[str, Any],
error_message: str,
) -> bool:
"""Incrémente le `fail_count` pour cette (signature, target).
Appelé quand l'action échoue OU quand la post-condition n'est pas
satisfaite. Le `TargetMemoryStore.lookup()` ignorera cette entrée
si le ratio d'échecs dépasse 30%.
"""
store = get_memory_store()
if store is None:
return False
screen_sig = compute_screen_sig(window_title)
if not screen_sig:
return False
try:
spec_shim = _TargetSpecLike(target_spec)
store.record_failure(
screen_signature=screen_sig,
target_spec=spec_shim,
error_message=(error_message or "unknown")[:200],
)
logger.debug(
"memory_record_failure: sig=%s error='%s'",
screen_sig, (error_message or "")[:80],
)
return True
except Exception as exc:
logger.debug("memory_record_failure: échec (%s)", exc)
return False

View File

@@ -1558,6 +1558,51 @@ def _resolve_target_sync(
"""
anchor_image_b64 = target_spec.get("anchor_image_base64", "")
# [REPLAY] log structuré d'entrée résolution
_window_title_log = target_spec.get("window_title", "") or ""
_resolve_order_log = target_spec.get("resolve_order") or []
_uia_target_log = target_spec.get("uia_target") or {}
_by_text_log = target_spec.get("by_text", "")
_vlm_desc_log = target_spec.get("vlm_description", "")
logger.info(
f"[REPLAY] RESOLVE_ENTRY window='{_window_title_log}' "
f"resolve_order={_resolve_order_log} "
f"has_uia={bool(_uia_target_log)} uia_name='{_uia_target_log.get('name','')[:40]}' "
f"has_anchor={bool(anchor_image_b64)} "
f"by_text='{_by_text_log[:40]}' vlm_desc='{_vlm_desc_log[:40]}' "
f"strict_mode={strict_mode} screen={screen_width}x{screen_height}"
)
# ===================================================================
# PHASE 1 APPRENTISSAGE : Lookup mémoire persistante (Fiche #18)
# ===================================================================
# Avant TOUTE résolution coûteuse (OCR/template/VLM), on consulte la
# mémoire persistante (TargetMemoryStore). Si cette cible a été résolue
# avec succès ≥2 fois sur cet écran (fail_ratio < 30%), on retourne
# directement les coordonnées mémorisées.
#
# Hit mémoire : <10ms (vs 300ms-15s de résolution)
# Miss mémoire : aucun overhead, on continue la cascade normale
#
# Les coords stockées sont celles qui ont PASSÉ la post-condition
# (title_match strict) lors des replays précédents. C'est la
# cristallisation par répétition : Léa = stagiaire qui apprend.
try:
from .replay_memory import memory_lookup
_window_title = target_spec.get("window_title", "") or ""
if _window_title:
_mem_result = memory_lookup(
window_title=_window_title,
target_spec=target_spec,
)
if _mem_result:
# Hit mémoire : on skip toute la cascade.
# Les coordonnées sont sanity-checked dans memory_lookup().
return _mem_result
except Exception as _exc:
logger.debug("Memory lookup skipped : %s", _exc)
# ===================================================================
# V4 : Résolution pilotée par le plan pré-compilé
# ===================================================================
@@ -2015,6 +2060,163 @@ def _fallback_response(request: ResolveTargetRequest, reason: str, detail: str)
}
# =========================================================================
# Validation qualité de résolution (garde score + garde proximité)
# =========================================================================
#
# Couche de sécurité appliquée en sortie de la cascade de résolution pour
# rejeter les résultats peu fiables. Deux checks :
#
# 1. Seuil de score minimum par méthode — chaque stratégie a sa propre
# fiabilité empirique. Un SoM+VLM à 0.59 n'a pas le même sens qu'un
# hybrid_text_direct à 0.59.
#
# 2. Garde de proximité — si les coordonnées enregistrées lors de la
# démonstration sont disponibles (via fallback_x/y_pct), on rejette
# tout résultat dont les coordonnées résolues sont aberrantes par
# rapport aux coordonnées attendues. Un écart > 20% de l'écran
# signale un faux positif (ex: SoM qui a trouvé le même texte
# "Enregistrer" à un endroit totalement différent).
#
# Insertion : appelée une fois, juste avant de retourner le résultat au
# client dans le handler /resolve_target. N'altère pas la cascade existante.
# Seuils minimum de score par méthode. Les méthodes non listées héritent
# du seuil par défaut (0.5). Une méthode peut être matchée par préfixe
# (ex: "memory_v4_ocr" match "memory_").
_RESOLUTION_MIN_SCORES: Dict[str, float] = {
# Mémoire Phase 1 : si une entrée a été cristallisée (≥2 succès), on a
# une confiance quasi absolue — pas de seuil strict.
"memory_": 0.0,
# hybrid_text_direct est un matching OCR direct : fiable quand il trouve.
"hybrid_text_direct": 0.80,
# SoM (Set-of-Mark) : peut retourner un faux positif si l'élément
# cherché existe à plusieurs endroits de l'écran.
"som_anchor_match": 0.75,
"som_text_match": 0.75,
"som_vlm": 0.70,
# Template matching : très strict sur la ressemblance pixel.
"template_matching": 0.85,
"v4_template": 0.85,
# OCR seul (V4 ExecutionPlan) : fiable avec score moyen.
"v4_ocr": 0.70,
# VLM : souvent moins précis sur les coordonnées, seuil plus souple.
"vlm_quick_find": 0.60,
"vlm": 0.60,
"v4_vlm": 0.60,
"grounding": 0.60,
"v4_grounding": 0.60,
# UIA local : déterministe, confiance élevée quand succès.
"v4_uia_local": 0.90,
"uia": 0.90,
}
# Écart maximum toléré entre coords résolues et coords enregistrées
# (en fraction d'écran, dans chaque axe). Au-delà, on considère que la
# résolution a trouvé un faux positif ailleurs sur l'écran.
_RESOLUTION_MAX_DRIFT: float = 0.20
def _validate_resolution_quality(
result: Optional[Dict[str, Any]],
fallback_x_pct: float,
fallback_y_pct: float,
) -> Optional[Dict[str, Any]]:
"""Valide un résultat de résolution et le rejette s'il est peu fiable.
Deux checks appliqués en sortie de cascade :
- Score minimum par méthode (voir _RESOLUTION_MIN_SCORES)
- Drift maximum par rapport aux coordonnées enregistrées
(_RESOLUTION_MAX_DRIFT, activé uniquement si fallback_x/y_pct
sont significatifs, c'est-à-dire différents du placeholder 0.5/0.5
et non nuls).
Si un check échoue, retourne un nouveau dict `resolved=False` avec
une raison explicite. Sinon retourne le result inchangé.
Cette fonction est le **seul point d'insertion** des gardes qualité :
elle n'est PAS appelée par les méthodes internes de la cascade, mais
uniquement depuis le handler HTTP `/resolve_target` après que la
cascade a produit son meilleur candidat.
"""
if not result or not isinstance(result, dict):
return result
if not result.get("resolved"):
return result
method = str(result.get("method", "") or "")
try:
score = float(result.get("score", 0) or 0)
except (TypeError, ValueError):
score = 0.0
try:
resolved_x = float(result.get("x_pct", 0) or 0)
resolved_y = float(result.get("y_pct", 0) or 0)
except (TypeError, ValueError):
return result # coords non-numériques : on ne peut rien valider
# --- Check 1 : seuil de score par méthode ---
# Trouver le seuil qui s'applique (match exact ou préfixe)
min_score: Optional[float] = None
if method in _RESOLUTION_MIN_SCORES:
min_score = _RESOLUTION_MIN_SCORES[method]
else:
for prefix, threshold in _RESOLUTION_MIN_SCORES.items():
if prefix.endswith("_") and method.startswith(prefix):
min_score = threshold
break
if min_score is not None and score < min_score:
logger.warning(
"[REPLAY] Resolution REJETÉE (score trop bas) : method=%s score=%.3f < %.2f",
method, score, min_score,
)
return {
"resolved": False,
"method": f"rejected_low_score_{method}",
"reason": f"score_{score:.3f}_below_threshold_{min_score:.2f}",
"original_method": method,
"original_score": score,
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# --- Check 2 : garde de proximité ---
# On n'applique la garde que si les coordonnées enregistrées ont un
# sens (pas des placeholders 0.5/0.5 des plans V4 ni des 0.0/0.0).
_has_recorded_coords = (
fallback_x_pct > 0.001
and fallback_y_pct > 0.001
and not (abs(fallback_x_pct - 0.5) < 0.001 and abs(fallback_y_pct - 0.5) < 0.001)
)
if _has_recorded_coords:
dx = abs(resolved_x - fallback_x_pct)
dy = abs(resolved_y - fallback_y_pct)
if dx > _RESOLUTION_MAX_DRIFT or dy > _RESOLUTION_MAX_DRIFT:
logger.warning(
"[REPLAY] Resolution REJETÉE (drift trop grand) : "
"method=%s resolved=(%.3f, %.3f) expected=(%.3f, %.3f) "
"drift=(%.3f, %.3f) max=%.2f",
method, resolved_x, resolved_y,
fallback_x_pct, fallback_y_pct,
dx, dy, _RESOLUTION_MAX_DRIFT,
)
return {
"resolved": False,
"method": f"rejected_drift_{method}",
"reason": f"drift_dx{dx:.3f}_dy{dy:.3f}_max{_RESOLUTION_MAX_DRIFT:.2f}",
"original_method": method,
"original_score": score,
"drift_dx": round(dx, 3),
"drift_dy": round(dy, 3),
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# Validation OK — on retourne le result inchangé
return result
# =========================================================================
# Observer — Pré-analyse écran avant résolution
# =========================================================================

View File

@@ -1,622 +0,0 @@
"""
UIDetector - Détection Sémantique d'Éléments UI avec VLM
Utilise un Vision-Language Model (VLM) pour détecter et classifier
les éléments UI avec leurs types et rôles sémantiques.
"""
from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path
from dataclasses import dataclass
import numpy as np
from PIL import Image
import json
import re
from ..models.ui_element import UIElement, UIElementEmbeddings, VisualFeatures
from .ollama_client import OllamaClient, check_ollama_available
@dataclass
class DetectionConfig:
"""Configuration de la détection UI"""
vlm_model: str = "qwen3-vl:8b" # Modèle VLM à utiliser (qwen3-vl:8b recommandé)
vlm_endpoint: str = "http://localhost:11434" # Endpoint Ollama
confidence_threshold: float = 0.7 # Seuil de confiance minimum
max_elements: int = 50 # Nombre max d'éléments à détecter
detect_regions: bool = True # Détecter régions d'intérêt d'abord
use_embeddings: bool = True # Générer embeddings duaux
class UIDetector:
"""
Détecteur d'éléments UI sémantique
Utilise un VLM (Vision-Language Model) pour :
1. Détecter les régions d'intérêt dans un screenshot
2. Classifier le type de chaque élément UI
3. Déterminer le rôle sémantique
4. Extraire les features visuelles
5. Générer des embeddings duaux (image + texte)
"""
def __init__(self, config: Optional[DetectionConfig] = None):
"""
Initialiser le détecteur
Args:
config: Configuration (utilise config par défaut si None)
"""
self.config = config or DetectionConfig()
self.vlm_client = None
self._initialize_vlm()
def _initialize_vlm(self) -> None:
"""Initialiser le client VLM (Ollama)"""
try:
# Vérifier si Ollama est disponible
if check_ollama_available(self.config.vlm_endpoint):
self.vlm_client = OllamaClient(
endpoint=self.config.vlm_endpoint,
model=self.config.vlm_model
)
print(f"✓ VLM initialized: {self.config.vlm_model} at {self.config.vlm_endpoint}")
else:
print(f"⚠ Ollama not available at {self.config.vlm_endpoint}, using simulation mode")
self.vlm_client = None
except Exception as e:
print(f"⚠ Failed to initialize VLM: {e}, using simulation mode")
self.vlm_client = None
def detect(self,
screenshot_path: str,
window_context: Optional[Dict[str, Any]] = None) -> List[UIElement]:
"""
Détecter tous les éléments UI dans un screenshot
Args:
screenshot_path: Chemin vers le screenshot
window_context: Contexte de la fenêtre (titre, process, etc.)
Returns:
Liste d'UIElements détectés
"""
# Charger image
image = self._load_image(screenshot_path)
if image is None:
return []
# Détecter régions d'intérêt si activé
if self.config.detect_regions:
regions = self._detect_regions_of_interest(image, window_context)
else:
# Utiliser image complète
regions = [{"bbox": (0, 0, image.width, image.height), "confidence": 1.0}]
# Détecter éléments UI dans chaque région
ui_elements = []
for region in regions:
elements = self._detect_elements_in_region(
image,
region,
screenshot_path,
window_context
)
ui_elements.extend(elements)
# Filtrer par confiance
ui_elements = [
el for el in ui_elements
if el.confidence >= self.config.confidence_threshold
]
# Limiter nombre d'éléments
if len(ui_elements) > self.config.max_elements:
# Trier par confiance et garder les meilleurs
ui_elements.sort(key=lambda x: x.confidence, reverse=True)
ui_elements = ui_elements[:self.config.max_elements]
return ui_elements
def _load_image(self, screenshot_path: str) -> Optional[Image.Image]:
"""Charger une image depuis un fichier"""
try:
return Image.open(screenshot_path)
except Exception as e:
print(f"Error loading image {screenshot_path}: {e}")
return None
def _detect_regions_of_interest(self,
image: Image.Image,
window_context: Optional[Dict] = None) -> List[Dict]:
"""
Détecter les régions d'intérêt dans l'image
Utilise le VLM pour identifier les zones contenant des éléments UI.
Args:
image: Image PIL
window_context: Contexte de la fenêtre
Returns:
Liste de régions {bbox: (x, y, w, h), confidence: float}
"""
if self.vlm_client is None:
# Mode simulation : diviser l'image en grille
return self._simulate_region_detection(image)
# Utiliser VLM pour détecter régions
# Pour l'instant, on utilise l'image complète (plus simple et efficace)
width, height = image.size
return [{
"bbox": (0, 0, width, height),
"confidence": 1.0
}]
def _simulate_region_detection(self, image: Image.Image) -> List[Dict]:
"""Simulation de détection de régions (pour développement)"""
width, height = image.size
# Diviser en grille 3x3 pour simulation
regions = []
grid_size = 3
cell_w = width // grid_size
cell_h = height // grid_size
for i in range(grid_size):
for j in range(grid_size):
regions.append({
"bbox": (j * cell_w, i * cell_h, cell_w, cell_h),
"confidence": 0.8
})
return regions
def _detect_elements_in_region(self,
image: Image.Image,
region: Dict,
screenshot_path: str,
window_context: Optional[Dict] = None) -> List[UIElement]:
"""
Détecter éléments UI dans une région spécifique
Args:
image: Image complète
region: Région à analyser
screenshot_path: Chemin du screenshot
window_context: Contexte de la fenêtre
Returns:
Liste d'UIElements dans cette région
"""
bbox = region["bbox"]
x, y, w, h = bbox
# Extraire crop de la région
region_image = image.crop((x, y, x + w, y + h))
# Détecter éléments avec VLM
if self.vlm_client is None:
# Mode simulation
return self._simulate_element_detection(
region_image, bbox, screenshot_path, window_context
)
# Vraie détection avec VLM !
return self._detect_with_vlm(
region_image, bbox, screenshot_path, window_context
)
def _detect_with_vlm(self,
region_image: Image.Image,
region_bbox: Tuple[int, int, int, int],
screenshot_path: str,
window_context: Optional[Dict] = None) -> List[UIElement]:
"""
Détecter éléments UI avec le VLM (vraie détection)
Args:
region_image: Image de la région
region_bbox: Bbox de la région (x, y, w, h)
screenshot_path: Chemin du screenshot
window_context: Contexte de la fenêtre
Returns:
Liste d'UIElements détectés
"""
x_offset, y_offset, w, h = region_bbox
# Construire le prompt pour le VLM
context_str = ""
if window_context:
context_str = f"\nWindow context: {window_context.get('title', 'Unknown')}"
# Approche simplifiée : demander une description structurée
prompt = f"""List all interactive UI elements in this screenshot.{context_str}
For each element, provide:
- type (button, text_input, checkbox, link, etc.)
- label (visible text)
- approximate position (top/middle/bottom, left/center/right)
Format as JSON array:
[{{"type": "button", "label": "Submit", "position": "middle-center"}}]
Return ONLY the JSON array, no other text."""
# Appeler le VLM
# Note: Utiliser le chemin du screenshot complet plutôt que le crop
# car certains VLM gèrent mieux les fichiers que les images PIL
result = self.vlm_client.generate(
prompt=prompt,
image_path=screenshot_path, # Utiliser le chemin au lieu de l'image PIL
temperature=0.1,
max_tokens=1000
)
if not result["success"]:
print(f"❌ VLM detection failed: {result.get('error', 'Unknown error')}")
return []
if not result["response"] or len(result["response"].strip()) == 0:
print(f"⚠ VLM returned empty response")
return []
# Parser la réponse JSON
elements = self._parse_vlm_response(
result["response"],
region_bbox,
screenshot_path,
window_context
)
return elements
def _parse_vlm_response(self,
response: str,
region_bbox: Tuple[int, int, int, int],
screenshot_path: str,
window_context: Optional[Dict] = None) -> List[UIElement]:
"""
Parser la réponse JSON du VLM
Args:
response: Réponse texte du VLM
region_bbox: Bbox de la région
screenshot_path: Chemin du screenshot
window_context: Contexte de la fenêtre
Returns:
Liste d'UIElements
"""
x_offset, y_offset, region_w, region_h = region_bbox
try:
# Extraire le JSON de la réponse (peut contenir du texte avant/après)
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if not json_match:
print(f"No JSON array found in VLM response")
print(f"VLM response was: {response[:500]}...")
return []
elements_data = json.loads(json_match.group(0))
if not isinstance(elements_data, list):
print(f"VLM response is not a JSON array")
return []
elements = []
for i, elem_data in enumerate(elements_data):
try:
# Gérer les positions (pourcentages ou textuelles)
if 'x' in elem_data and 'y' in elem_data:
# Format avec pourcentages
x_pct = float(elem_data.get('x', 0))
y_pct = float(elem_data.get('y', 0))
w_pct = float(elem_data.get('width', 10))
h_pct = float(elem_data.get('height', 5))
elem_x = x_offset + int(region_w * x_pct / 100)
elem_y = y_offset + int(region_h * y_pct / 100)
elem_w = int(region_w * w_pct / 100)
elem_h = int(region_h * h_pct / 100)
else:
# Format avec position textuelle (top/middle/bottom, left/center/right)
position = elem_data.get('position', 'middle-center').lower()
# Parser la position
if 'top' in position:
elem_y = y_offset + region_h // 4
elif 'bottom' in position:
elem_y = y_offset + 3 * region_h // 4
else: # middle
elem_y = y_offset + region_h // 2
if 'left' in position:
elem_x = x_offset + region_w // 4
elif 'right' in position:
elem_x = x_offset + 3 * region_w // 4
else: # center
elem_x = x_offset + region_w // 2
# Taille par défaut basée sur le type
elem_type = elem_data.get('type', 'button')
if elem_type == 'button':
elem_w, elem_h = 100, 40
elif elem_type == 'text_input':
elem_w, elem_h = 200, 35
elif elem_type == 'checkbox':
elem_w, elem_h = 25, 25
else:
elem_w, elem_h = 80, 30
# Créer l'UIElement
element = UIElement(
element_id=f"vlm_{elem_x}_{elem_y}",
type=elem_data.get('type', 'unknown'),
role=elem_data.get('role', 'unknown'),
bbox=(elem_x, elem_y, elem_w, elem_h),
center=(elem_x + elem_w // 2, elem_y + elem_h // 2),
label=elem_data.get('label', ''),
label_confidence=0.85, # Confiance par défaut pour VLM
embeddings=UIElementEmbeddings(),
visual_features=VisualFeatures(
dominant_color="rgb(128, 128, 128)",
has_icon=elem_data.get('type') == 'icon',
shape="rectangle",
size_category="medium"
),
confidence=0.85, # Confiance par défaut pour VLM
metadata={
"detected_by": "vlm",
"model": self.config.vlm_model,
"screenshot_path": screenshot_path
}
)
elements.append(element)
except (KeyError, ValueError, TypeError) as e:
print(f"Error parsing element {i}: {e}")
continue
return elements
except json.JSONDecodeError as e:
print(f"Failed to parse VLM JSON response: {e}")
print(f"Response was: {response[:200]}...")
return []
def _simulate_element_detection(self,
region_image: Image.Image,
region_bbox: Tuple[int, int, int, int],
screenshot_path: str,
window_context: Optional[Dict] = None) -> List[UIElement]:
"""Simulation de détection d'éléments (pour développement)"""
# Pour simulation, créer quelques éléments fictifs
elements = []
x_offset, y_offset, w, h = region_bbox
# Simuler 2-3 éléments par région
num_elements = np.random.randint(2, 4)
for i in range(num_elements):
# Position aléatoire dans la région
elem_w = np.random.randint(50, 150)
elem_h = np.random.randint(20, 60)
elem_x = x_offset + np.random.randint(0, max(1, w - elem_w))
elem_y = y_offset + np.random.randint(0, max(1, h - elem_h))
# Type et rôle aléatoires
types = ["button", "text_input", "checkbox", "link", "icon"]
roles = ["primary_action", "cancel", "submit", "form_input", "navigation"]
element = UIElement(
element_id=f"elem_{elem_x}_{elem_y}",
type=np.random.choice(types),
role=np.random.choice(roles),
bbox=(elem_x, elem_y, elem_w, elem_h),
center=(elem_x + elem_w // 2, elem_y + elem_h // 2),
label=f"Element {i}",
label_confidence=np.random.uniform(0.7, 0.95),
embeddings=UIElementEmbeddings(), # Embeddings vides
visual_features=VisualFeatures(
dominant_color="rgb(128, 128, 128)",
has_icon=np.random.choice([True, False]),
shape="rectangle",
size_category="medium"
),
confidence=np.random.uniform(0.7, 0.95),
metadata={"simulated": True, "screenshot_path": screenshot_path}
)
elements.append(element)
return elements
def classify_type(self,
element_image: Image.Image,
context: Optional[Dict] = None) -> Tuple[str, float]:
"""
Classifier le type d'un élément UI
Args:
element_image: Image de l'élément
context: Contexte additionnel
Returns:
(type, confidence)
"""
if self.vlm_client is None:
# Simulation
types = ["button", "text_input", "checkbox", "radio", "dropdown",
"tab", "link", "icon", "table_row", "menu_item"]
return np.random.choice(types), np.random.uniform(0.7, 0.95)
# Vraie classification avec VLM
result = self.vlm_client.classify_element_type(element_image, context)
if result["success"]:
return result["type"], result["confidence"]
return "unknown", 0.0
def classify_role(self,
element_image: Image.Image,
element_type: str,
context: Optional[Dict] = None) -> Tuple[str, float]:
"""
Classifier le rôle sémantique d'un élément
Args:
element_image: Image de l'élément
element_type: Type de l'élément
context: Contexte additionnel
Returns:
(role, confidence)
"""
if self.vlm_client is None:
# Simulation
roles = ["primary_action", "cancel", "submit", "form_input",
"search_field", "navigation", "settings", "close"]
return np.random.choice(roles), np.random.uniform(0.7, 0.95)
# Vraie classification avec VLM
result = self.vlm_client.classify_element_role(
element_image,
element_type,
context
)
if result["success"]:
return result["role"], result["confidence"]
return "unknown", 0.0
def extract_visual_features(self,
element_image: Image.Image) -> VisualFeatures:
"""
Extraire les features visuelles d'un élément
Args:
element_image: Image de l'élément
Returns:
VisualFeatures
"""
# Calculer couleur dominante
img_array = np.array(element_image)
if len(img_array.shape) == 3:
# Moyenne des couleurs
dominant_color = tuple(img_array.mean(axis=(0, 1)).astype(int).tolist())
else:
dominant_color = (128, 128, 128)
# Déterminer forme (simplifié)
width, height = element_image.size
aspect_ratio = width / height if height > 0 else 1.0
if aspect_ratio > 3:
shape = "horizontal_bar"
elif aspect_ratio < 0.33:
shape = "vertical_bar"
elif 0.8 <= aspect_ratio <= 1.2:
shape = "square"
else:
shape = "rectangle"
# Catégorie de taille
area = width * height
if area < 1000:
size_category = "small"
elif area < 10000:
size_category = "medium"
else:
size_category = "large"
# Détection d'icône (simplifié)
has_icon = width < 100 and height < 100 and 0.8 <= aspect_ratio <= 1.2
return VisualFeatures(
dominant_color=dominant_color,
has_icon=has_icon,
shape=shape,
size_category=size_category
)
def generate_embeddings(self,
element_image: Image.Image,
element_label: str,
embedder: Optional[Any] = None) -> Optional[UIElementEmbeddings]:
"""
Générer embeddings duaux (image + texte) pour un élément
Args:
element_image: Image de l'élément
element_label: Label textuel de l'élément
embedder: Embedder à utiliser (optionnel)
Returns:
UIElementEmbeddings ou None
"""
if not self.config.use_embeddings or embedder is None:
return None
try:
# Générer embedding image
image_embedding_id = None
if hasattr(embedder, 'embed_image'):
# Sauvegarder temporairement l'image
# TODO: Implémenter sauvegarde et embedding
pass
# Générer embedding texte
text_embedding_id = None
if element_label and hasattr(embedder, 'embed_text'):
# TODO: Implémenter embedding texte
pass
if image_embedding_id or text_embedding_id:
return UIElementEmbeddings(
image_embedding_id=image_embedding_id,
text_embedding_id=text_embedding_id,
provider="openclip_ViT-B-32",
dimensions=512
)
except Exception as e:
print(f"Warning: Failed to generate embeddings: {e}")
return None
def set_vlm_client(self, client: Any) -> None:
"""Définir le client VLM"""
self.vlm_client = client
def get_config(self) -> DetectionConfig:
"""Récupérer la configuration"""
return self.config
# ============================================================================
# Fonctions utilitaires
# ============================================================================
def create_detector(vlm_model: str = "qwen3-vl:8b",
confidence_threshold: float = 0.7) -> UIDetector:
"""
Créer un UIDetector avec configuration personnalisée
Args:
vlm_model: Modèle VLM à utiliser
confidence_threshold: Seuil de confiance
Returns:
UIDetector configuré
"""
config = DetectionConfig(
vlm_model=vlm_model,
confidence_threshold=confidence_threshold
)
return UIDetector(config)

View File

@@ -37,6 +37,21 @@ logger = logging.getLogger(__name__)
# Timeout par défaut pour les appels UIA (en secondes)
_DEFAULT_TIMEOUT = 5.0
# Masquer la fenêtre console lors du spawn de lea_uia.exe sur Windows.
# Sans ce flag, chaque appel (à chaque clic utilisateur pendant
# l'enregistrement) fait apparaître une fenêtre cmd noire brièvement
# visible à l'écran → ralentit la souris et pollue les screenshots
# capturés (le VLM peut "voir" le chemin lea_uia.exe comme texte cliqué).
#
# La valeur 0x08000000 correspond à CREATE_NO_WINDOW défini dans
# l'API Windows. Sur Linux/Mac, la valeur est 0 et `creationflags`
# est ignoré. getattr() gère le cas où Python expose déjà la constante
# sur Windows.
if platform.system() == "Windows":
_SUBPROCESS_CREATION_FLAGS = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000)
else:
_SUBPROCESS_CREATION_FLAGS = 0
@dataclass
class UiaElement:
@@ -166,6 +181,7 @@ class UIAHelper:
timeout=self._timeout,
encoding="utf-8",
errors="replace",
creationflags=_SUBPROCESS_CREATION_FLAGS,
)
if result.returncode != 0:
logger.debug(

View File

@@ -51,10 +51,14 @@ echo Pour arreter Lea : clic droit sur l'icone ^> "Quitter Lea"
echo Vous pouvez fermer cette fenetre.
echo.
.venv\Scripts\pythonw.exe run_agent_v1.py
start "" /b .venv\Scripts\pythonw.exe run_agent_v1.py
:: Attendre 3s puis verifier que Lea tourne
timeout /t 3 >nul
tasklist /FI "IMAGENAME eq pythonw.exe" /NH 2>nul | findstr /I "pythonw" >nul
if errorlevel 1 (
echo.
echo Lea a rencontre un probleme au demarrage.
echo Lea n'a pas demarre correctement.
echo Tentative avec affichage des erreurs...
echo.
.venv\Scripts\python.exe run_agent_v1.py

View File

@@ -0,0 +1,220 @@
# Plan Apprentissage Léa — Phase 1 / 2 / 3
**Date** : 10 avril 2026
**Auteur** : Dom + Claude (session cartographie target_resolver)
**Statut** : Plan validé par Dom, implémentation non commencée
---
## Contexte
Après deux semaines à debugger le replay sur Windows et avoir écrit du code (V4 : surface_classifier, UIA, execution_plan, executor strict) qui **dupliquait sans le savoir** des concepts déjà présents dans le V3 legacy, une cartographie exhaustive a été lancée.
Fichiers lus en profondeur :
- `core/execution/target_resolver.py` (3495 lignes)
- `core/learning/target_memory_store.py` (545 lignes — Fiche #18)
- `core/models/workflow_graph.py` (TargetSpec — 570-640)
- `core/detection/spatial_analyzer.py` (595 lignes)
## Découverte critique
**Les pipelines V3 et V4 sont complètement découplés au runtime de replay.**
```
REPLAY V4 (actif aujourd'hui) LEGACY V3 (dormant au replay)
============================= =============================
stream_processor workflow_pipeline
↓ ↓
execution_plan_runner execution_loop
↓ ↓
agent_v1/core/executor.py action_executor
↓ ↓
OCR + template + VLM direct target_resolver
target_memory_store (Fiche #18)
SpatialAnalyzer
```
Vérifié par `grep "from core.execution" agent_v0/`**zéro import**.
Callers V3 encore vivants (mais pas sur le chemin de replay critique) :
- `agent_chat/app.py`
- `visual_workflow_builder/backend/api/workflows.py`
- `core/evaluation/*`
## Modules dormants à valeur immédiate
### TargetMemoryStore — le Crystallizer qu'on pensait devoir écrire
- SQLite `data/learning/target_memory.db` + JSONL audit `data/learning/events/YYYY-MM-DD/*.jsonl`
- API propre et testée :
- `record_success(screen_sig, target_spec, fingerprint, strategy, confidence)`
- `record_failure(screen_sig, target_spec, error)`
- `lookup(screen_sig, target_spec, min_success_count=2, max_fail_ratio=0.3)` → fingerprint ou None
- Clé unique : `(screen_signature, target_spec_hash)`
- Fingerprint : `(element_id, bbox, role, etype, label, confidence)`
- **Critère de fiabilité** : au moins 2 succès et < 30% d'échecs → c'est ça la "cristallisation par répétition"
### TargetSpec — vocabulaire déjà riche
Dans `core/models/workflow_graph.py:572` :
- `context_hints` : `near_text`, `below_text`, `right_of_text`, `same_row_as_text`, `within_region`, `exclude_near_text`
- `hard_constraints` : `within_container_text`, `min_area`
- `weights` : `proximity`, `alignment`, `container`, `roi_iou`
### ResolutionStrategy V4 — vocabulaire pauvre (à enrichir)
Dans `core/workflow/execution_plan.py:27` :
- `target_text`, `anchor_b64`, `zone`, `vlm_description`, `uia_*`, `dom_*`
- Pas de context_hints, pas de hard_constraints → trou dans l'expressivité
## Décision validée
**Léa = stagiaire qui apprend de la répétition.** La mémoire précède la généralisation. Mais le raisonnement spatial reste indispensable comme filet de sécurité quand la mémoire ne suffit pas (décalages de layout, premier replay sur nouvel écran, généralisation entre écrans similaires).
## Plan séquencé
### Phase 1 — Mémoire sur V4 (≈1 jour, ~150 lignes)
**Objectif** : greffer `TargetMemoryStore` directement sur le resolve V4, sans passer par target_resolver ni UIElement.
**Lookup avant OCR/template/VLM**
```python
fp = memory.lookup(screen_sig, target_spec)
if fp:
# On a vu ce clic réussir ≥2 fois sur cet écran
return fp.bbox # clic direct, <10ms
```
**Record après validation post-condition (déjà en place — `title_match` strict)**
```python
if post_condition_passed:
memory.record_success(screen_sig, target_spec, fingerprint, "v4_ocr", confidence)
else:
memory.record_failure(screen_sig, target_spec, reason)
```
**À construire**
- `screen_signature(screenshot)` → hash stable. Piste : `window_title` + tokens OCR dominants, ou réutiliser `core/execution/screen_signature.py` si compatible.
- Fingerprint léger : `(x, y, w, h, method)`. Pas besoin de role/type/label en V4.
- Point de branchement exact à confirmer avant implémentation :
- Côté serveur dans `resolve_engine` (si resolve serveur)
- Côté agent dans `agent_v1/core/executor.py` (si resolve local)
**Bénéfice observable**
- 3ème passage d'un workflow sur même écran : 10-15s VLM remplacés par <10ms lookup
- Léa **apprend** vraiment — pas parce qu'on a écrit un Crystallizer, parce qu'on a consommé celui qui dort depuis mars
**Tests de validation**
- [ ] Rejouer un workflow 3 fois, mesurer le temps du 3ème passage
- [ ] Vérifier que `data/learning/target_memory.db` se remplit
- [ ] Vérifier que les événements JSONL s'écrivent
### Phase 2 light — Raisonnement spatial OCR-only (≈3-5 jours, ~300-400 lignes)
**Principe clé** : pur pixel/OCR. Pas d'`UIElement`, pas de role/type, pas de parser UI. On évite le piège "ressusciter V3 complet".
**À l'enregistrement (IRBuilder, côté serveur)**
1. Pour chaque clic `(x, y)` dans la trace
2. OCR la zone autour (±300px)
3. Identifier les 3-5 textes les plus proches avec direction (left/right/above/below) et distance
4. Populer `ResolutionStrategy.context_hints` :
```python
{
"right_of_text": "Nom du patient", # 60px à gauche du clic
"below_text": "Identité", # 120px au-dessus
"near_text": "Enregistrer", # le texte du clic lui-même
}
```
**Au replay (resolve_engine)**, en cascade :
1. Lookup mémoire (Phase 1) → si hit, clic direct
2. Sinon : OCR de l'écran actuel
3. Trouver les ancres de `context_hints` via OCR (normalisation accents + fuzzy Fiche #8)
4. Calculer la zone candidate par intersection des contraintes spatiales
5. Cliquer
6. Si post-cond échoue : retombée VLM (exception handler)
**Logique à porter depuis target_resolver.py**
- `_apply_context_hints_to_candidates` (lignes 2601-2803) — adaptée à "candidats = zones OCR" au lieu de "candidats = UIElement"
- `_find_element_by_text` + normalisation (`_norm_text`, `_fuzzy_ratio`) lignes 211-235
- Healing profile (ligne 395) pour relaxation progressive
**Décision tranchée**
- OCR **côté serveur Linux** (docTR déjà présent via SomEngine)
- Zéro changement sur le client Windows
- Le serveur reçoit le screenshot au moment du build IR, extrait les context_hints, les intègre dans `ResolutionStrategy`
**Enrichissement de `ResolutionStrategy` (execution_plan.py)**
Ajouter au dataclass :
```python
context_hints: Dict[str, Any] = field(default_factory=dict)
```
Et dans `execution_plan_runner._strategy_to_target_spec` : propager `context_hints` dans `target_spec`.
**Tests de validation**
- [ ] Enregistrer un workflow, vérifier que le plan contient des `context_hints` cohérents
- [ ] Modifier la résolution de la VM (1920→1280), rejouer, vérifier que les clics atteignent la bonne cible
- [ ] Ajouter un champ au-dessus de la cible, rejouer, vérifier robustesse
### Phase 3 — Spatial V3 complet (pas maintenant)
**Correction 10 avril 2026** : une version précédente de ce document affirmait qu'OmniParser avait été retiré. **C'était faux.** OmniParser est toujours présent :
- `core/detection/omniparser_adapter.py` — 429 lignes
- `agent_v0/server_v1/resolve_engine.py:254` — `_get_omniparser()` singleton thread-safe, lazy-load
- `agent_v0/server_v1/resolve_engine.py:293` — `_resolve_by_yolo()` défini et importé dans `api_stream.py`
Ce qui est vrai : `_resolve_by_yolo` **n'est jamais appelé** dans la cascade V4 (`_resolve_target_sync` ne l'invoque pas). C'est du code **dormant**, pas supprimé.
**Conséquence pour Phase 3** : on a potentiellement **déjà** un parser UI utilisable. Deux pistes :
1. **Ré-activer `_resolve_by_yolo`** dans la cascade V4 (injecter un appel dans `_resolve_target_sync` comme fallback après OCR/template/VLM). Il produit déjà une liste d'éléments détectés avec bbox et role approximatif.
2. **Pont `_resolve_by_yolo → List[UIElement]`** : adapter la sortie YOLO pour alimenter `target_resolver` V3. Un pont d'une centaine de lignes devrait suffire.
**Avant de lancer Phase 3**, vérifier :
- Les modèles YOLO sont-ils toujours sur disque ? (`omniparser.detect()` lazy-loads)
- Quelle qualité de détection sur des écrans Citrix/DPI réels ?
- Les tests `tests/integration/test_auto_healing_integration.py` et `tests/unit/test_fiche11_*` passent-ils encore ?
**Tant qu'on n'a pas fait cette vérification, Phase 3 reste pending.**
## Ce qu'on ne fait PAS
| Tentation | Pourquoi on résiste |
|-----------|---------------------|
| Refactorer `target_resolver.py` pour le rendre V4-compatible | 3495 lignes couplées à `UIElement` disparu — plus économique de le laisser dormir et recoder l'essentiel minimal dans V4 |
| Brancher `action_executor` sur le streaming replay | 2000 lignes de pipeline pour un bénéfice qu'on a en 150 lignes avec TargetMemoryStore seul |
| Ressusciter `SpatialAnalyzer` maintenant | Zéro valeur sans `UIElement` riches en amont |
| Faire Phase 2 avant Phase 1 | Léa raisonnerait à chaque clic, lent et coûteux — pas un "stagiaire qui apprend", juste un agent qui réfléchit en boucle |
## Suivi d'avancement
### Phase 1 — Mémoire sur V4
- [ ] Identifier le point de branchement exact (serveur vs agent)
- [ ] Définir `screen_signature` stable pour V4
- [ ] Définir le format fingerprint léger
- [ ] Brancher `memory.lookup()` avant cascade OCR/template/VLM
- [ ] Brancher `memory.record_success()` après post-cond validée
- [ ] Brancher `memory.record_failure()` sur échec
- [ ] Test : workflow rejoué 3 fois, 3ème en <100ms sur le resolve
- [ ] Vérifier remplissage de `data/learning/target_memory.db`
### Phase 2 light — Spatial OCR-only
- [ ] Enrichir `ResolutionStrategy` avec `context_hints`
- [ ] IRBuilder : extraire context_hints via OCR au build
- [ ] `execution_plan_runner` : propager context_hints dans target_spec
- [ ] resolve_engine : implémenter fallback spatial OCR
- [ ] Porter `_apply_context_hints_to_candidates` adapté
- [ ] Porter normalisation texte (`_norm_text`, `_fuzzy_ratio`)
- [ ] Test : résolution VM modifiée, clic atteint toujours la cible
- [ ] Test : champ ajouté dans le formulaire, robustesse préservée
### Phase 3 — Spatial V3 complet
- [ ] **BLOQUÉ** jusqu'à ce qu'un parser UI produise des `UIElement`
## Liens
- Code de référence : `core/execution/target_resolver.py`, `core/learning/target_memory_store.py`
- Architecture V4 : `core/workflow/execution_plan.py`, `core/workflow/execution_compiler.py`, `agent_v0/server_v1/execution_plan_runner.py`
- Replay runtime : `agent_v0/agent_v1/core/executor.py`

13
tools/run_session_cleaner.sh Executable file
View File

@@ -0,0 +1,13 @@
#!/bin/bash
# Lancement rapide du Session Cleaner
# Usage : ./tools/run_session_cleaner.sh [--port 5006] [--debug]
cd "$(dirname "$0")/.."
source .venv/bin/activate 2>/dev/null || true
# Charger le token API depuis .env.local si present
if [ -f .env.local ]; then
export $(grep RPA_API_TOKEN .env.local 2>/dev/null | xargs)
fi
python tools/session_cleaner.py "$@"

1263
tools/session_cleaner.py Normal file

File diff suppressed because it is too large Load Diff