diff --git a/agent_v0/agent_v1/core/executor.py b/agent_v0/agent_v1/core/executor.py index 38813c5a0..24475925c 100644 --- a/agent_v0/agent_v1/core/executor.py +++ b/agent_v0/agent_v1/core/executor.py @@ -585,6 +585,7 @@ class ActionExecutorV1: pass result["success"] = False result["error"] = f"Fenêtre incorrecte: '{current_title}' (attendu: '{expected_title}')" + result["warning"] = "wrong_window" return result else: logger.info(f"[LEA] Pré-vérif OK : '{current_title}'") @@ -818,6 +819,7 @@ class ActionExecutorV1: f"Post-vérif échouée : fenêtre '{post_title}' " f"au lieu de '{expected_after}'" ) + result["warning"] = "wrong_window" print( f" [POST-VÉRIF] STOP STRICT — l'étape ne s'est " f"pas déroulée comme prévu, arrêt du replay" diff --git a/agent_v0/deploy/windows_client/agent_v1/core/executor.py b/agent_v0/deploy/windows_client/agent_v1/core/executor.py index 22b0c6e4c..78becf6cc 100644 --- a/agent_v0/deploy/windows_client/agent_v1/core/executor.py +++ b/agent_v0/deploy/windows_client/agent_v1/core/executor.py @@ -27,7 +27,7 @@ from ..config import MACHINE_ID as _ # noqa: F401 — side-effect import import mss from pynput.mouse import Button, Controller as MouseController -from pynput.keyboard import Controller as KeyboardController, Key +from pynput.keyboard import Controller as KeyboardController, Key, KeyCode logger = logging.getLogger(__name__) @@ -79,6 +79,8 @@ class ActionExecutorV1: self._poll_backoff_factor = 1.5 # Multiplicateur en cas d'echec # Token d'authentification API self._api_token = os.environ.get("RPA_API_TOKEN", "") + # Gestionnaire de notifications toast (pour les messages utilisateur) + self._notification_manager = None # Log de la resolution physique pour le diagnostic DPI self._log_screen_info() @@ -94,6 +96,22 @@ class ActionExecutorV1: except Exception as e: logger.debug(f"Impossible de lire la resolution ecran : {e}") + @property + def notifier(self): + """Instance NotificationManager paresseuse.""" + if self._notification_manager is None: + try: + from ..ui.notifications import NotificationManager + self._notification_manager = NotificationManager() + except Exception as e: + logger.debug(f"NotificationManager indisponible : {e}") + # Retourner un objet factice qui ne fait rien + class _Noop: + def replay_target_not_found(self, *a, **kw): + return False + self._notification_manager = _Noop() + return self._notification_manager + def _auth_headers(self) -> dict: """Headers d'authentification Bearer pour les requetes au serveur.""" if self._api_token: @@ -107,6 +125,30 @@ class ActionExecutorV1: self._sct = mss.mss() return self._sct + @staticmethod + def _describe_target(target_spec: dict) -> str: + """Construire une description humaine de la cible depuis target_spec. + + Utilisé pour les notifications et le logging quand la cible n'est pas trouvée. + """ + parts = [] + by_text = target_spec.get("by_text", "").strip() + window = target_spec.get("window_title", "").strip() + if by_text: + parts.append(f"'{by_text}'") + if window: + parts.append(f"dans {window}") + if not parts: + # Fallback sur la vlm_description + vlm = target_spec.get("vlm_description", "") + if vlm: + parts.append(vlm[:60]) + else: + parts.append("un élément") + if parts: + return " ".join(parts) + return "élément inconnu" + # ========================================================================= # Execution legacy (watchdog command.json) # ========================================================================= @@ -135,6 +177,166 @@ class ActionExecutorV1: except Exception as e: logger.error(f"Echec de l'ordre {action} : {e}") + # ========================================================================= + # Acteur intelligent — décision gemma4 quand le magnétoscope bloque + # ========================================================================= + + def _actor_decide(self, action: dict, target_spec: dict) -> str: + """Demander à gemma4 de décider quand le magnétoscope ne trouve pas la cible. + + gemma4 reçoit le contexte (action prévue, état de l'écran) et décide : + - PASSER : l'état est déjà atteint (ex: onglet déjà actif) + - EXECUTER : l'action est nécessaire mais pas trouvable automatiquement + - STOPPER : l'état est incohérent, impossible de continuer + + Appelle gemma4 en mode texte avec thinking (Docker port 11435). + Fallback : EXECUTER (pause supervisée) si gemma4 indisponible. + """ + import requests as _requests + + gemma4_port = os.environ.get("GEMMA4_PORT", "11435") + by_text = target_spec.get("by_text", "") + window_title = target_spec.get("window_title", "") + + # Récupérer le titre de la fenêtre ACTUELLE + try: + from ..window_info_crossplatform import get_active_window_info + current_info = get_active_window_info() + current_title = current_info.get("title", "") + except Exception: + current_title = "" + + prompt = ( + f"Tu es un robot RPA. L'action suivante est : cliquer sur '{by_text or 'un élément'}' " + f"dans '{window_title}'.\n" + f"La fenêtre active est \"{current_title}\".\n" + f"Dois-je faire cette action ?\n" + f"- EXECUTER : l'action est nécessaire\n" + f"- PASSER : le résultat est déjà atteint\n" + f"- STOPPER : état incohérent\n" + f"Réponds UN SEUL MOT." + ) + + try: + resp = _requests.post( + f"http://localhost:{gemma4_port}/api/chat", + json={ + "model": "gemma4:e4b", + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "think": True, + "options": {"temperature": 0.1, "num_predict": 500}, + }, + timeout=30, + ) + content = resp.json().get("message", {}).get("content", "").strip().upper() + # Extraire le mot clé + for keyword in ("PASSER", "EXECUTER", "STOPPER"): + if keyword in content: + logger.info(f"Acteur gemma4 décide : {keyword}") + return keyword + logger.warning(f"Acteur gemma4 réponse inattendue : {content[:50]}") + return "EXECUTER" + except Exception as e: + logger.warning(f"Acteur gemma4 indisponible : {e}") + return "EXECUTER" + + # ========================================================================= + # Observer — pré-analyse écran avant chaque action + # ========================================================================= + + def _observe_screen( + self, server_url: str, target_spec: dict, + screen_width: int, screen_height: int, + ) -> dict: + """Observer : analyser l'écran AVANT de résoudre la cible. + + Détecte les popups, dialogues, et états inattendus AVANT de tenter + la résolution visuelle. C'est la "pre-exploration" qui améliore + dramatiquement les performances (cf. benchmarks Claude Computer Use). + + Stratégie en 2 temps (rapide puis intelligent) : + 1. Vérification rapide locale : titre fenêtre, popup connue + 2. Si serveur disponible : envoi du screenshot pour pré-analyse VLM + + Returns: + None si écran OK (pas de problème détecté) + Dict avec screen_state ("ok"|"popup"|"unexpected"), détails, coords popup + """ + import requests as _requests + + # Étape 1 : vérification rapide locale (titre fenêtre) + try: + from ..window_info_crossplatform import get_active_window_info + current_info = get_active_window_info() + current_title = current_info.get("title", "").lower() + + # Patterns de popup/dialogue courants (Windows FR + EN) + popup_patterns = [ + "enregistrer", "sauvegarder", "voulez-vous", + "confirmer", "confirmation", "avertissement", + "erreur", "error", "warning", "alert", + "do you want", "save as", "are you sure", + ] + for pattern in popup_patterns: + if pattern in current_title: + logger.info(f"Observer : popup détectée par titre — '{current_title}'") + # On ne peut pas résoudre les coords juste par le titre + # → retourner popup sans coords, le caller fera handle_popup_vlm() + return { + "screen_state": "popup", + "popup_label": current_title, + "popup_coords": None, + "detail": f"Popup détectée par titre : {current_title}", + } + except Exception: + pass + + # Étape 2 : pré-analyse serveur (si disponible) + if not server_url: + return None # Pas de serveur → pas de pré-analyse avancée + + # Envoyer le screenshot au serveur pour détection popup via VLM + screenshot_b64 = self._capture_screenshot_b64(max_width=0, quality=60) + if not screenshot_b64: + return None + + try: + url = f"{server_url}/traces/stream/replay/pre_analyze" + from ..config import API_TOKEN + headers = {"Content-Type": "application/json"} + if API_TOKEN: + headers["Authorization"] = f"Bearer {API_TOKEN}" + + resp = _requests.post( + url, + json={ + "screenshot_b64": screenshot_b64, + "expected_state": target_spec.get("expected_state", ""), + "window_title": target_spec.get("window_title", ""), + "screen_width": screen_width, + "screen_height": screen_height, + }, + headers=headers, + timeout=10, + ) + + if resp.ok: + data = resp.json() + state = data.get("screen_state", "ok") + if state != "ok": + logger.info(f"Observer serveur : {state} — {data.get('detail', '')}") + return data + # Serveur ne supporte pas encore /pre_analyze → silencieux + except _requests.Timeout: + logger.debug("Observer : serveur timeout (10s)") + except _requests.ConnectionError: + pass # Serveur indisponible — pas grave, on continue sans + except Exception as e: + logger.debug(f"Observer : erreur serveur — {e}") + + return None # Écran OK ou pas de pré-analyse possible + # ========================================================================= # Execution replay (polling serveur) # ========================================================================= @@ -191,48 +393,248 @@ class ActionExecutorV1: x_pct = action.get("x_pct", 0.0) y_pct = action.get("y_pct", 0.0) - if visual_mode and target_spec and server_url: - resolved = self._resolve_target_visual( - server_url, target_spec, x_pct, y_pct, width, height - ) - if resolved: - x_pct = resolved["x_pct"] - y_pct = resolved["y_pct"] - result["visual_resolved"] = resolved.get("resolved", False) - if resolved.get("resolved"): - logger.info( - f"Visual resolve OK: {resolved.get('matched_element', {}).get('label', '?')} " - f"-> ({x_pct:.4f}, {y_pct:.4f})" - ) + # Extraire le nom de l'application depuis un titre de fenêtre + def _app_name(title): + for sep in [" – ", " - ", " — "]: + if sep in title: + return title.split(sep)[-1].strip().lower() + return title.strip().lower() - # ---- Hash AVANT l'action (pour verification post-action) ---- - # Seules les actions click et key_combo sont verifiees : elles - # provoquent un changement visible de l'ecran (ouverture de fenetre, - # focus, etc.). Les actions type/wait/scroll ne sont pas verifiees. + # ── Pré-vérification : titre fenêtre ── + # Vérifier que l'écran est dans l'état attendu AVANT de cliquer. + if visual_mode and target_spec: + expected_title = target_spec.get("window_title", "") + if expected_title and expected_title != "unknown_window": + from ..window_info_crossplatform import get_active_window_info + current_info = get_active_window_info() + current_title = current_info.get("title", "") + + current_app = _app_name(current_title) + expected_app = _app_name(expected_title) + title_match = ( + current_app == expected_app + or expected_title.lower() in current_title.lower() + or current_title.lower() in expected_title.lower() + ) + # Ignorer la fenêtre de Léa elle-même (overlay agent) + _lea_windows = ("léa", "lea —", "léa —", "lea assistante", "léa assistante") + is_lea_window = any(p in current_title.lower() for p in _lea_windows) + + if not title_match and not is_lea_window: + logger.warning( + f"PRÉ-VÉRIF ÉCHOUÉE : attendu '{expected_title}', " + f"actuel '{current_title}' — STOP" + ) + print(f" [PRÉ-VÉRIF] STOP — fenêtre '{current_title}' ≠ attendu '{expected_title}'") + result["success"] = False + result["error"] = f"Fenêtre incorrecte: '{current_title}' (attendu: '{expected_title}')" + result["warning"] = "wrong_window" + return result + elif is_lea_window: + logger.info(f"PRÉ-VÉRIF : fenêtre Léa détectée, ignorée — on continue") + else: + logger.info(f"PRÉ-VÉRIF OK : '{current_title}'") + + # ── OBSERVER : pré-analyse écran avant résolution ── + # Détecte popups, dialogues, états inattendus AVANT de chercher la cible. + # Si un problème est détecté, on le gère tout de suite (pas après l'échec). + # Ref: docs/VISION_RPA_INTELLIGENT.md — "Il observe" + if visual_mode and target_spec and action_type == "click": + observation = self._observe_screen(server_url, target_spec, width, height) + if observation: + obs_state = observation.get("screen_state", "ok") + + if obs_state == "popup": + # Popup détectée AVANT la résolution — la fermer + popup_label = observation.get("popup_label", "popup") + popup_coords = observation.get("popup_coords") + print(f" [OBSERVER] Popup détectée : '{popup_label}' — fermeture") + logger.info(f"Observer : popup '{popup_label}' détectée avant résolution") + if popup_coords: + real_x = int(popup_coords["x_pct"] * width) + real_y = int(popup_coords["y_pct"] * height) + self._click((real_x, real_y), "left") + time.sleep(1.0) + print(f" [OBSERVER] Popup fermée — reprise du flow normal") + else: + # Pas de coordonnées → fallback sur handle_popup_vlm classique + self._handle_popup_vlm() + + elif obs_state == "unexpected": + # État inattendu (pas la bonne page/écran) + detail = observation.get("detail", "état inattendu") + print(f" [OBSERVER] État inattendu : {detail}") + logger.warning(f"Observer : état inattendu — {detail}") + # Demander à l'acteur (gemma4) de décider + decision = self._actor_decide(action, target_spec) + if decision == "STOPPER": + result["success"] = False + result["error"] = f"observer_unexpected:{detail}" + return result + elif decision == "PASSER": + result["success"] = True + result["warning"] = "observer_skip" + return result + # EXECUTER → continuer normalement + + if visual_mode and target_spec and server_url: + # ── GROUNDING : localisation pure via GroundingEngine ── + from .grounding import GroundingEngine + grounding = GroundingEngine(self) + grounding_result = grounding.locate( + server_url, target_spec, x_pct, y_pct, width, height, + ) + if grounding_result.found: + x_pct = grounding_result.x_pct + y_pct = grounding_result.y_pct + result["visual_resolved"] = True + result["resolution_method"] = grounding_result.method + result["resolution_score"] = grounding_result.score + result["resolution_elapsed_ms"] = grounding_result.elapsed_ms + logger.info( + f"Grounding OK [{grounding_result.method}] " + f"{grounding_result.elapsed_ms:.0f}ms : " + f"{grounding_result.detail or '?'} " + f"-> ({x_pct:.4f}, {y_pct:.4f})" + ) + + # ---- Screenshot + hash AVANT l'action (pour le Critic post-action) ---- + # Le serveur utilise screenshot_before + screenshot_after pour évaluer + # si l'action a eu l'effet attendu (Critic sémantique VLM). needs_screen_check = action_type in ("click", "key_combo") hash_before = "" + screenshot_before_b64 = "" if needs_screen_check: hash_before = self._quick_screenshot_hash() + screenshot_before_b64 = self._capture_screenshot_b64() if action_type == "click": + # Si visual_mode est activé, le resolve DOIT réussir. + # Pas de fallback blind — on arrête le replay si la cible + # n'est pas trouvée visuellement. C'est un RPA VISUEL. + if visual_mode and not result.get("visual_resolved"): + # ── Policy : décider quoi faire quand grounding échoue ── + from .policy import PolicyEngine, Decision + policy = PolicyEngine(self) + target_desc = self._describe_target(target_spec) + retry_count = action.get("_retry_count", 0) + + policy_decision = policy.decide( + action=action, target_spec=target_spec, + retry_count=retry_count, max_retries=1, + ) + print( + f" [POLICY] {policy_decision.decision.value} — " + f"{policy_decision.reason}" + ) + logger.info( + f"Action {action_id} : Policy → {policy_decision.decision.value} " + f"({policy_decision.reason})" + ) + + if policy_decision.decision == Decision.RETRY: + # Re-tenter le grounding après correction (popup fermée, etc.) + resolved2 = self._resolve_target_visual( + server_url, target_spec, x_pct, y_pct, width, height + ) + if resolved2 and resolved2.get("resolved"): + x_pct = resolved2["x_pct"] + y_pct = resolved2["y_pct"] + result["visual_resolved"] = True + print(f" [POLICY] Re-resolve OK après {policy_decision.action_taken}") + else: + # Re-resolve échoué — SUPERVISE (rendre la main) + result["success"] = False + result["error"] = "target_not_found" + result["target_description"] = target_desc + result["target_spec"] = target_spec + result["screenshot"] = self._capture_screenshot_b64() + result["warning"] = "visual_resolve_failed" + self.notifier.replay_target_not_found(target_desc) + return result + + elif policy_decision.decision == Decision.SKIP: + result["success"] = True + result["warning"] = "policy_skip" + return result + + elif policy_decision.decision == Decision.ABORT: + result["success"] = False + result["error"] = f"policy_abort:{target_desc}" + self.notifier.replay_target_not_found(target_desc) + return result + + else: # SUPERVISE ou CONTINUE + result["success"] = False + result["error"] = "target_not_found" + result["target_description"] = target_desc + result["target_spec"] = target_spec + result["screenshot"] = self._capture_screenshot_b64() + result["warning"] = "visual_resolve_failed" + self.notifier.replay_target_not_found(target_desc) + return result + real_x = int(x_pct * width) real_y = int(y_pct * height) button = action.get("button", "left") - mode = "VISUAL" if result["visual_resolved"] else "BLIND" + mode = "VISUAL" if result.get("visual_resolved") else "COORD" print( f" [CLICK] [{mode}] ({x_pct:.3f}, {y_pct:.3f}) -> " f"({real_x}, {real_y}) sur ({width}x{height}), bouton={button}" ) self._click((real_x, real_y), button) - print(f" [CLICK] Termine.") + # Phase 1 apprentissage : exposer les coordonnées RÉSOLUES + # utilisées pour le clic. Le serveur (/replay/result) les lit + # directement comme source de vérité pour la mémoire. + # On donne des percentages car la mémoire est indépendante + # de la résolution écran du client. + result["actual_position"] = { + "x_pct": float(x_pct), + "y_pct": float(y_pct), + } logger.info( f"Replay click [{mode}] : ({x_pct:.3f}, {y_pct:.3f}) -> " f"({real_x}, {real_y}) sur ({width}x{height})" ) + # ── Post-vérification : polling du titre fenêtre ── + # On attend que le titre change vers celui attendu (max 10s) + # C'est 100% visuel — pas de wait fixe arbitraire + expected_after = action.get("expected_window_title", "") + if expected_after: + from ..window_info_crossplatform import get_active_window_info + max_wait = 10.0 + poll_interval = 0.3 + elapsed_wait = 0.0 + matched = False + while elapsed_wait < max_wait: + time.sleep(poll_interval) + elapsed_wait += poll_interval + post_info = get_active_window_info() + post_title = post_info.get("title", "") + post_app = _app_name(post_title) + expected_app_after = _app_name(expected_after) + if (post_app == expected_app_after + or expected_after.lower() in post_title.lower() + or post_title.lower() in expected_after.lower()): + matched = True + break + if matched: + print(f" [POST-VÉRIF] OK en {elapsed_wait:.1f}s — '{post_title}'") + logger.info(f"POST-VÉRIF OK en {elapsed_wait:.1f}s : '{post_title}'") + else: + print(f" [POST-VÉRIF] TIMEOUT {max_wait}s — '{post_title}' ≠ '{expected_after}'") + logger.warning(f"POST-VÉRIF TIMEOUT : '{post_title}' ≠ '{expected_after}'") + result["warning"] = f"post_verif_timeout:{post_title}" + else: + print(f" [CLICK] Terminé.") + elif action_type == "type": text = action.get("text", "") + raw_keys = action.get("raw_keys") print(f" [TYPE] Texte: '{text[:50]}' ({len(text)} chars)") + if raw_keys: + print(f" [TYPE] raw_keys disponibles ({len(raw_keys)} events) — replay exact") # Cliquer sur le champ avant de taper (si coordonnees disponibles) if x_pct > 0 and y_pct > 0: real_x = int(x_pct * width) @@ -240,16 +642,26 @@ class ActionExecutorV1: print(f" [TYPE] Clic prealable sur ({real_x}, {real_y})") self._click((real_x, real_y), "left") time.sleep(0.3) - self._type_text(text) + if raw_keys: + self._replay_raw_keys(raw_keys) + else: + # Fallback copier-coller (anciens enregistrements sans raw_keys) + self._type_text(text) print(f" [TYPE] Termine.") - logger.info(f"Replay type : '{text[:30]}...' ({len(text)} chars)") + logger.info(f"Replay type : '{text[:30]}...' ({len(text)} chars, raw_keys={'oui' if raw_keys else 'non'})") elif action_type == "key_combo": keys = action.get("keys", []) + raw_keys = action.get("raw_keys") print(f" [KEY_COMBO] Touches: {keys}") - self._execute_key_combo(keys) + if raw_keys: + print(f" [KEY_COMBO] raw_keys disponibles ({len(raw_keys)} events) — replay exact") + self._replay_raw_keys(raw_keys) + else: + # Fallback (anciens enregistrements sans raw_keys) + self._execute_key_combo(keys) print(f" [KEY_COMBO] Termine.") - logger.info(f"Replay key_combo : {keys}") + logger.info(f"Replay key_combo : {keys} (raw_keys={'oui' if raw_keys else 'non'})") elif action_type == "scroll": real_x = int(x_pct * width) if x_pct > 0 else int(0.5 * width) @@ -295,34 +707,41 @@ class ActionExecutorV1: result["success"] = True + # Stocker le screenshot_before pour le Critic côté serveur + if screenshot_before_b64: + result["screenshot_before"] = screenshot_before_b64 + # ---- Verification post-action : l'ecran a-t-il change ? ---- + # Verifie UNIQUEMENT, ne tente PAS de gerer les popups + # (Enter/Escape perturbent l'application). + # Signale l'echec honnêtement — le serveur decide du retry. if needs_screen_check and hash_before: screen_changed = self._wait_for_screen_change( - hash_before, timeout_ms=5000 + hash_before, timeout_ms=3000 ) if not screen_changed: - # Ecran inchange — tenter de gerer une popup imprevue - # (dialogue de confirmation, erreur, etc.) - popup_handled = self._handle_possible_popup() - if popup_handled: - result["warning"] = "popup_handled" - print( - f" [OK] Popup geree automatiquement apres {action_type}" - ) - logger.info( - f"Action {action_id} ({action_type}) : popup geree " - f"automatiquement" - ) - else: - result["warning"] = "no_screen_change" - print( - f" [WARN] Ecran inchange apres {action_type} — " - f"l'action n'a peut-etre pas eu d'effet" - ) - logger.warning( - f"Action {action_id} ({action_type}) : ecran inchange " - f"apres 5s — possible echec silencieux" - ) + # ── Recovery : tenter un rollback si l'action n'a pas eu d'effet ── + from .recovery import RecoveryEngine + recovery = RecoveryEngine(self) + recovery_result = recovery.attempt( + failed_action=action, + critic_detail="L'écran n'a pas changé après l'action", + ) + if recovery_result.success: + print(f" [RECOVERY] {recovery_result.detail}") + result["recovery"] = recovery_result.to_dict() + + result["success"] = False + result["warning"] = "no_screen_change" + result["error"] = "Ecran inchange apres l'action" + print( + f" [ECHEC] Ecran inchange apres {action_type} — " + f"l'action n'a pas eu d'effet visible" + ) + logger.warning( + f"Action {action_id} ({action_type}) : ecran inchange " + f"— action sans effet visible" + ) else: print(f" [OK] Changement d'ecran detecte apres {action_type}") else: @@ -343,64 +762,439 @@ class ActionExecutorV1: fallback_x: float, fallback_y: float, screen_width: int, screen_height: int, ) -> dict: - """ - Envoyer un screenshot au serveur pour resolution visuelle de la cible. + """Résoudre la position d'un clic visuellement. - Capture l'ecran en resolution native (pas de downscale, necessaire pour - le template matching precis cross-resolution), l'encode en base64 JPEG, - et POST au endpoint /replay/resolve_target. Retourne les coordonnees resolues. + Stratégie en cascade — compréhension sémantique d'abord : + 1. Serveur resolve_target (SomEngine + VLM) — comprend CE QU'ON CHERCHE + 2. Template matching local (fallback rapide si serveur indisponible) + 3. VLM local (fallback dev/test Linux) + + Le template matching compare des pixels et donne des faux positifs quand + l'écran n'est pas dans le même état que l'enregistrement. Le SomEngine + comprend sémantiquement les éléments UI (bouton, menu, texte) et trouve + le bon élément peu importe l'état de l'écran. """ - import requests + import time as _time + t_start = _time.time() + + screenshot_b64 = self._capture_screenshot_b64(max_width=0, quality=75) + if not screenshot_b64: + logger.warning("Capture screenshot echouee pour visual resolve") + return None + + def _with_metrics(result, method_override=None): + """Enrichir le résultat avec les métriques de résolution.""" + if result is None: + return None + elapsed_ms = (_time.time() - t_start) * 1000 + result["resolution_method"] = method_override or result.get("method", "unknown") + result["resolution_score"] = result.get("score", 0.0) + result["resolution_elapsed_ms"] = round(elapsed_ms, 1) + return result + + # ---- ÉTAPE 1 : Résolution serveur (SomEngine + VLM) ---- + # Le serveur comprend sémantiquement ce qu'on cherche. Pas de faux positifs. + if server_url: + server_result = self._server_resolve_target( + server_url, screenshot_b64, target_spec, + fallback_x, fallback_y, screen_width, screen_height, + ) + if server_result and server_result.get("resolved"): + return _with_metrics(server_result) + + # ---- ÉTAPE 2 : Template matching local (fallback si serveur down) ---- + anchor_b64 = target_spec.get("anchor_image_base64", "") + if anchor_b64: + tm_result = self._template_match_anchor(screenshot_b64, anchor_b64, screen_width, screen_height) + if tm_result and tm_result.get("resolved"): + return _with_metrics(tm_result) + + # ---- ÉTAPE 3 : VLM local (fallback dev/test Linux) ---- + by_text = target_spec.get("by_text", "") + vlm_description = target_spec.get("vlm_description", "") + if vlm_description or by_text: + hybrid_result = self._hybrid_vlm_resolve( + screenshot_b64, target_spec, screen_width, screen_height + ) + if hybrid_result and hybrid_result.get("resolved"): + return _with_metrics(hybrid_result) + + print(" [VISUAL] Toutes les méthodes ont échoué") + return None + + def _server_resolve_target( + self, server_url: str, screenshot_b64: str, target_spec: dict, + fallback_x: float, fallback_y: float, + screen_width: int, screen_height: int, + ) -> dict: + """Résolution visuelle via le serveur (SomEngine + VLM sur GPU). + + Le serveur dispose de SomEngine (YOLO + docTR) et du VLM (qwen3-vl). + L'agent envoie le screenshot + target_spec, le serveur résout et + retourne les coordonnées. + """ + import requests as _requests + from ..config import API_TOKEN + + url = f"{server_url}/traces/stream/replay/resolve_target" + payload = { + "session_id": "", + "screenshot_b64": screenshot_b64, + "target_spec": target_spec, + "fallback_x_pct": fallback_x, + "fallback_y_pct": fallback_y, + "screen_width": screen_width, + "screen_height": screen_height, + "strict_mode": True, + } + headers = {"Content-Type": "application/json"} + if API_TOKEN: + headers["Authorization"] = f"Bearer {API_TOKEN}" try: - # Capturer à résolution native pour le template matching - # (le downscale nuit à la précision du matching quand les - # résolutions d'apprentissage et de replay diffèrent) - screenshot_b64 = self._capture_screenshot_b64( - max_width=0, - quality=75, - ) - if not screenshot_b64: - logger.warning("Capture screenshot echouee pour visual resolve") + print(f" [SERVER-RESOLVE] Appel serveur {server_url}...") + resp = _requests.post(url, json=payload, headers=headers, timeout=30) + if not resp.ok: + logger.warning(f"Server resolve HTTP {resp.status_code}") return None - print( - f" [VISUAL] Envoi screenshot ({len(screenshot_b64) // 1024} Ko) " - f"au serveur pour resolution..." - ) + data = resp.json() + resolved = data.get("resolved", False) + method = data.get("method", "server_unknown") - # Appel au serveur - resolve_url = f"{server_url}/traces/stream/replay/resolve_target" - payload = { - "session_id": "", # Pas critique pour la resolution - "screenshot_b64": screenshot_b64, - "target_spec": target_spec, - "fallback_x_pct": fallback_x, - "fallback_y_pct": fallback_y, - "screen_width": screen_width, - "screen_height": screen_height, - "strict_mode": True, # Replay = seuil strict 0.90 + YOLO - } - - resp = requests.post(resolve_url, json=payload, headers=self._auth_headers(), timeout=60) - if resp.ok: - data = resp.json() - method = data.get("method", "?") - resolved = data.get("resolved", False) + if resolved: print( - f" [VISUAL] Reponse serveur : resolved={resolved}, " - f"method={method}, score={data.get('score', 'N/A')}" + f" [SERVER-RESOLVE] OK [{method}] " + f"→ ({data.get('x_pct', 0):.3f}, {data.get('y_pct', 0):.3f}) " + f"score={data.get('score', 0):.2f}" ) - return data + logger.info(f"Server resolve OK [{method}] score={data.get('score', 0):.2f}") else: - logger.warning(f"Visual resolve HTTP {resp.status_code}: {resp.text[:200]}") - return None + reason = data.get("reason", "unknown") + print(f" [SERVER-RESOLVE] Échec ({reason})") + logger.info(f"Server resolve échoué : {reason}") - except requests.exceptions.Timeout: - logger.warning("Visual resolve timeout (30s)") + return data + + except _requests.Timeout: + print(" [SERVER-RESOLVE] Timeout (30s)") + logger.warning("Server resolve timeout") return None except Exception as e: - logger.warning(f"Visual resolve echoue: {e}") + print(f" [SERVER-RESOLVE] Erreur : {e}") + logger.warning(f"Server resolve erreur : {e}") + return None + + def _template_match_anchor( + self, screenshot_b64: str, anchor_b64: str, + screen_width: int, screen_height: int, + ) -> dict: + """Template matching direct avec le crop anchor (image de référence). + + Le crop anchor est une capture de l'élément UI lors de l'enregistrement. + Si l'UI est identique (même résolution, même thème), le match est + quasi-parfait et très rapide (~10ms). + """ + import cv2 + import numpy as np + + try: + # Décoder les deux images + scr_bytes = base64.b64decode(screenshot_b64) + scr_array = np.frombuffer(scr_bytes, dtype=np.uint8) + screenshot = cv2.imdecode(scr_array, cv2.IMREAD_GRAYSCALE) + + anc_bytes = base64.b64decode(anchor_b64) + anc_array = np.frombuffer(anc_bytes, dtype=np.uint8) + anchor = cv2.imdecode(anc_array, cv2.IMREAD_GRAYSCALE) + + if screenshot is None or anchor is None: + return None + if anchor.shape[0] >= screenshot.shape[0] or anchor.shape[1] >= screenshot.shape[1]: + return None + + result = cv2.matchTemplate(screenshot, anchor, cv2.TM_CCOEFF_NORMED) + _, max_val, _, max_loc = cv2.minMaxLoc(result) + + print(f" [ANCHOR-TM] Score={max_val:.3f}") + + # Seuil élevé : le crop anchor doit matcher très bien + if max_val >= 0.80: + # Centre du match en pixels + cx = max_loc[0] + anchor.shape[1] // 2 + cy = max_loc[1] + anchor.shape[0] // 2 + # Convertir en pourcentages par rapport au screenshot décodé + x_pct = cx / screenshot.shape[1] + y_pct = cy / screenshot.shape[0] + + print( + f" [ANCHOR-TM] TROUVÉ ({x_pct:.3f}, {y_pct:.3f}) " + f"score={max_val:.3f}" + ) + logger.info( + f"[ANCHOR-TM] Match anchor à ({x_pct:.3f}, {y_pct:.3f}) " + f"score={max_val:.3f}" + ) + return { + "resolved": True, + "method": "anchor_template", + "x_pct": x_pct, + "y_pct": y_pct, + "score": max_val, + } + + except Exception as e: + print(f" [ANCHOR-TM] Erreur: {e}") + logger.warning(f"[ANCHOR-TM] Erreur: {e}") + + return None + + def _hybrid_vlm_resolve( + self, screenshot_b64: str, target_spec: dict, + screen_width: int, screen_height: int, + ) -> dict: + """Approche hybride : le VLM identifie l'élément, le template matching le localise. + + Le VLM décrit quel élément il voit (texte du bouton/label) et le + template matching avec rendu texte localise sa position exacte. + + Utile quand le crop anchor ne matche plus (changement de thème, + résolution différente, etc.) mais le texte du bouton est identique. + """ + import requests as _requests + + by_text = target_spec.get("by_text", "") + vlm_description = target_spec.get("vlm_description", "") + + # Si on a déjà le texte cible (by_text), essayer directement le template matching texte + if by_text: + position = self._find_text_on_screen(screenshot_b64, by_text) + if position: + x_pct = position[0] / screen_width if screen_width > 0 else 0 + y_pct = position[1] / screen_height if screen_height > 0 else 0 + # Recalculer par rapport à l'image décodée, pas l'écran + import cv2 + import numpy as np + img_bytes = base64.b64decode(screenshot_b64) + img_array = np.frombuffer(img_bytes, dtype=np.uint8) + img = cv2.imdecode(img_array, cv2.IMREAD_GRAYSCALE) + if img is not None: + x_pct = position[0] / img.shape[1] + y_pct = position[1] / img.shape[0] + print( + f" [HYBRID] by_text '{by_text}' trouvé directement " + f"({x_pct:.3f}, {y_pct:.3f})" + ) + return { + "resolved": True, + "method": "hybrid_text_direct", + "x_pct": x_pct, + "y_pct": y_pct, + "score": 0.9, + } + + # Sinon, demander au VLM d'identifier l'élément + if not vlm_description: + return None + + ollama_host = os.environ.get("RPA_SERVER_HOST", "localhost") + ollama_url = f"http://{ollama_host}:11434/api/chat" + + prompt = ( + f"Look at this screenshot. {vlm_description}\n" + "What is the exact text label of this element? " + "Answer ONLY the text visible on the element (button text, label, menu item)." + ) + # Prefill pour les modèles thinking (qwen3) — skip la phase de réflexion + _vlm_model_ident = os.environ.get("RPA_VLM_MODEL", "gemma4:e4b") + _is_thinking_ident = "qwen3" in _vlm_model_ident.lower() + + messages_ident = [ + { + "role": "system", + "content": "You read text from UI screenshots. Answer briefly with just the text.", + }, + {"role": "user", "content": prompt, "images": [screenshot_b64]}, + ] + if _is_thinking_ident: + messages_ident.append({"role": "assistant", "content": "The text is: "}) + + payload = { + "model": _vlm_model_ident, + "messages": messages_ident, + "stream": False, + "think": False, + "options": {"temperature": 0.1, "num_predict": 30, "num_ctx": 8192}, + } + + try: + print(f" [HYBRID] Appel VLM pour identification élément...") + start = time.time() + resp = _requests.post(ollama_url, json=payload, timeout=20) + elapsed = time.time() - start + + if not resp.ok: + print(f" [HYBRID] VLM HTTP {resp.status_code} ({elapsed:.1f}s)") + return None + + raw = resp.json().get("message", {}).get("content", "") + element_text = raw.strip().strip('"').strip("'").strip(".") + print(f" [HYBRID] VLM identifie : '{element_text}' ({elapsed:.1f}s)") + + if not element_text or len(element_text) > 50: + return None + + # Localiser ce texte sur le screenshot + position = self._find_text_on_screen(screenshot_b64, element_text) + + # Essayer des variantes de casse + if not position: + for variant in [element_text.upper(), element_text.lower(), + element_text.capitalize(), element_text.title()]: + if variant == element_text: + continue + position = self._find_text_on_screen(screenshot_b64, variant) + if position: + break + + if not position: + print(f" [HYBRID] '{element_text}' identifié mais non localisé") + return None + + # Convertir pixels en pourcentages (par rapport au screenshot décodé) + import cv2 + import numpy as np + img_bytes = base64.b64decode(screenshot_b64) + img_array = np.frombuffer(img_bytes, dtype=np.uint8) + img = cv2.imdecode(img_array, cv2.IMREAD_GRAYSCALE) + if img is None: + return None + x_pct = position[0] / img.shape[1] + y_pct = position[1] / img.shape[0] + + print( + f" [HYBRID] TROUVÉ '{element_text}' à ({x_pct:.3f}, {y_pct:.3f})" + ) + logger.info( + f"[HYBRID] Élément '{element_text}' trouvé à ({x_pct:.3f}, {y_pct:.3f}) " + f"[VLM identifie + template matching localise]" + ) + return { + "resolved": True, + "method": "hybrid_vlm_text", + "x_pct": x_pct, + "y_pct": y_pct, + "score": 0.85, + "matched_element": {"label": element_text}, + } + + except _requests.exceptions.Timeout: + print(" [HYBRID] Timeout VLM 20s") + return None + except Exception as e: + print(f" [HYBRID] Erreur: {e}") + return None + + def _vlm_direct_resolve(self, screenshot_b64: str, target_spec: dict) -> dict: + """Appeler Ollama directement pour trouver l'élément à l'écran (legacy). + + Demande des coordonnées JSON au VLM. Peu fiable avec qwen3-vl:8b + qui retourne souvent des coordonnées incorrectes ou du JSON malformé. + Gardé comme dernier recours après les méthodes template matching et hybride. + """ + import requests as _requests + import json as _json + import re + + anchor_b64 = target_spec.get("anchor_image_base64", "") + vlm_description = target_spec.get("vlm_description", "") + by_text = target_spec.get("by_text", "") + window_title = target_spec.get("window_title", "") + + if not anchor_b64 and not vlm_description: + return None + + # Prompt simple et direct — le VLM doit retourner x_pct et y_pct + if anchor_b64 and vlm_description: + prompt = f"""Look at the first image (screenshot). The second image shows a UI element. +{vlm_description} +Where is this element on the screenshot? Give the center x,y as percentage (0.0 to 1.0). +Example: x_pct=0.50, y_pct=0.30""" + elif vlm_description: + prompt = f"""{vlm_description} +Where is this element? Give center x,y as percentage (0.0 to 1.0). +Example: x_pct=0.50, y_pct=0.30""" + else: + prompt = """The second image shows a UI element. Find it on the first image (screenshot). +Give the center x,y as percentage (0.0 to 1.0). +Example: x_pct=0.50, y_pct=0.30""" + + images = [screenshot_b64] + if anchor_b64: + images.append(anchor_b64) + + ollama_host = os.environ.get("RPA_SERVER_HOST", "localhost") + ollama_url = f"http://{ollama_host}:11434/api/chat" + + # Prefill pour les modèles thinking (qwen3) — évite le mode réflexion >180s + _vlm_model = os.environ.get("RPA_VLM_MODEL", "gemma4:e4b") + _is_thinking = "qwen3" in _vlm_model.lower() + prefill = '{"x_pct": 0.' if _is_thinking else "" + + messages = [ + {"role": "system", "content": "You locate UI elements on screenshots. Reply with JSON only: {\"x_pct\": 0.XX, \"y_pct\": 0.XX, \"confidence\": 0.XX}"}, + {"role": "user", "content": prompt, "images": images}, + ] + if prefill: + messages.append({"role": "assistant", "content": prefill}) + + payload = { + "model": _vlm_model, + "messages": messages, + "stream": False, + "think": False, + "options": {"temperature": 0.1, "num_predict": 60, "num_ctx": 8192}, + } + + try: + print(f" [VLM-DIRECT] Appel Ollama ({ollama_host}:11434)...") + start = time.time() + resp = _requests.post(ollama_url, json=payload, timeout=30) + elapsed = time.time() - start + + if not resp.ok: + print(f" [VLM-DIRECT] HTTP {resp.status_code} ({elapsed:.1f}s)") + return None + + raw_content = resp.json().get("message", {}).get("content", "") + content = prefill + raw_content + print(f" [VLM-DIRECT] Réponse en {elapsed:.1f}s : {content[:100]}") + + # Parser JSON + match = re.search(r'\{[^}]+\}', content) + if not match: + return None + data = _json.loads(match.group()) + + x = data.get("x_pct") + y = data.get("y_pct") + conf = data.get("confidence", 0) + + if x is None or y is None or conf < 0.3: + print(f" [VLM-DIRECT] Non trouvé (conf={conf})") + return None + if not (0.0 <= x <= 1.0 and 0.0 <= y <= 1.0): + print(f" [VLM-DIRECT] Hors limites ({x}, {y})") + return None + + print(f" [VLM-DIRECT] TROUVÉ ({x:.3f}, {y:.3f}) conf={conf:.2f} en {elapsed:.1f}s") + return {"resolved": True, "method": "vlm_direct", "x_pct": x, "y_pct": y, "score": conf} + + except _requests.exceptions.Timeout: + print(" [VLM-DIRECT] Timeout 30s") + return None + except Exception as e: + print(f" [VLM-DIRECT] Erreur: {e}") return None def poll_and_execute(self, session_id: str, server_url: str, machine_id: str = "default") -> bool: @@ -509,6 +1303,16 @@ class ActionExecutorV1: "error": result.get("error"), "warning": result.get("warning"), "screenshot": result.get("screenshot"), + "screenshot_after": result.get("screenshot"), + "screenshot_before": result.get("screenshot_before"), + "resolution_method": result.get("resolution_method"), + "resolution_score": result.get("resolution_score"), + "resolution_elapsed_ms": result.get("resolution_elapsed_ms"), + # Coordonnées RÉSOLUES effectivement cliquées (Phase 1 apprentissage) + "actual_position": result.get("actual_position"), + # Champs enrichis pour target_not_found (pause supervisée) + "target_description": result.get("target_description"), + "target_spec": result.get("target_spec"), } try: resp2 = requests.post( @@ -535,7 +1339,319 @@ class ActionExecutorV1: return True # ========================================================================= - # Gestion automatique des popups imprevues + # Gestion intelligente des popups imprévues (VLM) + # ========================================================================= + + def _handle_popup_vlm(self) -> bool: + """Détecter et gérer une popup imprévue via approche hybride. + + Approche hybride VLM + template matching : + 1. Le VLM **identifie** s'il y a une popup et le texte du bouton à cliquer + 2. Le template matching **localise** la position exacte du bouton + + Le VLM (qwen3-vl:8b) ne retourne pas de coordonnées fiables, mais il + sait identifier les éléments : "il y a un bouton Oui et un bouton Non". + On lui demande donc uniquement le texte du bouton, puis on localise + ce texte sur le screenshot via rendu texte + cv2.matchTemplate. + + Appelée quand le visual resolve échoue (cible non trouvée), ce qui + peut indiquer qu'une popup modale masque l'élément attendu. + + Une seule tentative par action (pas de boucle infinie). + + Returns: + True si une popup a été gérée (fermée), False sinon. + """ + # Capturer le screenshot actuel (résolution native pour template matching) + screenshot_b64 = self._capture_screenshot_b64(max_width=0, quality=75) + if not screenshot_b64: + logger.warning("[POPUP-VLM] Capture screenshot échouée") + return False + + # Essayer la détection popup via le serveur d'abord + from ..config import SERVER_URL, API_TOKEN + if SERVER_URL: + monitor = self.sct.monitors[1] + sw, sh = monitor["width"], monitor["height"] + server_result = self._server_resolve_target( + SERVER_URL, screenshot_b64, + {"vlm_description": "popup, dialog box, confirmation, or error message button (Oui, OK, Yes, Non, Enregistrer, Annuler)"}, + 0.5, 0.5, sw, sh, + ) + if server_result and server_result.get("resolved"): + x_pct = server_result["x_pct"] + y_pct = server_result["y_pct"] + real_x = int(x_pct * sw) + real_y = int(y_pct * sh) + label = server_result.get("matched_element", {}).get("label", "popup") + print(f" [POPUP-SERVER] Popup détectée ! Clic sur '{label}' → ({real_x}, {real_y})") + logger.info(f"[POPUP-SERVER] Clic popup '{label}' à ({real_x}, {real_y})") + self._click((real_x, real_y), "left") + time.sleep(1.0) + return True + + # Fallback : VLM local identifie le bouton à cliquer + button_text = self._vlm_identify_popup_button(screenshot_b64) + if not button_text: + return False # Pas de popup ou VLM en échec + + # Étape 2 : Localiser le bouton par son texte via template matching + position = self._find_text_on_screen(screenshot_b64, button_text) + + # Fallback : essayer des variantes de casse + if not position: + variants = [ + button_text.upper(), + button_text.lower(), + button_text.capitalize(), + button_text.title(), + ] + for variant in variants: + if variant == button_text: + continue + position = self._find_text_on_screen(screenshot_b64, variant) + if position: + print(f" [POPUP-VLM] Variante trouvée : '{variant}'") + break + + if not position: + print(f" [POPUP-VLM] Bouton '{button_text}' identifié par VLM mais non localisé par template matching") + logger.warning(f"[POPUP-VLM] Bouton '{button_text}' identifié mais non localisé") + return False + + # Étape 3 : Cliquer sur le bouton + real_x, real_y = position + print( + f" [POPUP-VLM] Popup détectée ! Clic sur '{button_text}' " + f"-> ({real_x}, {real_y})" + ) + logger.info( + f"[POPUP-VLM] Clic popup '{button_text}' à ({real_x}, {real_y}) " + f"[hybride VLM+template]" + ) + + self._click((real_x, real_y), "left") + + # Attendre que la popup se ferme + time.sleep(1.0) + + print(f" [POPUP-VLM] Popup '{button_text}' gérée avec succès") + logger.info(f"[POPUP-VLM] Popup '{button_text}' gérée, attente 1s terminée") + return True + + def _vlm_identify_popup_button(self, screenshot_b64: str) -> str: + """Demander au VLM s'il y a une popup et quel bouton cliquer. + + Le VLM identifie uniquement le TEXTE du bouton (pas de coordonnées). + C'est son point fort : comprendre sémantiquement le contenu de l'écran. + + Returns: + Le texte du bouton à cliquer (ex: "Oui", "OK", "Enregistrer"), + ou une chaîne vide si pas de popup. + """ + import requests as _requests + + ollama_host = os.environ.get("RPA_SERVER_HOST", "localhost") + ollama_url = f"http://{ollama_host}:11434/api/chat" + + prompt = ( + "Regarde cette capture d'écran. Y a-t-il une popup, une boîte de dialogue, " + "error message, or modal window visible?\n" + "If yes, what button should I click to proceed?\n" + "Answer ONLY the button text (like: Oui, OK, Yes, Enregistrer, Non, " + "Cancel, Remplacer, Replace, Fermer, Close, Ne pas enregistrer, Don't Save).\n" + "If no popup: answer NO_POPUP" + ) + + # Prefill pour les modèles thinking (qwen3) — skip la phase de réflexion + _vlm_model_popup = os.environ.get("RPA_VLM_MODEL", "gemma4:e4b") + _is_thinking_popup = "qwen3" in _vlm_model_popup.lower() + + messages_popup = [ + { + "role": "system", + "content": ( + "You analyze screenshots to detect popup dialogs. " + "Answer briefly with just the button text. No JSON, no coordinates." + ), + }, + {"role": "user", "content": prompt, "images": [screenshot_b64]}, + ] + if _is_thinking_popup: + messages_popup.append({"role": "assistant", "content": "The button to click is: "}) + + payload = { + "model": _vlm_model_popup, + "messages": messages_popup, + "stream": False, + "think": False, + "options": {"temperature": 0.1, "num_predict": 30, "num_ctx": 8192}, + } + + try: + print(f" [POPUP-VLM] Appel Ollama ({ollama_host}:11434) — identification popup...") + start = time.time() + resp = _requests.post(ollama_url, json=payload, timeout=15) + elapsed = time.time() - start + + if not resp.ok: + print(f" [POPUP-VLM] HTTP {resp.status_code} ({elapsed:.1f}s)") + logger.warning(f"[POPUP-VLM] HTTP {resp.status_code}") + return "" + + raw_content = resp.json().get("message", {}).get("content", "") + full_response = prefill + raw_content + print(f" [POPUP-VLM] Réponse en {elapsed:.1f}s : {full_response.strip()}") + logger.info(f"[POPUP-VLM] Réponse VLM ({elapsed:.1f}s) : {full_response.strip()}") + + # Extraire le texte du bouton depuis la réponse + button_text = raw_content.strip().strip('"').strip("'").strip(".") + # Nettoyer les artefacts courants du VLM + for noise in ["The button to click is:", "Button:", "Click:"]: + if button_text.lower().startswith(noise.lower()): + button_text = button_text[len(noise):].strip() + + if not button_text or "NO_POPUP" in button_text.upper(): + print(f" [POPUP-VLM] Pas de popup détectée") + logger.info("[POPUP-VLM] Pas de popup détectée par le VLM") + return "" + + # Limiter à un texte raisonnable (un bouton fait rarement plus de 30 chars) + if len(button_text) > 30: + # Prendre juste le premier mot significatif + button_text = button_text.split("\n")[0].strip() + if len(button_text) > 30: + button_text = button_text[:30].strip() + + print(f" [POPUP-VLM] Bouton identifié : '{button_text}'") + logger.info(f"[POPUP-VLM] Bouton identifié par VLM : '{button_text}'") + return button_text + + except _requests.exceptions.Timeout: + print(" [POPUP-VLM] Timeout 15s") + logger.warning("[POPUP-VLM] Timeout Ollama 15s") + return "" + except Exception as e: + print(f" [POPUP-VLM] Erreur: {e}") + logger.error(f"[POPUP-VLM] Erreur inattendue: {e}") + return "" + + def _find_text_on_screen(self, screenshot_b64: str, text: str) -> tuple: + """Localiser un texte sur le screenshot via template matching. + + Rend le texte en image (PIL) avec plusieurs tailles de police, + puis utilise cv2.matchTemplate pour le trouver sur le screenshot. + + Cette approche ne nécessite pas de dépendance supplémentaire : + PIL et cv2 sont déjà disponibles dans le projet. + + Args: + screenshot_b64: Screenshot encodé en base64 (JPEG) + text: Texte à rechercher sur le screenshot + + Returns: + Tuple (x, y) des coordonnées pixel du centre du texte trouvé, + ou None si non trouvé. + """ + from PIL import Image, ImageDraw, ImageFont + import cv2 + import numpy as np + + if not text or not screenshot_b64: + return None + + # Décoder le screenshot base64 en image cv2 + try: + img_bytes = base64.b64decode(screenshot_b64) + img_array = np.frombuffer(img_bytes, dtype=np.uint8) + screenshot_bgr = cv2.imdecode(img_array, cv2.IMREAD_COLOR) + if screenshot_bgr is None: + logger.warning("[FIND-TEXT] Impossible de décoder le screenshot") + return None + gray = cv2.cvtColor(screenshot_bgr, cv2.COLOR_BGR2GRAY) + except Exception as e: + logger.warning(f"[FIND-TEXT] Erreur décodage screenshot : {e}") + return None + + # Charger une police TrueType (Windows a arial.ttf, sinon default) + def _get_font(size): + font_paths = [ + "C:/Windows/Fonts/arial.ttf", + "C:/Windows/Fonts/segoeui.ttf", + "C:/Windows/Fonts/tahoma.ttf", + "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf", + ] + for fp in font_paths: + try: + return ImageFont.truetype(fp, size) + except (OSError, IOError): + continue + return ImageFont.load_default() + + best_match = None + best_val = 0.0 + threshold = 0.50 # Seuil équilibré + + # Essayer plusieurs tailles de police pour couvrir différentes résolutions + for font_size in [14, 16, 18, 20, 22, 24, 12, 26, 28, 10]: + font = _get_font(font_size) + + # Calculer la taille exacte du texte rendu + # Créer une image temporaire pour mesurer + tmp_img = Image.new("L", (1, 1), 255) + tmp_draw = ImageDraw.Draw(tmp_img) + bbox = tmp_draw.textbbox((0, 0), text, font=font) + text_w = bbox[2] - bbox[0] + 6 # +6 pour marge + text_h = bbox[3] - bbox[1] + 6 + + if text_w <= 0 or text_h <= 0: + continue + # Le template ne doit pas être plus grand que le screenshot + if text_w >= gray.shape[1] or text_h >= gray.shape[0]: + continue + + # Rendre le texte : fond blanc, texte noir (comme un bouton Windows) + text_img = Image.new("L", (text_w, text_h), 255) + draw = ImageDraw.Draw(text_img) + draw.text((3, 3), text, fill=0, font=font) + + template = np.array(text_img) + + # Template matching + result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED) + _, max_val, _, max_loc = cv2.minMaxLoc(result) + + if max_val > best_val: + best_val = max_val + best_match = ( + max_loc[0] + template.shape[1] // 2, + max_loc[1] + template.shape[0] // 2, + ) + + # Match suffisamment bon → arrêter tôt + if max_val > 0.75: + break + + if best_match and best_val >= threshold: + print( + f" [FIND-TEXT] '{text}' trouvé à ({best_match[0]}, {best_match[1]}) " + f"score={best_val:.3f}" + ) + logger.info( + f"[FIND-TEXT] '{text}' trouvé à ({best_match[0]}, {best_match[1]}) " + f"score={best_val:.3f}" + ) + return best_match + + if best_val > 0: + print(f" [FIND-TEXT] '{text}' meilleur score={best_val:.3f} < seuil {threshold}") + else: + print(f" [FIND-TEXT] '{text}' aucun match") + return None + + # ========================================================================= + # Gestion automatique des popups imprevues (legacy clavier) # ========================================================================= def _handle_possible_popup(self) -> bool: @@ -650,58 +1766,43 @@ class ActionExecutorV1: # ========================================================================= def _type_text(self, text: str): - """Saisir du texte via copier-coller (methode principale) ou keyboard.type (fallback). + """Saisir du texte caractère par caractère (anti-détection robot). - Le copier-coller via le presse-papiers est la methode principale car - keyboard.type() de pynput envoie les scancodes QWERTY, ce qui produit - des caracteres incorrects sur les claviers AZERTY (ex: "ce" -> "ci"). - Le copier-coller est agnostique du layout clavier. + Chaque caractère est tapé individuellement avec un délai aléatoire + pour simuler une frappe humaine. Les caractères spéciaux AZERTY + (@ # € etc.) utilisent les bons VK codes via KeyCode.from_char(). + + Pas de copier-coller (détectable par les systèmes anti-robot Citrix). """ + import random + if not text: return - clipboard_ok = False - try: - import pyperclip - # Sauvegarder le contenu actuel du presse-papiers + for char in text: try: - old_clipboard = pyperclip.paste() + # Taper le caractère via from_char (respecte le layout clavier) + self.keyboard.press(KeyCode.from_char(char)) + self.keyboard.release(KeyCode.from_char(char)) except Exception: - old_clipboard = None - - pyperclip.copy(text) - # Ctrl+V pour coller - self.keyboard.press(Key.ctrl) - time.sleep(0.02) - self.keyboard.press('v') - self.keyboard.release('v') - self.keyboard.release(Key.ctrl) - time.sleep(0.1) - - # Restaurer le presse-papiers original - if old_clipboard is not None: + # Fallback : keyboard.type pour les cas spéciaux try: - pyperclip.copy(old_clipboard) - except Exception: - pass + self.keyboard.type(char) + except Exception as e: + logger.debug(f"Impossible de taper '{char}': {e}") + # Délai humain entre les frappes (40-120ms) + time.sleep(random.uniform(0.04, 0.12)) - clipboard_ok = True - logger.debug(f"Texte saisi via presse-papiers ({len(text)} chars)") - except ImportError: - logger.debug("pyperclip non disponible, fallback sur keyboard.type()") - except Exception as e: - logger.warning(f"Copier-coller echoue ({e}), fallback sur keyboard.type()") - - if not clipboard_ok: - self.keyboard.type(text) + logger.debug(f"Texte saisi char-by-char ({len(text)} chars)") def _click(self, pos, button_name): - """Deplacer la souris et cliquer. + """Deplacer la souris via courbe de Bézier puis cliquer. - Supporte les boutons : left, right, double (double-clic gauche). + Le mouvement en courbe de Bézier simule un déplacement humain + (anti-détection robot pour Citrix et systèmes surveillés). """ - self.mouse.position = pos - time.sleep(0.1) # Delai pour simuler le temps de reaction humain + self._bezier_move(pos) + time.sleep(0.05) if button_name == "double": self.mouse.click(Button.left, 2) @@ -710,6 +1811,35 @@ class ActionExecutorV1: else: self.mouse.click(Button.left) + def _bezier_move(self, target, steps=25): + """Déplacer la souris vers target via une courbe de Bézier cubique. + + Génère un mouvement naturel avec un point de contrôle aléatoire + pour éviter les lignes droites détectables par les anti-bots. + """ + import random + + start = self.mouse.position + sx, sy = start + tx, ty = target + + # Point de contrôle aléatoire (déviation latérale) + dist = ((tx - sx) ** 2 + (ty - sy) ** 2) ** 0.5 + deviation = max(20, dist * 0.2) + cx = (sx + tx) / 2 + random.uniform(-deviation, deviation) + cy = (sy + ty) / 2 + random.uniform(-deviation, deviation) + + for i in range(1, steps + 1): + t = i / steps + # Bézier quadratique : B(t) = (1-t)²·S + 2(1-t)t·C + t²·T + inv_t = 1 - t + x = inv_t * inv_t * sx + 2 * inv_t * t * cx + t * t * tx + y = inv_t * inv_t * sy + 2 * inv_t * t * cy + t * t * ty + self.mouse.position = (int(x), int(y)) + # Vitesse variable (plus lent au début et à la fin) + speed = 0.005 + 0.01 * (1 - abs(2 * t - 1)) + time.sleep(speed) + def _execute_key_combo(self, keys: list): """ Executer une combinaison de touches. @@ -753,6 +1883,50 @@ class ActionExecutorV1: for mod in reversed(modifiers): self.keyboard.release(mod) + def _replay_raw_keys(self, raw_keys: list): + """Rejouer une séquence press/release exacte via virtual key codes. + + Utilise KeyCode.from_vk() pour reconstituer les touches à partir + de leur vk code, ce qui garantit un replay fidèle indépendant du + layout clavier (AZERTY, QWERTZ, etc.). + + Chaque événement raw_key est un dict avec : + - "action": "press" ou "release" + - "kind": "vk" (touche avec virtual key code) ou "key" (touche spéciale pynput) + - "vk": int (si kind == "vk") + - "name": str (si kind == "key", ex: "ctrl_l", "enter") + - "char": str ou None (si kind == "vk", informatif) + """ + for event in raw_keys: + key = self._decode_raw_key(event) + if key is None: + continue + action = event.get("action", "") + if action == "press": + self.keyboard.press(key) + elif action == "release": + self.keyboard.release(key) + else: + logger.warning(f"Action raw_key inconnue : {action}") + continue + time.sleep(0.01) # Petit délai entre chaque événement + + @staticmethod + def _decode_raw_key(data: dict): + """Décoder un événement raw_key en objet pynput (Key ou KeyCode). + + Retourne None si le décodage échoue (touche inconnue). + """ + kind = data.get("kind", "") + if kind == "key": + name = data.get("name", "") + return getattr(Key, name, None) + if kind == "vk": + vk = data.get("vk") + if vk is not None: + return KeyCode.from_vk(vk) + return None + def _capture_screenshot_b64(self, max_width: int = 800, quality: int = 60) -> str: """ Capturer l'ecran et retourner le screenshot en base64. diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 128620f10..d2f32aad4 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -3170,6 +3170,68 @@ async def report_action_result(report: ReplayResultReport): replay_state["completed_actions"] += 1 replay_state["current_action_index"] += 1 + elif not report.success and agent_warning == "wrong_window": + # L'agent a détecté en pré-vérification que la fenêtre active + # n'est pas celle attendue. Même philosophie que no_screen_change : + # un échec est un moment pédagogique, pas un stop. + # + # Causes fréquentes : Léa elle-même a pris le focus (popups de + # notification/chat), l'app cible s'est fermée, une popup système + # est apparue, l'écran a changé entre deux actions. + # + # On redirige vers paused_need_help pour que l'humain intervienne. + _tspec_ww = (original_action or {}).get("target_spec") or report.target_spec or {} + _intent_ww = "" + _idx_ww = replay_state.get("current_action_index", 0) + _actions_ww = replay_state.get("actions", []) + if 0 <= _idx_ww < len(_actions_ww): + _intent_ww = str((_actions_ww[_idx_ww] or {}).get("intention", "") or "") + + _target_desc_ww = ( + _intent_ww + or _tspec_ww.get("by_text", "") + or _tspec_ww.get("vlm_description", "")[:80] + or "cette action" + ) + replay_state["status"] = "paused_need_help" + replay_state["failed_action"] = { + "action_id": action_id, + "type": (original_action or {}).get("type", "unknown"), + "target_description": _target_desc_ww, + "screenshot_b64": screenshot_after or report.screenshot, + "target_spec": _tspec_ww, + "reason": "wrong_window", + "error_detail": report.error or "", + } + replay_state["pause_message"] = ( + f"Je m'attendais à voir la bonne fenêtre mais je vois autre " + f"chose. Peux-tu vérifier que l'application est au premier " + f"plan ? ({report.error or ''})" + ) + error_entry = { + "action_id": action_id, + "error": report.error or "wrong_window", + "retry_count": retry_count, + "timestamp": time.time(), + } + replay_state["error_log"].append(error_entry) + logger.warning( + f"Replay PAUSE supervisée (wrong_window) : {action_id} " + f"— {report.error or 'fenêtre incorrecte'} — en attente " + f"d'intervention humaine" + ) + try: + log_replay_failure( + replay_id=replay_state["replay_id"], + action_id=action_id, + target_spec=_tspec_ww, + screenshot_b64=screenshot_after or report.screenshot, + error="wrong_window", + extra={"error_detail": report.error or "", "intent": _intent_ww}, + ) + except Exception as _log_exc: + logger.debug("log_replay_failure skip: %s", _log_exc) + elif not report.success and agent_warning == "no_screen_change": # L'action a été exécutée mais l'écran n'a pas changé. #