# run_agent_v1.py import sys import os import atexit # Ajout du répertoire courant au PYTHONPATH pour permettre les imports de modules current_dir = os.path.dirname(os.path.abspath(__file__)) if current_dir not in sys.path: sys.path.append(current_dir) # --------------------------------------------------------------- # Verrou PID — empêche le lancement de plusieurs instances # Même si Lea.bat est double-cliqué ou lancé deux fois, # un seul agent tourne à la fois (defense-in-depth). # --------------------------------------------------------------- LOCK_FILE = os.path.join(current_dir, "lea_agent.lock") def _pid_is_alive(pid: int) -> bool: """Vérifie si un processus avec ce PID existe encore (Windows + Unix).""" if sys.platform == "win32": try: import ctypes kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined] PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid) if handle: kernel32.CloseHandle(handle) return True return False except Exception: # Fallback : tasklist try: import subprocess result = subprocess.run( ["tasklist", "/FI", f"PID eq {pid}", "/NH"], capture_output=True, text=True, timeout=5, ) return str(pid) in result.stdout except Exception: return False else: # Unix/Linux — os.kill(pid, 0) ne tue pas le process try: os.kill(pid, 0) return True except (OSError, ProcessLookupError): return False def _acquire_lock() -> bool: """Tente d'acquérir le verrou PID. Retourne False si une autre instance tourne.""" my_pid = os.getpid() # Lire le PID existant if os.path.isfile(LOCK_FILE): try: with open(LOCK_FILE, "r", encoding="utf-8") as f: old_pid = int(f.read().strip()) # Le PID dans le lock est-il encore vivant ? if old_pid != my_pid and _pid_is_alive(old_pid): return False # Une autre instance tourne déjà except (ValueError, OSError): pass # Fichier corrompu — on l'écrase # Écrire notre PID try: with open(LOCK_FILE, "w", encoding="utf-8") as f: f.write(str(my_pid)) except OSError: pass # Pas bloquant — on continue sans lock return True def _release_lock(): """Supprime le fichier lock au shutdown.""" try: if os.path.isfile(LOCK_FILE): with open(LOCK_FILE, "r", encoding="utf-8") as f: stored_pid = int(f.read().strip()) # Ne supprimer que si c'est bien NOTRE lock if stored_pid == os.getpid(): os.remove(LOCK_FILE) except (ValueError, OSError): pass # Vérification du lock AVANT toute initialisation lourde if not _acquire_lock(): # Une autre instance de Léa tourne déjà — on quitte silencieusement sys.exit(0) atexit.register(_release_lock) # Charger config.txt et .env comme variables d'environnement # (équivalent du `set` dans Lea.bat, mais fonctionne aussi sans le .bat) for config_file in ("config.txt", ".env"): config_path = os.path.join(current_dir, config_file) if os.path.isfile(config_path): with open(config_path, encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: key, _, value = line.partition("=") key = key.strip() value = value.strip() if key and value and key not in os.environ: os.environ[key] = value # Configurer le logging dans un fichier (fonctionne même avec pythonw.exe) import logging log_path = os.path.join(current_dir, "agent_debug.log") logging.basicConfig( filename=log_path, level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", ) logging.info("=== Agent V1 démarrage — config chargée (PID %d) ===", os.getpid()) logging.info("RPA_SERVER_URL=%s", os.environ.get("RPA_SERVER_URL", "(non défini)")) logging.info("RPA_SERVER_HOST=%s", os.environ.get("RPA_SERVER_HOST", "(non défini)")) logging.info("RPA_API_TOKEN=%s", os.environ.get("RPA_API_TOKEN", "(non défini)")[:8] + "...") logging.info("RPA_BLUR_SENSITIVE=%s", os.environ.get("RPA_BLUR_SENSITIVE", "(non défini)")) try: from agent_v1.main import main if __name__ == "__main__": main() except ImportError as e: logging.error("Erreur d'importation : %s", e) print(f"Erreur d'importation : {e}") except Exception as e: logging.error("Erreur fatale : %s", e, exc_info=True)