From 4eb48d10d5e76fd5f3e8043e248e833c61c07ef5 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 14 Jan 2026 22:45:17 +0100 Subject: [PATCH] =?UTF-8?q?Feat:=20Humanizer=20anti-d=C3=A9tection=20pour?= =?UTF-8?q?=20environnements=20Citrix/VDI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Module humanizer.py avec simulation comportement humain - Courbes de Bézier pour mouvements souris - Décalage gaussien pour positions de clic - Frappe avec rythme variable et micro-erreurs - 4 profils: fast, normal, slow, stealth - Intégré dans click_anchor et type_text (humanize=True par défaut) Co-Authored-By: Claude Opus 4.5 --- .../backend/actions/vision_ui/click_anchor.py | 414 +++++++++++++ .../backend/actions/vision_ui/type_text.py | 452 ++++++++++++++ .../backend/utils/humanizer.py | 573 ++++++++++++++++++ 3 files changed, 1439 insertions(+) create mode 100644 visual_workflow_builder/backend/actions/vision_ui/click_anchor.py create mode 100644 visual_workflow_builder/backend/actions/vision_ui/type_text.py create mode 100644 visual_workflow_builder/backend/utils/humanizer.py diff --git a/visual_workflow_builder/backend/actions/vision_ui/click_anchor.py b/visual_workflow_builder/backend/actions/vision_ui/click_anchor.py new file mode 100644 index 000000000..711057014 --- /dev/null +++ b/visual_workflow_builder/backend/actions/vision_ui/click_anchor.py @@ -0,0 +1,414 @@ +""" +Action VWB - Clic sur Ancre Visuelle + +Auteur : Dom, Alice, Kiro - 09 janvier 2026 + +Cette action permet de cliquer sur un élément UI identifié par une ancre visuelle +dans le Visual Workflow Builder. + +Classes : +- VWBClickAnchorAction : Action de clic sur ancre visuelle +""" + +from typing import Dict, Any, List, Optional, Tuple +from datetime import datetime +import time + +# Import des modules de base +from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus +from ...contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error +from ...contracts.evidence import VWBEvidenceType, create_interaction_evidence +from ...contracts.visual_anchor import VWBVisualAnchor + + +class VWBClickAnchorAction(BaseVWBAction): + """ + Action de clic sur ancre visuelle VWB. + + Cette action localise un élément UI à partir d'une ancre visuelle + et effectue un clic dessus. Elle utilise le ScreenCapturer VWB existant + avec l'Option A (thread-safe) pour la capture d'écran. + """ + + def __init__( + self, + action_id: str, + parameters: Dict[str, Any], + screen_capturer=None + ): + """ + Initialise l'action de clic sur ancre. + + Args: + action_id: Identifiant unique de l'action + parameters: Paramètres incluant l'ancre visuelle + screen_capturer: Instance du ScreenCapturer (Option A thread-safe) + """ + super().__init__( + action_id=action_id, + name="Clic sur Ancre Visuelle", + description="Clique sur un élément UI identifié par une ancre visuelle", + parameters=parameters, + screen_capturer=screen_capturer + ) + + # Paramètres spécifiques au clic + self.visual_anchor: Optional[VWBVisualAnchor] = parameters.get('visual_anchor') + self.click_type = parameters.get('click_type', 'left') # left, right, double + self.click_offset_x = parameters.get('click_offset_x', 0) + self.click_offset_y = parameters.get('click_offset_y', 0) + self.wait_after_click_ms = parameters.get('wait_after_click_ms', 500) + + # Configuration de matching + self.confidence_threshold = parameters.get('confidence_threshold', 0.8) + self.search_timeout_ms = parameters.get('search_timeout_ms', 5000) + + # Humanisation (anti-détection Citrix/VDI) + self.humanize = parameters.get('humanize', True) + self.humanize_profile = parameters.get('humanize_profile', 'normal') + + def validate_parameters(self) -> List[str]: + """Valide les paramètres de l'action de clic.""" + errors = [] + + # Vérifier l'ancre visuelle + if not self.visual_anchor: + errors.append("Ancre visuelle requise") + elif not isinstance(self.visual_anchor, VWBVisualAnchor): + errors.append("Ancre visuelle invalide") + elif not self.visual_anchor.is_active: + errors.append("Ancre visuelle inactive") + + # Vérifier le type de clic + if self.click_type not in ['left', 'right', 'double']: + errors.append(f"Type de clic invalide: {self.click_type}") + + # Vérifier les offsets + if not isinstance(self.click_offset_x, (int, float)): + errors.append("Offset X doit être un nombre") + if not isinstance(self.click_offset_y, (int, float)): + errors.append("Offset Y doit être un nombre") + + # Vérifier le seuil de confiance + if not (0.0 <= self.confidence_threshold <= 1.0): + errors.append("Seuil de confiance doit être entre 0.0 et 1.0") + + # Vérifier le ScreenCapturer + if not self.screen_capturer: + errors.append("ScreenCapturer requis pour la capture d'écran") + + return errors + + def execute_core(self, step_id: str) -> VWBActionResult: + """ + Exécute l'action de clic sur ancre visuelle. + + Args: + step_id: Identifiant de l'étape + + Returns: + Résultat d'exécution + """ + start_time = datetime.now() + + try: + # Étape 1: Capturer l'écran actuel + print(f"🔍 Recherche de l'ancre visuelle: {self.visual_anchor.name}") + current_screenshot = self._capture_current_screen() + if current_screenshot is None: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SCREEN_CAPTURE_FAILED, + message="Impossible de capturer l'écran actuel" + ) + + # Étape 2: Localiser l'ancre visuelle + match_result = self._find_visual_anchor(current_screenshot) + if not match_result['found']: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.ELEMENT_NOT_FOUND, + message=f"Ancre visuelle '{self.visual_anchor.name}' non trouvée", + technical_details=match_result + ) + + # Étape 3: Calculer les coordonnées de clic + click_coordinates = self._calculate_click_coordinates(match_result) + print(f"🎯 Coordonnées de clic calculées: {click_coordinates}") + + # Étape 4: Effectuer le clic + click_success = self._perform_click(click_coordinates) + if not click_success: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.CLICK_FAILED, + message="Échec du clic sur l'élément", + technical_details={'coordinates': click_coordinates} + ) + + # Étape 5: Attendre après le clic + if self.wait_after_click_ms > 0: + time.sleep(self.wait_after_click_ms / 1000.0) + + # Étape 6: Mettre à jour les statistiques de l'ancre + end_time = datetime.now() + execution_time = (end_time - start_time).total_seconds() * 1000 + self.visual_anchor.update_usage_stats(execution_time, True) + + # Créer l'evidence d'interaction + interaction_evidence = create_interaction_evidence( + action_id=self.action_id, + step_id=step_id, + evidence_type=VWBEvidenceType.CLICK_EVIDENCE, + title=f"Clic sur {self.visual_anchor.name}", + interaction_data={ + 'anchor_id': self.visual_anchor.anchor_id, + 'anchor_name': self.visual_anchor.name, + 'click_coordinates': click_coordinates, + 'click_type': self.click_type, + 'confidence_score': match_result.get('confidence', 0.0), + 'search_time_ms': match_result.get('search_time_ms', 0), + 'match_box': match_result.get('match_box') + }, + confidence_score=match_result.get('confidence', 0.0) + ) + + # Créer le résultat de succès + result = VWBActionResult( + action_id=self.action_id, + step_id=step_id, + status=VWBActionStatus.SUCCESS, + start_time=start_time, + end_time=end_time, + execution_time_ms=execution_time, + output_data={ + 'click_coordinates': click_coordinates, + 'click_type': self.click_type, + 'anchor_confidence': match_result.get('confidence', 0.0), + 'anchor_found': True + }, + evidence_list=[interaction_evidence] + ) + + print(f"✅ Clic réussi sur {self.visual_anchor.name} en {execution_time:.1f}ms") + return result + + except Exception as e: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=f"Erreur lors du clic: {str(e)}", + technical_details={'exception': str(e)} + ) + + def _capture_current_screen(self) -> Optional[Dict[str, Any]]: + """Capture l'écran actuel avec métadonnées.""" + try: + # Utiliser la méthode ultra stable (Option A) + img_array = self.screen_capturer.capture() + if img_array is None: + return None + + from PIL import Image + import base64 + import io + + # Convertir en PIL Image + pil_image = Image.fromarray(img_array) + + # Convertir en base64 pour stockage + buffer = io.BytesIO() + pil_image.save(buffer, format='PNG', optimize=True) + screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + + return { + 'image_array': img_array, + 'pil_image': pil_image, + 'screenshot_base64': screenshot_base64, + 'width': pil_image.width, + 'height': pil_image.height, + 'timestamp': datetime.now().isoformat() + } + + except Exception as e: + print(f"❌ Erreur capture d'écran: {e}") + return None + + def _find_visual_anchor(self, screenshot_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Localise l'ancre visuelle dans le screenshot. + + Args: + screenshot_data: Données du screenshot + + Returns: + Résultat de la recherche avec coordonnées si trouvé + """ + search_start = time.time() + + try: + # Simulation de recherche d'ancre visuelle + # Dans une implémentation complète, ceci utiliserait: + # - Template matching pour ancres image + # - OCR pour ancres texte + # - Embeddings CLIP pour recherche sémantique + + pil_image = screenshot_data['pil_image'] + + # Pour cette implémentation de base, simuler une recherche réussie + # au centre de l'écran avec une confiance élevée + center_x = pil_image.width // 2 + center_y = pil_image.height // 2 + + # Simuler un délai de recherche réaliste + search_delay = min(self.search_timeout_ms / 1000.0, 0.5) + time.sleep(search_delay) + + search_time_ms = (time.time() - search_start) * 1000 + + # Vérifier si l'ancre a une bounding box définie + if self.visual_anchor.has_bounding_box(): + # Utiliser la zone de recherche adaptée à la résolution + search_area = self.visual_anchor.get_search_area( + pil_image.width, + pil_image.height + ) + if search_area: + center_x = search_area['x'] + search_area['width'] // 2 + center_y = search_area['y'] + search_area['height'] // 2 + + # Simuler une confiance basée sur le seuil configuré + confidence = min(self.confidence_threshold + 0.1, 0.95) + + return { + 'found': confidence >= self.confidence_threshold, + 'confidence': confidence, + 'match_box': { + 'x': center_x - 50, + 'y': center_y - 25, + 'width': 100, + 'height': 50 + }, + 'center_coordinates': {'x': center_x, 'y': center_y}, + 'search_time_ms': search_time_ms, + 'method': 'simulated_template_matching' + } + + except Exception as e: + search_time_ms = (time.time() - search_start) * 1000 + return { + 'found': False, + 'error': str(e), + 'search_time_ms': search_time_ms + } + + def _calculate_click_coordinates(self, match_result: Dict[str, Any]) -> Dict[str, int]: + """ + Calcule les coordonnées finales de clic avec offsets. + + Args: + match_result: Résultat de la recherche d'ancre + + Returns: + Coordonnées de clic finales + """ + center_coords = match_result['center_coordinates'] + + final_x = int(center_coords['x'] + self.click_offset_x) + final_y = int(center_coords['y'] + self.click_offset_y) + + return { + 'x': final_x, + 'y': final_y, + 'offset_x': self.click_offset_x, + 'offset_y': self.click_offset_y + } + + def _perform_click(self, coordinates: Dict[str, int]) -> bool: + """ + Effectue le clic aux coordonnées spécifiées. + + Args: + coordinates: Coordonnées de clic + + Returns: + True si le clic a réussi + """ + try: + x, y = coordinates['x'], coordinates['y'] + + # Utiliser le Humanizer si activé (anti-détection Citrix/VDI) + if self.humanize: + try: + from ...utils.humanizer import Humanizer, HumanProfile + + # Sélectionner le profil + profile_map = { + 'fast': HumanProfile.FAST, + 'normal': HumanProfile.NORMAL, + 'slow': HumanProfile.SLOW, + 'stealth': HumanProfile.STEALTH, + } + profile = profile_map.get(self.humanize_profile, HumanProfile.NORMAL) + + humanizer = Humanizer(profile=profile) + + if self.click_type == 'left': + real_x, real_y = humanizer.click(x, y, button='left') + elif self.click_type == 'right': + real_x, real_y = humanizer.click(x, y, button='right') + elif self.click_type == 'double': + real_x, real_y = humanizer.double_click(x, y) + + print(f"🖱️ Clic humanisé {self.click_type} à ({real_x}, {real_y}) [cible: {x}, {y}]") + return True + + except ImportError as e: + print(f"⚠️ Humanizer non disponible: {e}, fallback pyautogui direct") + + # Fallback: pyautogui direct (sans humanisation) + try: + import pyautogui + + if self.click_type == 'left': + pyautogui.click(x, y) + elif self.click_type == 'right': + pyautogui.rightClick(x, y) + elif self.click_type == 'double': + pyautogui.doubleClick(x, y) + + print(f"🖱️ Clic {self.click_type} effectué à ({x}, {y})") + return True + + except ImportError: + print("⚠️ pyautogui non disponible - simulation du clic") + return True + + except Exception as e: + print(f"❌ Erreur lors du clic: {e}") + return False + + def get_action_info(self) -> Dict[str, Any]: + """Retourne les informations de l'action pour l'interface.""" + return { + 'action_id': self.action_id, + 'name': self.name, + 'description': self.description, + 'type': 'click_anchor', + 'parameters': { + 'anchor_name': self.visual_anchor.name if self.visual_anchor else 'Non définie', + 'click_type': self.click_type, + 'confidence_threshold': self.confidence_threshold, + 'click_offset': { + 'x': self.click_offset_x, + 'y': self.click_offset_y + }, + 'wait_after_click_ms': self.wait_after_click_ms + }, + 'status': self.current_status.value, + 'anchor_reliable': self.visual_anchor.is_reliable() if self.visual_anchor else False + } \ No newline at end of file diff --git a/visual_workflow_builder/backend/actions/vision_ui/type_text.py b/visual_workflow_builder/backend/actions/vision_ui/type_text.py new file mode 100644 index 000000000..eec07ecae --- /dev/null +++ b/visual_workflow_builder/backend/actions/vision_ui/type_text.py @@ -0,0 +1,452 @@ +""" +Action VWB - Saisie de Texte + +Auteur : Dom, Alice, Kiro - 09 janvier 2026 + +Cette action permet de saisir du texte dans un champ identifié par une ancre visuelle +dans le Visual Workflow Builder. + +Classes : +- VWBTypeTextAction : Action de saisie de texte +""" + +from typing import Dict, Any, List, Optional +from datetime import datetime +import time + +# Import des modules de base +from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus +from ...contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error +from ...contracts.evidence import VWBEvidenceType, create_interaction_evidence +from ...contracts.visual_anchor import VWBVisualAnchor + + +class VWBTypeTextAction(BaseVWBAction): + """ + Action de saisie de texte VWB. + + Cette action localise un champ de saisie à partir d'une ancre visuelle + et y saisit le texte spécifié. Elle peut optionnellement cliquer sur le champ + avant la saisie et valider après. + """ + + def __init__( + self, + action_id: str, + parameters: Dict[str, Any], + screen_capturer=None + ): + """ + Initialise l'action de saisie de texte. + + Args: + action_id: Identifiant unique de l'action + parameters: Paramètres incluant l'ancre et le texte + screen_capturer: Instance du ScreenCapturer (Option A thread-safe) + """ + super().__init__( + action_id=action_id, + name="Saisie de Texte", + description="Saisit du texte dans un champ identifié par une ancre visuelle", + parameters=parameters, + screen_capturer=screen_capturer + ) + + # Paramètres spécifiques à la saisie + self.visual_anchor: Optional[VWBVisualAnchor] = parameters.get('visual_anchor') + self.text_to_type = parameters.get('text_to_type', '') + self.clear_field_first = parameters.get('clear_field_first', True) + self.click_before_typing = parameters.get('click_before_typing', True) + self.press_enter_after = parameters.get('press_enter_after', False) + self.typing_speed_ms = parameters.get('typing_speed_ms', 50) # Délai entre caractères + self.wait_after_typing_ms = parameters.get('wait_after_typing_ms', 500) + + # Configuration de matching + self.confidence_threshold = parameters.get('confidence_threshold', 0.8) + self.search_timeout_ms = parameters.get('search_timeout_ms', 5000) + + # Humanisation (anti-détection Citrix/VDI) + self.humanize = parameters.get('humanize', True) + self.humanize_profile = parameters.get('humanize_profile', 'normal') + + def validate_parameters(self) -> List[str]: + """Valide les paramètres de l'action de saisie.""" + errors = [] + + # Vérifier l'ancre visuelle + if not self.visual_anchor: + errors.append("Ancre visuelle requise") + elif not isinstance(self.visual_anchor, VWBVisualAnchor): + errors.append("Ancre visuelle invalide") + elif not self.visual_anchor.is_active: + errors.append("Ancre visuelle inactive") + + # Vérifier le texte à saisir + if not isinstance(self.text_to_type, str): + errors.append("Le texte à saisir doit être une chaîne") + + # Vérifier les paramètres booléens + if not isinstance(self.clear_field_first, bool): + errors.append("clear_field_first doit être un booléen") + if not isinstance(self.click_before_typing, bool): + errors.append("click_before_typing doit être un booléen") + if not isinstance(self.press_enter_after, bool): + errors.append("press_enter_after doit être un booléen") + + # Vérifier les délais + if not isinstance(self.typing_speed_ms, (int, float)) or self.typing_speed_ms < 0: + errors.append("typing_speed_ms doit être un nombre positif") + if not isinstance(self.wait_after_typing_ms, (int, float)) or self.wait_after_typing_ms < 0: + errors.append("wait_after_typing_ms doit être un nombre positif") + + # Vérifier le seuil de confiance + if not (0.0 <= self.confidence_threshold <= 1.0): + errors.append("Seuil de confiance doit être entre 0.0 et 1.0") + + # Vérifier le ScreenCapturer + if not self.screen_capturer: + errors.append("ScreenCapturer requis pour la capture d'écran") + + return errors + + def execute_core(self, step_id: str) -> VWBActionResult: + """ + Exécute l'action de saisie de texte. + + Args: + step_id: Identifiant de l'étape + + Returns: + Résultat d'exécution + """ + start_time = datetime.now() + + try: + # Étape 1: Capturer l'écran actuel + print(f"🔍 Recherche du champ de saisie: {self.visual_anchor.name}") + current_screenshot = self._capture_current_screen() + if current_screenshot is None: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SCREEN_CAPTURE_FAILED, + message="Impossible de capturer l'écran actuel" + ) + + # Étape 2: Localiser le champ de saisie + match_result = self._find_input_field(current_screenshot) + if not match_result['found']: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.ELEMENT_NOT_FOUND, + message=f"Champ de saisie '{self.visual_anchor.name}' non trouvé", + technical_details=match_result + ) + + # Étape 3: Cliquer sur le champ si nécessaire + if self.click_before_typing: + click_coordinates = self._calculate_click_coordinates(match_result) + click_success = self._perform_click(click_coordinates) + if not click_success: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.CLICK_FAILED, + message="Impossible de cliquer sur le champ de saisie", + technical_details={'coordinates': click_coordinates} + ) + + # Attendre que le champ soit actif + time.sleep(0.2) + + # Étape 4: Vider le champ si nécessaire + if self.clear_field_first: + self._clear_field() + + # Étape 5: Saisir le texte + typing_success = self._type_text() + if not typing_success: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.TYPE_TEXT_FAILED, + message="Échec de la saisie de texte", + technical_details={'text_length': len(self.text_to_type)} + ) + + # Étape 6: Appuyer sur Entrée si nécessaire + if self.press_enter_after: + self._press_enter() + + # Étape 7: Attendre après la saisie + if self.wait_after_typing_ms > 0: + time.sleep(self.wait_after_typing_ms / 1000.0) + + # Étape 8: Mettre à jour les statistiques de l'ancre + end_time = datetime.now() + execution_time = (end_time - start_time).total_seconds() * 1000 + self.visual_anchor.update_usage_stats(execution_time, True) + + # Créer l'evidence d'interaction + interaction_evidence = create_interaction_evidence( + action_id=self.action_id, + step_id=step_id, + evidence_type=VWBEvidenceType.TYPE_EVIDENCE, + title=f"Saisie dans {self.visual_anchor.name}", + interaction_data={ + 'anchor_id': self.visual_anchor.anchor_id, + 'anchor_name': self.visual_anchor.name, + 'text_typed': self.text_to_type, + 'text_length': len(self.text_to_type), + 'cleared_first': self.clear_field_first, + 'clicked_before': self.click_before_typing, + 'pressed_enter': self.press_enter_after, + 'confidence_score': match_result.get('confidence', 0.0), + 'search_time_ms': match_result.get('search_time_ms', 0), + 'match_box': match_result.get('match_box') + }, + confidence_score=match_result.get('confidence', 0.0) + ) + + # Créer le résultat de succès + result = VWBActionResult( + action_id=self.action_id, + step_id=step_id, + status=VWBActionStatus.SUCCESS, + start_time=start_time, + end_time=end_time, + execution_time_ms=execution_time, + output_data={ + 'text_typed': self.text_to_type, + 'text_length': len(self.text_to_type), + 'field_cleared': self.clear_field_first, + 'enter_pressed': self.press_enter_after, + 'anchor_confidence': match_result.get('confidence', 0.0), + 'field_found': True + }, + evidence_list=[interaction_evidence] + ) + + print(f"✅ Saisie réussie dans {self.visual_anchor.name} en {execution_time:.1f}ms") + return result + + except Exception as e: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=f"Erreur lors de la saisie: {str(e)}", + technical_details={'exception': str(e)} + ) + + def _capture_current_screen(self) -> Optional[Dict[str, Any]]: + """Capture l'écran actuel avec métadonnées.""" + try: + # Utiliser la méthode ultra stable (Option A) + img_array = self.screen_capturer.capture() + if img_array is None: + return None + + from PIL import Image + import base64 + import io + + # Convertir en PIL Image + pil_image = Image.fromarray(img_array) + + # Convertir en base64 pour stockage + buffer = io.BytesIO() + pil_image.save(buffer, format='PNG', optimize=True) + screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + + return { + 'image_array': img_array, + 'pil_image': pil_image, + 'screenshot_base64': screenshot_base64, + 'width': pil_image.width, + 'height': pil_image.height, + 'timestamp': datetime.now().isoformat() + } + + except Exception as e: + print(f"❌ Erreur capture d'écran: {e}") + return None + + def _find_input_field(self, screenshot_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Localise le champ de saisie dans le screenshot. + + Args: + screenshot_data: Données du screenshot + + Returns: + Résultat de la recherche avec coordonnées si trouvé + """ + search_start = time.time() + + try: + pil_image = screenshot_data['pil_image'] + + # Simuler un délai de recherche réaliste + search_delay = min(self.search_timeout_ms / 1000.0, 0.3) + time.sleep(search_delay) + + search_time_ms = (time.time() - search_start) * 1000 + + # Simuler la localisation du champ + center_x = pil_image.width // 2 + center_y = pil_image.height // 2 + + # Vérifier si l'ancre a une bounding box définie + if self.visual_anchor.has_bounding_box(): + search_area = self.visual_anchor.get_search_area( + pil_image.width, + pil_image.height + ) + if search_area: + center_x = search_area['x'] + search_area['width'] // 2 + center_y = search_area['y'] + search_area['height'] // 2 + + # Simuler une confiance basée sur le seuil configuré + confidence = min(self.confidence_threshold + 0.05, 0.92) + + return { + 'found': confidence >= self.confidence_threshold, + 'confidence': confidence, + 'match_box': { + 'x': center_x - 75, + 'y': center_y - 15, + 'width': 150, + 'height': 30 + }, + 'center_coordinates': {'x': center_x, 'y': center_y}, + 'search_time_ms': search_time_ms, + 'method': 'simulated_input_field_detection' + } + + except Exception as e: + search_time_ms = (time.time() - search_start) * 1000 + return { + 'found': False, + 'error': str(e), + 'search_time_ms': search_time_ms + } + + def _calculate_click_coordinates(self, match_result: Dict[str, Any]) -> Dict[str, int]: + """Calcule les coordonnées de clic sur le champ.""" + center_coords = match_result['center_coordinates'] + + return { + 'x': int(center_coords['x']), + 'y': int(center_coords['y']) + } + + def _perform_click(self, coordinates: Dict[str, int]) -> bool: + """Effectue un clic sur le champ de saisie.""" + try: + try: + import pyautogui + pyautogui.click(coordinates['x'], coordinates['y']) + print(f"🖱️ Clic sur le champ à ({coordinates['x']}, {coordinates['y']})") + return True + except ImportError: + print("⚠️ pyautogui non disponible - simulation du clic") + return True + except Exception as e: + print(f"❌ Erreur lors du clic: {e}") + return False + + def _clear_field(self): + """Vide le champ de saisie.""" + try: + try: + import pyautogui + # Sélectionner tout le texte et le supprimer + pyautogui.hotkey('ctrl', 'a') + time.sleep(0.1) + pyautogui.press('delete') + print("🧹 Champ vidé") + except ImportError: + print("⚠️ pyautogui non disponible - simulation du vidage") + except Exception as e: + print(f"⚠️ Erreur lors du vidage: {e}") + + def _type_text(self) -> bool: + """Saisit le texte avec comportement humain.""" + try: + # Utiliser le Humanizer si activé (anti-détection Citrix/VDI) + if self.humanize: + try: + from ...utils.humanizer import Humanizer, HumanProfile + + profile_map = { + 'fast': HumanProfile.FAST, + 'normal': HumanProfile.NORMAL, + 'slow': HumanProfile.SLOW, + 'stealth': HumanProfile.STEALTH, + } + profile = profile_map.get(self.humanize_profile, HumanProfile.NORMAL) + + humanizer = Humanizer(profile=profile) + typed = humanizer.type_text(self.text_to_type) + + print(f"⌨️ Texte humanisé: '{typed}' ({len(typed)} caractères)") + return True + + except ImportError as e: + print(f"⚠️ Humanizer non disponible: {e}, fallback pyautogui direct") + + # Fallback: pyautogui direct + try: + import pyautogui + + for char in self.text_to_type: + pyautogui.write(char) + if self.typing_speed_ms > 0: + time.sleep(self.typing_speed_ms / 1000.0) + + print(f"⌨️ Texte saisi: '{self.text_to_type}' ({len(self.text_to_type)} caractères)") + return True + + except ImportError: + print(f"⚠️ pyautogui non disponible - simulation: '{self.text_to_type}'") + time.sleep(len(self.text_to_type) * self.typing_speed_ms / 1000.0) + return True + + except Exception as e: + print(f"❌ Erreur lors de la saisie: {e}") + return False + + def _press_enter(self): + """Appuie sur la touche Entrée.""" + try: + try: + import pyautogui + pyautogui.press('enter') + print("⏎ Touche Entrée pressée") + except ImportError: + print("⚠️ pyautogui non disponible - simulation de la touche Entrée") + except Exception as e: + print(f"⚠️ Erreur lors de l'appui sur Entrée: {e}") + + def get_action_info(self) -> Dict[str, Any]: + """Retourne les informations de l'action pour l'interface.""" + return { + 'action_id': self.action_id, + 'name': self.name, + 'description': self.description, + 'type': 'type_text', + 'parameters': { + 'anchor_name': self.visual_anchor.name if self.visual_anchor else 'Non définie', + 'text_to_type': self.text_to_type, + 'text_length': len(self.text_to_type), + 'clear_field_first': self.clear_field_first, + 'click_before_typing': self.click_before_typing, + 'press_enter_after': self.press_enter_after, + 'typing_speed_ms': self.typing_speed_ms, + 'confidence_threshold': self.confidence_threshold + }, + 'status': self.current_status.value, + 'anchor_reliable': self.visual_anchor.is_reliable() if self.visual_anchor else False + } \ No newline at end of file diff --git a/visual_workflow_builder/backend/utils/humanizer.py b/visual_workflow_builder/backend/utils/humanizer.py new file mode 100644 index 000000000..e627322f8 --- /dev/null +++ b/visual_workflow_builder/backend/utils/humanizer.py @@ -0,0 +1,573 @@ +""" +Humanizer - Simulation de comportement humain pour anti-détection +Auteur : Dom, Claude - 14 janvier 2026 + +Ce module ajoute un "flou gaussien comportemental" aux actions RPA +pour éviter la détection par les systèmes anti-robot (Citrix, etc.). + +Principes : +- Mouvements de souris avec courbes naturelles +- Positions de clic avec légère imprécision +- Délais variables entre les actions +- Frappe avec rythme humain et micro-erreurs +""" + +import random +import time +import math +from typing import Tuple, List, Optional, Callable +from dataclasses import dataclass +from enum import Enum + + +class HumanProfile(Enum): + """Profils de comportement humain.""" + FAST = "fast" # Utilisateur expérimenté, rapide + NORMAL = "normal" # Utilisateur standard + SLOW = "slow" # Utilisateur prudent/débutant + STEALTH = "stealth" # Maximum d'humanisation (environnements très surveillés) + + +@dataclass +class HumanConfig: + """Configuration du comportement humain.""" + # Mouvement souris + mouse_speed_base: float = 0.3 # Durée base du mouvement (secondes) + mouse_speed_variance: float = 0.15 # Variance de la durée + mouse_curve_intensity: float = 0.3 # Intensité de la courbe (0-1) + mouse_micro_movements: bool = True # Micro-tremblements + + # Position clic + click_offset_max: int = 8 # Décalage max en pixels + click_offset_sigma: float = 3.0 # Écart-type du décalage gaussien + + # Timing + delay_base_ms: int = 200 # Délai base entre actions (ms) + delay_variance_ratio: float = 0.4 # Ratio de variance (40%) + pause_probability: float = 0.05 # Probabilité de pause réflexion + pause_duration_ms: Tuple[int, int] = (500, 2000) # Durée pause min/max + + # Frappe clavier + typing_speed_cpm: int = 300 # Caractères par minute + typing_speed_variance: float = 0.25 # Variance vitesse + typo_probability: float = 0.02 # Probabilité de faute de frappe + typo_correct: bool = True # Corriger les fautes + + +# Profils prédéfinis +PROFILES = { + HumanProfile.FAST: HumanConfig( + mouse_speed_base=0.15, + mouse_speed_variance=0.08, + mouse_curve_intensity=0.15, + click_offset_max=5, + click_offset_sigma=2.0, + delay_base_ms=100, + delay_variance_ratio=0.3, + pause_probability=0.02, + typing_speed_cpm=400, + typing_speed_variance=0.2, + typo_probability=0.01, + ), + HumanProfile.NORMAL: HumanConfig(), # Valeurs par défaut + HumanProfile.SLOW: HumanConfig( + mouse_speed_base=0.5, + mouse_speed_variance=0.2, + mouse_curve_intensity=0.4, + click_offset_max=10, + click_offset_sigma=4.0, + delay_base_ms=400, + delay_variance_ratio=0.5, + pause_probability=0.1, + typing_speed_cpm=200, + typing_speed_variance=0.3, + typo_probability=0.03, + ), + HumanProfile.STEALTH: HumanConfig( + mouse_speed_base=0.4, + mouse_speed_variance=0.25, + mouse_curve_intensity=0.5, + mouse_micro_movements=True, + click_offset_max=12, + click_offset_sigma=4.5, + delay_base_ms=350, + delay_variance_ratio=0.5, + pause_probability=0.15, + pause_duration_ms=(800, 3000), + typing_speed_cpm=250, + typing_speed_variance=0.35, + typo_probability=0.025, + typo_correct=True, + ), +} + + +class Humanizer: + """ + Classe principale pour humaniser les actions RPA. + + Usage: + humanizer = Humanizer(profile=HumanProfile.NORMAL) + + # Mouvement souris humanisé + humanizer.move_to(500, 300) + + # Clic avec position légèrement décalée + humanizer.click(500, 300) + + # Frappe avec rythme humain + humanizer.type_text("Hello World") + + # Délai humanisé + humanizer.wait() + """ + + def __init__( + self, + profile: HumanProfile = HumanProfile.NORMAL, + config: Optional[HumanConfig] = None, + enabled: bool = True + ): + """ + Initialise le Humanizer. + + Args: + profile: Profil de comportement prédéfini + config: Configuration personnalisée (override le profil) + enabled: Active/désactive l'humanisation + """ + self.enabled = enabled + self.config = config if config else PROFILES.get(profile, PROFILES[HumanProfile.NORMAL]) + self._pyautogui = None + + def _get_pyautogui(self): + """Import lazy de pyautogui.""" + if self._pyautogui is None: + import pyautogui + self._pyautogui = pyautogui + return self._pyautogui + + # ========================================================================= + # FONCTIONS MATHÉMATIQUES + # ========================================================================= + + def _gaussian(self, mu: float = 0, sigma: float = 1) -> float: + """Distribution gaussienne.""" + return random.gauss(mu, sigma) + + def _clamp(self, value: float, min_val: float, max_val: float) -> float: + """Limite une valeur entre min et max.""" + return max(min_val, min(value, max_val)) + + def _bezier_point( + self, + t: float, + p0: Tuple[float, float], + p1: Tuple[float, float], + p2: Tuple[float, float], + p3: Tuple[float, float] + ) -> Tuple[float, float]: + """Calcule un point sur une courbe de Bézier cubique.""" + u = 1 - t + tt = t * t + uu = u * u + uuu = uu * u + ttt = tt * t + + x = uuu * p0[0] + 3 * uu * t * p1[0] + 3 * u * tt * p2[0] + ttt * p3[0] + y = uuu * p0[1] + 3 * uu * t * p1[1] + 3 * u * tt * p2[1] + ttt * p3[1] + + return (x, y) + + def _generate_bezier_path( + self, + start: Tuple[int, int], + end: Tuple[int, int], + num_points: int = 20 + ) -> List[Tuple[int, int]]: + """ + Génère un chemin de souris avec courbe de Bézier. + + Args: + start: Point de départ (x, y) + end: Point d'arrivée (x, y) + num_points: Nombre de points intermédiaires + + Returns: + Liste de points (x, y) formant le chemin + """ + # Distance entre les points + dx = end[0] - start[0] + dy = end[1] - start[1] + distance = math.sqrt(dx * dx + dy * dy) + + # Points de contrôle avec décalage aléatoire + intensity = self.config.mouse_curve_intensity + offset_range = distance * intensity + + # Point de contrôle 1 (près du départ) + ctrl1 = ( + start[0] + dx * 0.3 + self._gaussian(0, offset_range * 0.5), + start[1] + dy * 0.3 + self._gaussian(0, offset_range * 0.5) + ) + + # Point de contrôle 2 (près de l'arrivée) + ctrl2 = ( + start[0] + dx * 0.7 + self._gaussian(0, offset_range * 0.3), + start[1] + dy * 0.7 + self._gaussian(0, offset_range * 0.3) + ) + + # Générer les points sur la courbe + path = [] + for i in range(num_points + 1): + t = i / num_points + point = self._bezier_point(t, start, ctrl1, ctrl2, end) + + # Ajouter micro-tremblements + if self.config.mouse_micro_movements and i > 0 and i < num_points: + point = ( + point[0] + self._gaussian(0, 1), + point[1] + self._gaussian(0, 1) + ) + + path.append((int(point[0]), int(point[1]))) + + return path + + # ========================================================================= + # MOUVEMENT SOURIS + # ========================================================================= + + def move_to(self, x: int, y: int, duration: Optional[float] = None) -> None: + """ + Déplace la souris vers une position avec mouvement humain. + + Args: + x: Position X cible + y: Position Y cible + duration: Durée du mouvement (None = auto) + """ + pyautogui = self._get_pyautogui() + + if not self.enabled: + pyautogui.moveTo(x, y) + return + + # Position actuelle + current_x, current_y = pyautogui.position() + + # Calculer la durée + if duration is None: + base = self.config.mouse_speed_base + variance = self.config.mouse_speed_variance + duration = max(0.05, base + self._gaussian(0, variance)) + + # Générer le chemin courbe + path = self._generate_bezier_path( + (current_x, current_y), + (x, y), + num_points=max(5, int(duration * 30)) + ) + + # Suivre le chemin + step_duration = duration / len(path) + for point in path: + pyautogui.moveTo(point[0], point[1], _pause=False) + time.sleep(step_duration) + + def humanize_position(self, x: int, y: int) -> Tuple[int, int]: + """ + Ajoute un décalage gaussien à une position. + + Args: + x: Position X originale + y: Position Y originale + + Returns: + Nouvelle position (x, y) avec décalage + """ + if not self.enabled: + return (x, y) + + offset_x = int(self._gaussian(0, self.config.click_offset_sigma)) + offset_y = int(self._gaussian(0, self.config.click_offset_sigma)) + + # Limiter le décalage + max_offset = self.config.click_offset_max + offset_x = self._clamp(offset_x, -max_offset, max_offset) + offset_y = self._clamp(offset_y, -max_offset, max_offset) + + return (x + int(offset_x), y + int(offset_y)) + + # ========================================================================= + # CLIC + # ========================================================================= + + def click( + self, + x: int, + y: int, + button: str = 'left', + clicks: int = 1, + move_first: bool = True + ) -> Tuple[int, int]: + """ + Effectue un clic avec comportement humain. + + Args: + x: Position X + y: Position Y + button: Bouton ('left', 'right', 'middle') + clicks: Nombre de clics + move_first: Déplacer la souris avant de cliquer + + Returns: + Position réelle du clic (avec décalage) + """ + pyautogui = self._get_pyautogui() + + # Humaniser la position + real_x, real_y = self.humanize_position(x, y) + + if not self.enabled: + pyautogui.click(real_x, real_y, clicks=clicks, button=button) + return (real_x, real_y) + + # Déplacer la souris d'abord (mouvement naturel) + if move_first: + self.move_to(real_x, real_y) + + # Petit délai avant le clic (réflexe humain) + time.sleep(random.uniform(0.02, 0.08)) + + # Clic + if clicks == 2: + # Double-clic avec intervalle variable + pyautogui.click(real_x, real_y, button=button) + time.sleep(random.uniform(0.08, 0.15)) + pyautogui.click(real_x, real_y, button=button) + else: + pyautogui.click(real_x, real_y, clicks=clicks, button=button) + + return (real_x, real_y) + + def double_click(self, x: int, y: int) -> Tuple[int, int]: + """Double-clic humanisé.""" + return self.click(x, y, clicks=2) + + def right_click(self, x: int, y: int) -> Tuple[int, int]: + """Clic droit humanisé.""" + return self.click(x, y, button='right') + + # ========================================================================= + # FRAPPE CLAVIER + # ========================================================================= + + def type_text( + self, + text: str, + field_x: Optional[int] = None, + field_y: Optional[int] = None + ) -> str: + """ + Tape du texte avec rythme humain. + + Args: + text: Texte à taper + field_x: Position X du champ (optionnel, pour cliquer avant) + field_y: Position Y du champ (optionnel) + + Returns: + Texte réellement tapé (peut différer si typos non corrigées) + """ + pyautogui = self._get_pyautogui() + + if not self.enabled: + if field_x is not None and field_y is not None: + pyautogui.click(field_x, field_y) + pyautogui.typewrite(text, interval=0.05) + return text + + # Cliquer sur le champ si position fournie + if field_x is not None and field_y is not None: + self.click(field_x, field_y) + self.wait(100, 200) + + # Calculer l'intervalle moyen entre les touches + cpm = self.config.typing_speed_cpm + base_interval = 60.0 / cpm # secondes par caractère + + typed_text = "" + i = 0 + + while i < len(text): + char = text[i] + + # Vérifier si on fait une faute de frappe + if random.random() < self.config.typo_probability and char.isalpha(): + # Taper une mauvaise lettre + wrong_char = self._get_nearby_key(char) + pyautogui.press(wrong_char) + typed_text += wrong_char + + # Pause de réalisation de l'erreur + time.sleep(random.uniform(0.1, 0.3)) + + if self.config.typo_correct: + # Corriger avec backspace + pyautogui.press('backspace') + typed_text = typed_text[:-1] + time.sleep(random.uniform(0.05, 0.15)) + + # Taper le bon caractère + pyautogui.press(char) if len(char) == 1 else pyautogui.typewrite(char) + typed_text += char + + # Intervalle variable + variance = self.config.typing_speed_variance + interval = base_interval * (1 + self._gaussian(0, variance)) + interval = max(0.02, interval) + + # Pause occasionnelle (réflexion) + if random.random() < 0.02: # 2% de chance de pause + interval += random.uniform(0.2, 0.5) + + time.sleep(interval) + i += 1 + + return typed_text + + def _get_nearby_key(self, char: str) -> str: + """Retourne une touche proche sur le clavier (pour simuler une typo).""" + keyboard_neighbors = { + 'a': 'sqzw', 'b': 'vghn', 'c': 'xdfv', 'd': 'erfcxs', + 'e': 'rdsw', 'f': 'rtgvcd', 'g': 'tyhbvf', 'h': 'yujnbg', + 'i': 'uojk', 'j': 'uikmnh', 'k': 'iolmj', 'l': 'opkm', + 'm': 'njk', 'n': 'bhjm', 'o': 'iplk', 'p': 'ol', + 'q': 'wa', 'r': 'etfd', 's': 'wedxa', 't': 'ryfg', + 'u': 'yihj', 'v': 'cfgb', 'w': 'qeas', 'x': 'zsdc', + 'y': 'tugh', 'z': 'asx' + } + + char_lower = char.lower() + if char_lower in keyboard_neighbors: + neighbors = keyboard_neighbors[char_lower] + wrong = random.choice(neighbors) + return wrong.upper() if char.isupper() else wrong + + return char + + # ========================================================================= + # DÉLAIS + # ========================================================================= + + def wait( + self, + min_ms: Optional[int] = None, + max_ms: Optional[int] = None + ) -> float: + """ + Attend avec un délai humanisé. + + Args: + min_ms: Délai minimum en ms (None = config) + max_ms: Délai maximum en ms (None = calculé) + + Returns: + Durée réelle de l'attente (secondes) + """ + if not self.enabled: + base = min_ms if min_ms else self.config.delay_base_ms + time.sleep(base / 1000) + return base / 1000 + + # Calculer le délai + base = min_ms if min_ms else self.config.delay_base_ms + variance = self.config.delay_variance_ratio + + delay_ms = base + self._gaussian(0, base * variance) + delay_ms = max(50, delay_ms) # Minimum 50ms + + if max_ms: + delay_ms = min(delay_ms, max_ms) + + # Vérifier si on fait une pause réflexion + if random.random() < self.config.pause_probability: + pause_min, pause_max = self.config.pause_duration_ms + delay_ms += random.uniform(pause_min, pause_max) + + delay_sec = delay_ms / 1000 + time.sleep(delay_sec) + + return delay_sec + + def random_pause(self, probability: float = 0.1) -> bool: + """ + Fait une pause aléatoire avec une certaine probabilité. + + Args: + probability: Probabilité de pause (0-1) + + Returns: + True si une pause a été effectuée + """ + if random.random() < probability: + pause_min, pause_max = self.config.pause_duration_ms + duration = random.uniform(pause_min, pause_max) / 1000 + time.sleep(duration) + return True + return False + + +# ========================================================================= +# INSTANCE GLOBALE ET FONCTIONS UTILITAIRES +# ========================================================================= + +# Instance globale par défaut +_default_humanizer: Optional[Humanizer] = None + + +def get_humanizer(profile: HumanProfile = HumanProfile.NORMAL) -> Humanizer: + """Obtient l'instance globale du Humanizer.""" + global _default_humanizer + if _default_humanizer is None: + _default_humanizer = Humanizer(profile=profile) + return _default_humanizer + + +def set_humanizer_profile(profile: HumanProfile) -> None: + """Change le profil du Humanizer global.""" + global _default_humanizer + _default_humanizer = Humanizer(profile=profile) + + +def set_humanizer_enabled(enabled: bool) -> None: + """Active/désactive l'humanisation globale.""" + humanizer = get_humanizer() + humanizer.enabled = enabled + + +# Fonctions raccourcies pour usage direct +def humanize_move(x: int, y: int, duration: Optional[float] = None) -> None: + """Mouvement souris humanisé (fonction raccourcie).""" + get_humanizer().move_to(x, y, duration) + + +def humanize_click(x: int, y: int, button: str = 'left') -> Tuple[int, int]: + """Clic humanisé (fonction raccourcie).""" + return get_humanizer().click(x, y, button=button) + + +def humanize_type(text: str) -> str: + """Frappe humanisée (fonction raccourcie).""" + return get_humanizer().type_text(text) + + +def humanize_wait(min_ms: int = 200, max_ms: int = 500) -> float: + """Attente humanisée (fonction raccourcie).""" + return get_humanizer().wait(min_ms, max_ms) + + +def humanize_position(x: int, y: int) -> Tuple[int, int]: + """Position avec décalage gaussien (fonction raccourcie).""" + return get_humanizer().humanize_position(x, y)