Premier replay fonctionnel de bout en bout (Bloc-notes, Chrome). Corrections critiques : - Fix double-lancement agent (Lea.bat start /b + verrou PID) - Sérialisation replay (threading.Lock dans poll_and_execute) - Garde UIA bbox >50% écran (rejet conteneurs "Bureau") - Filtre fenêtres bruit système (systray overflow) - Auto-nettoyage replays bloqués (paused_need_help) Cascade visuelle complète dans session_cleaner : - UIA local (10ms) → template matching (100ms) → serveur docTR/VLM - Nettoyage bureau pré-replay (clic "Afficher le bureau") - Crops 80x80 + vlm_description pour chaque clic Grounding contraint à la fenêtre active : - Capture croppée à la fenêtre au lieu de l'écran entier - Conversion coordonnées fenêtre → écran - Élimine les faux positifs taskbar/systray Mode apprentissage supervisé (SUPERVISE → capture humaine) : - Léa passe en mode capture quand elle est perdue - Capture mini-workflow humain (clics + frappes + combos) - Fin par Ctrl+Shift+L ou timeout inactivité 10s - Correction stockée dans target_memory.db via serveur Deploy Windows complet (grounding.py, policy.py, uia_helper.py). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
135 lines
4.9 KiB
Python
135 lines
4.9 KiB
Python
# run_agent_v1.py
|
|
import sys
|
|
import os
|
|
import atexit
|
|
|
|
# Ajout du répertoire courant au PYTHONPATH pour permettre les imports de modules
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
if current_dir not in sys.path:
|
|
sys.path.append(current_dir)
|
|
|
|
# ---------------------------------------------------------------
|
|
# Verrou PID — empêche le lancement de plusieurs instances
|
|
# Même si Lea.bat est double-cliqué ou lancé deux fois,
|
|
# un seul agent tourne à la fois (defense-in-depth).
|
|
# ---------------------------------------------------------------
|
|
LOCK_FILE = os.path.join(current_dir, "lea_agent.lock")
|
|
|
|
|
|
def _pid_is_alive(pid: int) -> bool:
|
|
"""Vérifie si un processus avec ce PID existe encore (Windows + Unix)."""
|
|
if sys.platform == "win32":
|
|
try:
|
|
import ctypes
|
|
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
|
|
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
|
|
handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
|
|
if handle:
|
|
kernel32.CloseHandle(handle)
|
|
return True
|
|
return False
|
|
except Exception:
|
|
# Fallback : tasklist
|
|
try:
|
|
import subprocess
|
|
result = subprocess.run(
|
|
["tasklist", "/FI", f"PID eq {pid}", "/NH"],
|
|
capture_output=True, text=True, timeout=5,
|
|
)
|
|
return str(pid) in result.stdout
|
|
except Exception:
|
|
return False
|
|
else:
|
|
# Unix/Linux — os.kill(pid, 0) ne tue pas le process
|
|
try:
|
|
os.kill(pid, 0)
|
|
return True
|
|
except (OSError, ProcessLookupError):
|
|
return False
|
|
|
|
|
|
def _acquire_lock() -> bool:
|
|
"""Tente d'acquérir le verrou PID. Retourne False si une autre instance tourne."""
|
|
my_pid = os.getpid()
|
|
|
|
# Lire le PID existant
|
|
if os.path.isfile(LOCK_FILE):
|
|
try:
|
|
with open(LOCK_FILE, "r", encoding="utf-8") as f:
|
|
old_pid = int(f.read().strip())
|
|
# Le PID dans le lock est-il encore vivant ?
|
|
if old_pid != my_pid and _pid_is_alive(old_pid):
|
|
return False # Une autre instance tourne déjà
|
|
except (ValueError, OSError):
|
|
pass # Fichier corrompu — on l'écrase
|
|
|
|
# Écrire notre PID
|
|
try:
|
|
with open(LOCK_FILE, "w", encoding="utf-8") as f:
|
|
f.write(str(my_pid))
|
|
except OSError:
|
|
pass # Pas bloquant — on continue sans lock
|
|
return True
|
|
|
|
|
|
def _release_lock():
|
|
"""Supprime le fichier lock au shutdown."""
|
|
try:
|
|
if os.path.isfile(LOCK_FILE):
|
|
with open(LOCK_FILE, "r", encoding="utf-8") as f:
|
|
stored_pid = int(f.read().strip())
|
|
# Ne supprimer que si c'est bien NOTRE lock
|
|
if stored_pid == os.getpid():
|
|
os.remove(LOCK_FILE)
|
|
except (ValueError, OSError):
|
|
pass
|
|
|
|
|
|
# Vérification du lock AVANT toute initialisation lourde
|
|
if not _acquire_lock():
|
|
# Une autre instance de Léa tourne déjà — on quitte silencieusement
|
|
sys.exit(0)
|
|
|
|
atexit.register(_release_lock)
|
|
|
|
# Charger config.txt et .env comme variables d'environnement
|
|
# (équivalent du `set` dans Lea.bat, mais fonctionne aussi sans le .bat)
|
|
for config_file in ("config.txt", ".env"):
|
|
config_path = os.path.join(current_dir, config_file)
|
|
if os.path.isfile(config_path):
|
|
with open(config_path, encoding="utf-8", errors="ignore") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if "=" in line:
|
|
key, _, value = line.partition("=")
|
|
key = key.strip()
|
|
value = value.strip()
|
|
if key and value and key not in os.environ:
|
|
os.environ[key] = value
|
|
|
|
# Configurer le logging dans un fichier (fonctionne même avec pythonw.exe)
|
|
import logging
|
|
log_path = os.path.join(current_dir, "agent_debug.log")
|
|
logging.basicConfig(
|
|
filename=log_path,
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
|
)
|
|
logging.info("=== Agent V1 démarrage — config chargée (PID %d) ===", os.getpid())
|
|
logging.info("RPA_SERVER_URL=%s", os.environ.get("RPA_SERVER_URL", "(non défini)"))
|
|
logging.info("RPA_SERVER_HOST=%s", os.environ.get("RPA_SERVER_HOST", "(non défini)"))
|
|
logging.info("RPA_API_TOKEN=%s", os.environ.get("RPA_API_TOKEN", "(non défini)")[:8] + "...")
|
|
logging.info("RPA_BLUR_SENSITIVE=%s", os.environ.get("RPA_BLUR_SENSITIVE", "(non défini)"))
|
|
|
|
try:
|
|
from agent_v1.main import main
|
|
if __name__ == "__main__":
|
|
main()
|
|
except ImportError as e:
|
|
logging.error("Erreur d'importation : %s", e)
|
|
print(f"Erreur d'importation : {e}")
|
|
except Exception as e:
|
|
logging.error("Erreur fatale : %s", e, exc_info=True)
|