Feat: Humanizer anti-détection pour environnements Citrix/VDI

- Module humanizer.py avec simulation comportement humain
- Courbes de Bézier pour mouvements souris
- Décalage gaussien pour positions de clic
- Frappe avec rythme variable et micro-erreurs
- 4 profils: fast, normal, slow, stealth
- Intégré dans click_anchor et type_text (humanize=True par défaut)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-14 22:45:17 +01:00
parent 728fac3b59
commit 4eb48d10d5
3 changed files with 1439 additions and 0 deletions

View File

@@ -0,0 +1,414 @@
"""
Action VWB - Clic sur Ancre Visuelle
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Cette action permet de cliquer sur un élément UI identifié par une ancre visuelle
dans le Visual Workflow Builder.
Classes :
- VWBClickAnchorAction : Action de clic sur ancre visuelle
"""
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime
import time
# Import des modules de base
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
from ...contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error
from ...contracts.evidence import VWBEvidenceType, create_interaction_evidence
from ...contracts.visual_anchor import VWBVisualAnchor
class VWBClickAnchorAction(BaseVWBAction):
"""
Action de clic sur ancre visuelle VWB.
Cette action localise un élément UI à partir d'une ancre visuelle
et effectue un clic dessus. Elle utilise le ScreenCapturer VWB existant
avec l'Option A (thread-safe) pour la capture d'écran.
"""
def __init__(
self,
action_id: str,
parameters: Dict[str, Any],
screen_capturer=None
):
"""
Initialise l'action de clic sur ancre.
Args:
action_id: Identifiant unique de l'action
parameters: Paramètres incluant l'ancre visuelle
screen_capturer: Instance du ScreenCapturer (Option A thread-safe)
"""
super().__init__(
action_id=action_id,
name="Clic sur Ancre Visuelle",
description="Clique sur un élément UI identifié par une ancre visuelle",
parameters=parameters,
screen_capturer=screen_capturer
)
# Paramètres spécifiques au clic
self.visual_anchor: Optional[VWBVisualAnchor] = parameters.get('visual_anchor')
self.click_type = parameters.get('click_type', 'left') # left, right, double
self.click_offset_x = parameters.get('click_offset_x', 0)
self.click_offset_y = parameters.get('click_offset_y', 0)
self.wait_after_click_ms = parameters.get('wait_after_click_ms', 500)
# Configuration de matching
self.confidence_threshold = parameters.get('confidence_threshold', 0.8)
self.search_timeout_ms = parameters.get('search_timeout_ms', 5000)
# Humanisation (anti-détection Citrix/VDI)
self.humanize = parameters.get('humanize', True)
self.humanize_profile = parameters.get('humanize_profile', 'normal')
def validate_parameters(self) -> List[str]:
"""Valide les paramètres de l'action de clic."""
errors = []
# Vérifier l'ancre visuelle
if not self.visual_anchor:
errors.append("Ancre visuelle requise")
elif not isinstance(self.visual_anchor, VWBVisualAnchor):
errors.append("Ancre visuelle invalide")
elif not self.visual_anchor.is_active:
errors.append("Ancre visuelle inactive")
# Vérifier le type de clic
if self.click_type not in ['left', 'right', 'double']:
errors.append(f"Type de clic invalide: {self.click_type}")
# Vérifier les offsets
if not isinstance(self.click_offset_x, (int, float)):
errors.append("Offset X doit être un nombre")
if not isinstance(self.click_offset_y, (int, float)):
errors.append("Offset Y doit être un nombre")
# Vérifier le seuil de confiance
if not (0.0 <= self.confidence_threshold <= 1.0):
errors.append("Seuil de confiance doit être entre 0.0 et 1.0")
# Vérifier le ScreenCapturer
if not self.screen_capturer:
errors.append("ScreenCapturer requis pour la capture d'écran")
return errors
def execute_core(self, step_id: str) -> VWBActionResult:
"""
Exécute l'action de clic sur ancre visuelle.
Args:
step_id: Identifiant de l'étape
Returns:
Résultat d'exécution
"""
start_time = datetime.now()
try:
# Étape 1: Capturer l'écran actuel
print(f"🔍 Recherche de l'ancre visuelle: {self.visual_anchor.name}")
current_screenshot = self._capture_current_screen()
if current_screenshot is None:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SCREEN_CAPTURE_FAILED,
message="Impossible de capturer l'écran actuel"
)
# Étape 2: Localiser l'ancre visuelle
match_result = self._find_visual_anchor(current_screenshot)
if not match_result['found']:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.ELEMENT_NOT_FOUND,
message=f"Ancre visuelle '{self.visual_anchor.name}' non trouvée",
technical_details=match_result
)
# Étape 3: Calculer les coordonnées de clic
click_coordinates = self._calculate_click_coordinates(match_result)
print(f"🎯 Coordonnées de clic calculées: {click_coordinates}")
# Étape 4: Effectuer le clic
click_success = self._perform_click(click_coordinates)
if not click_success:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.CLICK_FAILED,
message="Échec du clic sur l'élément",
technical_details={'coordinates': click_coordinates}
)
# Étape 5: Attendre après le clic
if self.wait_after_click_ms > 0:
time.sleep(self.wait_after_click_ms / 1000.0)
# Étape 6: Mettre à jour les statistiques de l'ancre
end_time = datetime.now()
execution_time = (end_time - start_time).total_seconds() * 1000
self.visual_anchor.update_usage_stats(execution_time, True)
# Créer l'evidence d'interaction
interaction_evidence = create_interaction_evidence(
action_id=self.action_id,
step_id=step_id,
evidence_type=VWBEvidenceType.CLICK_EVIDENCE,
title=f"Clic sur {self.visual_anchor.name}",
interaction_data={
'anchor_id': self.visual_anchor.anchor_id,
'anchor_name': self.visual_anchor.name,
'click_coordinates': click_coordinates,
'click_type': self.click_type,
'confidence_score': match_result.get('confidence', 0.0),
'search_time_ms': match_result.get('search_time_ms', 0),
'match_box': match_result.get('match_box')
},
confidence_score=match_result.get('confidence', 0.0)
)
# Créer le résultat de succès
result = VWBActionResult(
action_id=self.action_id,
step_id=step_id,
status=VWBActionStatus.SUCCESS,
start_time=start_time,
end_time=end_time,
execution_time_ms=execution_time,
output_data={
'click_coordinates': click_coordinates,
'click_type': self.click_type,
'anchor_confidence': match_result.get('confidence', 0.0),
'anchor_found': True
},
evidence_list=[interaction_evidence]
)
print(f"✅ Clic réussi sur {self.visual_anchor.name} en {execution_time:.1f}ms")
return result
except Exception as e:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message=f"Erreur lors du clic: {str(e)}",
technical_details={'exception': str(e)}
)
def _capture_current_screen(self) -> Optional[Dict[str, Any]]:
"""Capture l'écran actuel avec métadonnées."""
try:
# Utiliser la méthode ultra stable (Option A)
img_array = self.screen_capturer.capture()
if img_array is None:
return None
from PIL import Image
import base64
import io
# Convertir en PIL Image
pil_image = Image.fromarray(img_array)
# Convertir en base64 pour stockage
buffer = io.BytesIO()
pil_image.save(buffer, format='PNG', optimize=True)
screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
return {
'image_array': img_array,
'pil_image': pil_image,
'screenshot_base64': screenshot_base64,
'width': pil_image.width,
'height': pil_image.height,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
print(f"❌ Erreur capture d'écran: {e}")
return None
def _find_visual_anchor(self, screenshot_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Localise l'ancre visuelle dans le screenshot.
Args:
screenshot_data: Données du screenshot
Returns:
Résultat de la recherche avec coordonnées si trouvé
"""
search_start = time.time()
try:
# Simulation de recherche d'ancre visuelle
# Dans une implémentation complète, ceci utiliserait:
# - Template matching pour ancres image
# - OCR pour ancres texte
# - Embeddings CLIP pour recherche sémantique
pil_image = screenshot_data['pil_image']
# Pour cette implémentation de base, simuler une recherche réussie
# au centre de l'écran avec une confiance élevée
center_x = pil_image.width // 2
center_y = pil_image.height // 2
# Simuler un délai de recherche réaliste
search_delay = min(self.search_timeout_ms / 1000.0, 0.5)
time.sleep(search_delay)
search_time_ms = (time.time() - search_start) * 1000
# Vérifier si l'ancre a une bounding box définie
if self.visual_anchor.has_bounding_box():
# Utiliser la zone de recherche adaptée à la résolution
search_area = self.visual_anchor.get_search_area(
pil_image.width,
pil_image.height
)
if search_area:
center_x = search_area['x'] + search_area['width'] // 2
center_y = search_area['y'] + search_area['height'] // 2
# Simuler une confiance basée sur le seuil configuré
confidence = min(self.confidence_threshold + 0.1, 0.95)
return {
'found': confidence >= self.confidence_threshold,
'confidence': confidence,
'match_box': {
'x': center_x - 50,
'y': center_y - 25,
'width': 100,
'height': 50
},
'center_coordinates': {'x': center_x, 'y': center_y},
'search_time_ms': search_time_ms,
'method': 'simulated_template_matching'
}
except Exception as e:
search_time_ms = (time.time() - search_start) * 1000
return {
'found': False,
'error': str(e),
'search_time_ms': search_time_ms
}
def _calculate_click_coordinates(self, match_result: Dict[str, Any]) -> Dict[str, int]:
"""
Calcule les coordonnées finales de clic avec offsets.
Args:
match_result: Résultat de la recherche d'ancre
Returns:
Coordonnées de clic finales
"""
center_coords = match_result['center_coordinates']
final_x = int(center_coords['x'] + self.click_offset_x)
final_y = int(center_coords['y'] + self.click_offset_y)
return {
'x': final_x,
'y': final_y,
'offset_x': self.click_offset_x,
'offset_y': self.click_offset_y
}
def _perform_click(self, coordinates: Dict[str, int]) -> bool:
"""
Effectue le clic aux coordonnées spécifiées.
Args:
coordinates: Coordonnées de clic
Returns:
True si le clic a réussi
"""
try:
x, y = coordinates['x'], coordinates['y']
# Utiliser le Humanizer si activé (anti-détection Citrix/VDI)
if self.humanize:
try:
from ...utils.humanizer import Humanizer, HumanProfile
# Sélectionner le profil
profile_map = {
'fast': HumanProfile.FAST,
'normal': HumanProfile.NORMAL,
'slow': HumanProfile.SLOW,
'stealth': HumanProfile.STEALTH,
}
profile = profile_map.get(self.humanize_profile, HumanProfile.NORMAL)
humanizer = Humanizer(profile=profile)
if self.click_type == 'left':
real_x, real_y = humanizer.click(x, y, button='left')
elif self.click_type == 'right':
real_x, real_y = humanizer.click(x, y, button='right')
elif self.click_type == 'double':
real_x, real_y = humanizer.double_click(x, y)
print(f"🖱️ Clic humanisé {self.click_type} à ({real_x}, {real_y}) [cible: {x}, {y}]")
return True
except ImportError as e:
print(f"⚠️ Humanizer non disponible: {e}, fallback pyautogui direct")
# Fallback: pyautogui direct (sans humanisation)
try:
import pyautogui
if self.click_type == 'left':
pyautogui.click(x, y)
elif self.click_type == 'right':
pyautogui.rightClick(x, y)
elif self.click_type == 'double':
pyautogui.doubleClick(x, y)
print(f"🖱️ Clic {self.click_type} effectué à ({x}, {y})")
return True
except ImportError:
print("⚠️ pyautogui non disponible - simulation du clic")
return True
except Exception as e:
print(f"❌ Erreur lors du clic: {e}")
return False
def get_action_info(self) -> Dict[str, Any]:
"""Retourne les informations de l'action pour l'interface."""
return {
'action_id': self.action_id,
'name': self.name,
'description': self.description,
'type': 'click_anchor',
'parameters': {
'anchor_name': self.visual_anchor.name if self.visual_anchor else 'Non définie',
'click_type': self.click_type,
'confidence_threshold': self.confidence_threshold,
'click_offset': {
'x': self.click_offset_x,
'y': self.click_offset_y
},
'wait_after_click_ms': self.wait_after_click_ms
},
'status': self.current_status.value,
'anchor_reliable': self.visual_anchor.is_reliable() if self.visual_anchor else False
}

View File

@@ -0,0 +1,452 @@
"""
Action VWB - Saisie de Texte
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Cette action permet de saisir du texte dans un champ identifié par une ancre visuelle
dans le Visual Workflow Builder.
Classes :
- VWBTypeTextAction : Action de saisie de texte
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
import time
# Import des modules de base
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
from ...contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error
from ...contracts.evidence import VWBEvidenceType, create_interaction_evidence
from ...contracts.visual_anchor import VWBVisualAnchor
class VWBTypeTextAction(BaseVWBAction):
"""
Action de saisie de texte VWB.
Cette action localise un champ de saisie à partir d'une ancre visuelle
et y saisit le texte spécifié. Elle peut optionnellement cliquer sur le champ
avant la saisie et valider après.
"""
def __init__(
self,
action_id: str,
parameters: Dict[str, Any],
screen_capturer=None
):
"""
Initialise l'action de saisie de texte.
Args:
action_id: Identifiant unique de l'action
parameters: Paramètres incluant l'ancre et le texte
screen_capturer: Instance du ScreenCapturer (Option A thread-safe)
"""
super().__init__(
action_id=action_id,
name="Saisie de Texte",
description="Saisit du texte dans un champ identifié par une ancre visuelle",
parameters=parameters,
screen_capturer=screen_capturer
)
# Paramètres spécifiques à la saisie
self.visual_anchor: Optional[VWBVisualAnchor] = parameters.get('visual_anchor')
self.text_to_type = parameters.get('text_to_type', '')
self.clear_field_first = parameters.get('clear_field_first', True)
self.click_before_typing = parameters.get('click_before_typing', True)
self.press_enter_after = parameters.get('press_enter_after', False)
self.typing_speed_ms = parameters.get('typing_speed_ms', 50) # Délai entre caractères
self.wait_after_typing_ms = parameters.get('wait_after_typing_ms', 500)
# Configuration de matching
self.confidence_threshold = parameters.get('confidence_threshold', 0.8)
self.search_timeout_ms = parameters.get('search_timeout_ms', 5000)
# Humanisation (anti-détection Citrix/VDI)
self.humanize = parameters.get('humanize', True)
self.humanize_profile = parameters.get('humanize_profile', 'normal')
def validate_parameters(self) -> List[str]:
"""Valide les paramètres de l'action de saisie."""
errors = []
# Vérifier l'ancre visuelle
if not self.visual_anchor:
errors.append("Ancre visuelle requise")
elif not isinstance(self.visual_anchor, VWBVisualAnchor):
errors.append("Ancre visuelle invalide")
elif not self.visual_anchor.is_active:
errors.append("Ancre visuelle inactive")
# Vérifier le texte à saisir
if not isinstance(self.text_to_type, str):
errors.append("Le texte à saisir doit être une chaîne")
# Vérifier les paramètres booléens
if not isinstance(self.clear_field_first, bool):
errors.append("clear_field_first doit être un booléen")
if not isinstance(self.click_before_typing, bool):
errors.append("click_before_typing doit être un booléen")
if not isinstance(self.press_enter_after, bool):
errors.append("press_enter_after doit être un booléen")
# Vérifier les délais
if not isinstance(self.typing_speed_ms, (int, float)) or self.typing_speed_ms < 0:
errors.append("typing_speed_ms doit être un nombre positif")
if not isinstance(self.wait_after_typing_ms, (int, float)) or self.wait_after_typing_ms < 0:
errors.append("wait_after_typing_ms doit être un nombre positif")
# Vérifier le seuil de confiance
if not (0.0 <= self.confidence_threshold <= 1.0):
errors.append("Seuil de confiance doit être entre 0.0 et 1.0")
# Vérifier le ScreenCapturer
if not self.screen_capturer:
errors.append("ScreenCapturer requis pour la capture d'écran")
return errors
def execute_core(self, step_id: str) -> VWBActionResult:
"""
Exécute l'action de saisie de texte.
Args:
step_id: Identifiant de l'étape
Returns:
Résultat d'exécution
"""
start_time = datetime.now()
try:
# Étape 1: Capturer l'écran actuel
print(f"🔍 Recherche du champ de saisie: {self.visual_anchor.name}")
current_screenshot = self._capture_current_screen()
if current_screenshot is None:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SCREEN_CAPTURE_FAILED,
message="Impossible de capturer l'écran actuel"
)
# Étape 2: Localiser le champ de saisie
match_result = self._find_input_field(current_screenshot)
if not match_result['found']:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.ELEMENT_NOT_FOUND,
message=f"Champ de saisie '{self.visual_anchor.name}' non trouvé",
technical_details=match_result
)
# Étape 3: Cliquer sur le champ si nécessaire
if self.click_before_typing:
click_coordinates = self._calculate_click_coordinates(match_result)
click_success = self._perform_click(click_coordinates)
if not click_success:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.CLICK_FAILED,
message="Impossible de cliquer sur le champ de saisie",
technical_details={'coordinates': click_coordinates}
)
# Attendre que le champ soit actif
time.sleep(0.2)
# Étape 4: Vider le champ si nécessaire
if self.clear_field_first:
self._clear_field()
# Étape 5: Saisir le texte
typing_success = self._type_text()
if not typing_success:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.TYPE_TEXT_FAILED,
message="Échec de la saisie de texte",
technical_details={'text_length': len(self.text_to_type)}
)
# Étape 6: Appuyer sur Entrée si nécessaire
if self.press_enter_after:
self._press_enter()
# Étape 7: Attendre après la saisie
if self.wait_after_typing_ms > 0:
time.sleep(self.wait_after_typing_ms / 1000.0)
# Étape 8: Mettre à jour les statistiques de l'ancre
end_time = datetime.now()
execution_time = (end_time - start_time).total_seconds() * 1000
self.visual_anchor.update_usage_stats(execution_time, True)
# Créer l'evidence d'interaction
interaction_evidence = create_interaction_evidence(
action_id=self.action_id,
step_id=step_id,
evidence_type=VWBEvidenceType.TYPE_EVIDENCE,
title=f"Saisie dans {self.visual_anchor.name}",
interaction_data={
'anchor_id': self.visual_anchor.anchor_id,
'anchor_name': self.visual_anchor.name,
'text_typed': self.text_to_type,
'text_length': len(self.text_to_type),
'cleared_first': self.clear_field_first,
'clicked_before': self.click_before_typing,
'pressed_enter': self.press_enter_after,
'confidence_score': match_result.get('confidence', 0.0),
'search_time_ms': match_result.get('search_time_ms', 0),
'match_box': match_result.get('match_box')
},
confidence_score=match_result.get('confidence', 0.0)
)
# Créer le résultat de succès
result = VWBActionResult(
action_id=self.action_id,
step_id=step_id,
status=VWBActionStatus.SUCCESS,
start_time=start_time,
end_time=end_time,
execution_time_ms=execution_time,
output_data={
'text_typed': self.text_to_type,
'text_length': len(self.text_to_type),
'field_cleared': self.clear_field_first,
'enter_pressed': self.press_enter_after,
'anchor_confidence': match_result.get('confidence', 0.0),
'field_found': True
},
evidence_list=[interaction_evidence]
)
print(f"✅ Saisie réussie dans {self.visual_anchor.name} en {execution_time:.1f}ms")
return result
except Exception as e:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message=f"Erreur lors de la saisie: {str(e)}",
technical_details={'exception': str(e)}
)
def _capture_current_screen(self) -> Optional[Dict[str, Any]]:
"""Capture l'écran actuel avec métadonnées."""
try:
# Utiliser la méthode ultra stable (Option A)
img_array = self.screen_capturer.capture()
if img_array is None:
return None
from PIL import Image
import base64
import io
# Convertir en PIL Image
pil_image = Image.fromarray(img_array)
# Convertir en base64 pour stockage
buffer = io.BytesIO()
pil_image.save(buffer, format='PNG', optimize=True)
screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
return {
'image_array': img_array,
'pil_image': pil_image,
'screenshot_base64': screenshot_base64,
'width': pil_image.width,
'height': pil_image.height,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
print(f"❌ Erreur capture d'écran: {e}")
return None
def _find_input_field(self, screenshot_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Localise le champ de saisie dans le screenshot.
Args:
screenshot_data: Données du screenshot
Returns:
Résultat de la recherche avec coordonnées si trouvé
"""
search_start = time.time()
try:
pil_image = screenshot_data['pil_image']
# Simuler un délai de recherche réaliste
search_delay = min(self.search_timeout_ms / 1000.0, 0.3)
time.sleep(search_delay)
search_time_ms = (time.time() - search_start) * 1000
# Simuler la localisation du champ
center_x = pil_image.width // 2
center_y = pil_image.height // 2
# Vérifier si l'ancre a une bounding box définie
if self.visual_anchor.has_bounding_box():
search_area = self.visual_anchor.get_search_area(
pil_image.width,
pil_image.height
)
if search_area:
center_x = search_area['x'] + search_area['width'] // 2
center_y = search_area['y'] + search_area['height'] // 2
# Simuler une confiance basée sur le seuil configuré
confidence = min(self.confidence_threshold + 0.05, 0.92)
return {
'found': confidence >= self.confidence_threshold,
'confidence': confidence,
'match_box': {
'x': center_x - 75,
'y': center_y - 15,
'width': 150,
'height': 30
},
'center_coordinates': {'x': center_x, 'y': center_y},
'search_time_ms': search_time_ms,
'method': 'simulated_input_field_detection'
}
except Exception as e:
search_time_ms = (time.time() - search_start) * 1000
return {
'found': False,
'error': str(e),
'search_time_ms': search_time_ms
}
def _calculate_click_coordinates(self, match_result: Dict[str, Any]) -> Dict[str, int]:
"""Calcule les coordonnées de clic sur le champ."""
center_coords = match_result['center_coordinates']
return {
'x': int(center_coords['x']),
'y': int(center_coords['y'])
}
def _perform_click(self, coordinates: Dict[str, int]) -> bool:
"""Effectue un clic sur le champ de saisie."""
try:
try:
import pyautogui
pyautogui.click(coordinates['x'], coordinates['y'])
print(f"🖱️ Clic sur le champ à ({coordinates['x']}, {coordinates['y']})")
return True
except ImportError:
print("⚠️ pyautogui non disponible - simulation du clic")
return True
except Exception as e:
print(f"❌ Erreur lors du clic: {e}")
return False
def _clear_field(self):
"""Vide le champ de saisie."""
try:
try:
import pyautogui
# Sélectionner tout le texte et le supprimer
pyautogui.hotkey('ctrl', 'a')
time.sleep(0.1)
pyautogui.press('delete')
print("🧹 Champ vidé")
except ImportError:
print("⚠️ pyautogui non disponible - simulation du vidage")
except Exception as e:
print(f"⚠️ Erreur lors du vidage: {e}")
def _type_text(self) -> bool:
"""Saisit le texte avec comportement humain."""
try:
# Utiliser le Humanizer si activé (anti-détection Citrix/VDI)
if self.humanize:
try:
from ...utils.humanizer import Humanizer, HumanProfile
profile_map = {
'fast': HumanProfile.FAST,
'normal': HumanProfile.NORMAL,
'slow': HumanProfile.SLOW,
'stealth': HumanProfile.STEALTH,
}
profile = profile_map.get(self.humanize_profile, HumanProfile.NORMAL)
humanizer = Humanizer(profile=profile)
typed = humanizer.type_text(self.text_to_type)
print(f"⌨️ Texte humanisé: '{typed}' ({len(typed)} caractères)")
return True
except ImportError as e:
print(f"⚠️ Humanizer non disponible: {e}, fallback pyautogui direct")
# Fallback: pyautogui direct
try:
import pyautogui
for char in self.text_to_type:
pyautogui.write(char)
if self.typing_speed_ms > 0:
time.sleep(self.typing_speed_ms / 1000.0)
print(f"⌨️ Texte saisi: '{self.text_to_type}' ({len(self.text_to_type)} caractères)")
return True
except ImportError:
print(f"⚠️ pyautogui non disponible - simulation: '{self.text_to_type}'")
time.sleep(len(self.text_to_type) * self.typing_speed_ms / 1000.0)
return True
except Exception as e:
print(f"❌ Erreur lors de la saisie: {e}")
return False
def _press_enter(self):
"""Appuie sur la touche Entrée."""
try:
try:
import pyautogui
pyautogui.press('enter')
print("⏎ Touche Entrée pressée")
except ImportError:
print("⚠️ pyautogui non disponible - simulation de la touche Entrée")
except Exception as e:
print(f"⚠️ Erreur lors de l'appui sur Entrée: {e}")
def get_action_info(self) -> Dict[str, Any]:
"""Retourne les informations de l'action pour l'interface."""
return {
'action_id': self.action_id,
'name': self.name,
'description': self.description,
'type': 'type_text',
'parameters': {
'anchor_name': self.visual_anchor.name if self.visual_anchor else 'Non définie',
'text_to_type': self.text_to_type,
'text_length': len(self.text_to_type),
'clear_field_first': self.clear_field_first,
'click_before_typing': self.click_before_typing,
'press_enter_after': self.press_enter_after,
'typing_speed_ms': self.typing_speed_ms,
'confidence_threshold': self.confidence_threshold
},
'status': self.current_status.value,
'anchor_reliable': self.visual_anchor.is_reliable() if self.visual_anchor else False
}

View File

@@ -0,0 +1,573 @@
"""
Humanizer - Simulation de comportement humain pour anti-détection
Auteur : Dom, Claude - 14 janvier 2026
Ce module ajoute un "flou gaussien comportemental" aux actions RPA
pour éviter la détection par les systèmes anti-robot (Citrix, etc.).
Principes :
- Mouvements de souris avec courbes naturelles
- Positions de clic avec légère imprécision
- Délais variables entre les actions
- Frappe avec rythme humain et micro-erreurs
"""
import random
import time
import math
from typing import Tuple, List, Optional, Callable
from dataclasses import dataclass
from enum import Enum
class HumanProfile(Enum):
"""Profils de comportement humain."""
FAST = "fast" # Utilisateur expérimenté, rapide
NORMAL = "normal" # Utilisateur standard
SLOW = "slow" # Utilisateur prudent/débutant
STEALTH = "stealth" # Maximum d'humanisation (environnements très surveillés)
@dataclass
class HumanConfig:
"""Configuration du comportement humain."""
# Mouvement souris
mouse_speed_base: float = 0.3 # Durée base du mouvement (secondes)
mouse_speed_variance: float = 0.15 # Variance de la durée
mouse_curve_intensity: float = 0.3 # Intensité de la courbe (0-1)
mouse_micro_movements: bool = True # Micro-tremblements
# Position clic
click_offset_max: int = 8 # Décalage max en pixels
click_offset_sigma: float = 3.0 # Écart-type du décalage gaussien
# Timing
delay_base_ms: int = 200 # Délai base entre actions (ms)
delay_variance_ratio: float = 0.4 # Ratio de variance (40%)
pause_probability: float = 0.05 # Probabilité de pause réflexion
pause_duration_ms: Tuple[int, int] = (500, 2000) # Durée pause min/max
# Frappe clavier
typing_speed_cpm: int = 300 # Caractères par minute
typing_speed_variance: float = 0.25 # Variance vitesse
typo_probability: float = 0.02 # Probabilité de faute de frappe
typo_correct: bool = True # Corriger les fautes
# Profils prédéfinis
PROFILES = {
HumanProfile.FAST: HumanConfig(
mouse_speed_base=0.15,
mouse_speed_variance=0.08,
mouse_curve_intensity=0.15,
click_offset_max=5,
click_offset_sigma=2.0,
delay_base_ms=100,
delay_variance_ratio=0.3,
pause_probability=0.02,
typing_speed_cpm=400,
typing_speed_variance=0.2,
typo_probability=0.01,
),
HumanProfile.NORMAL: HumanConfig(), # Valeurs par défaut
HumanProfile.SLOW: HumanConfig(
mouse_speed_base=0.5,
mouse_speed_variance=0.2,
mouse_curve_intensity=0.4,
click_offset_max=10,
click_offset_sigma=4.0,
delay_base_ms=400,
delay_variance_ratio=0.5,
pause_probability=0.1,
typing_speed_cpm=200,
typing_speed_variance=0.3,
typo_probability=0.03,
),
HumanProfile.STEALTH: HumanConfig(
mouse_speed_base=0.4,
mouse_speed_variance=0.25,
mouse_curve_intensity=0.5,
mouse_micro_movements=True,
click_offset_max=12,
click_offset_sigma=4.5,
delay_base_ms=350,
delay_variance_ratio=0.5,
pause_probability=0.15,
pause_duration_ms=(800, 3000),
typing_speed_cpm=250,
typing_speed_variance=0.35,
typo_probability=0.025,
typo_correct=True,
),
}
class Humanizer:
"""
Classe principale pour humaniser les actions RPA.
Usage:
humanizer = Humanizer(profile=HumanProfile.NORMAL)
# Mouvement souris humanisé
humanizer.move_to(500, 300)
# Clic avec position légèrement décalée
humanizer.click(500, 300)
# Frappe avec rythme humain
humanizer.type_text("Hello World")
# Délai humanisé
humanizer.wait()
"""
def __init__(
self,
profile: HumanProfile = HumanProfile.NORMAL,
config: Optional[HumanConfig] = None,
enabled: bool = True
):
"""
Initialise le Humanizer.
Args:
profile: Profil de comportement prédéfini
config: Configuration personnalisée (override le profil)
enabled: Active/désactive l'humanisation
"""
self.enabled = enabled
self.config = config if config else PROFILES.get(profile, PROFILES[HumanProfile.NORMAL])
self._pyautogui = None
def _get_pyautogui(self):
"""Import lazy de pyautogui."""
if self._pyautogui is None:
import pyautogui
self._pyautogui = pyautogui
return self._pyautogui
# =========================================================================
# FONCTIONS MATHÉMATIQUES
# =========================================================================
def _gaussian(self, mu: float = 0, sigma: float = 1) -> float:
"""Distribution gaussienne."""
return random.gauss(mu, sigma)
def _clamp(self, value: float, min_val: float, max_val: float) -> float:
"""Limite une valeur entre min et max."""
return max(min_val, min(value, max_val))
def _bezier_point(
self,
t: float,
p0: Tuple[float, float],
p1: Tuple[float, float],
p2: Tuple[float, float],
p3: Tuple[float, float]
) -> Tuple[float, float]:
"""Calcule un point sur une courbe de Bézier cubique."""
u = 1 - t
tt = t * t
uu = u * u
uuu = uu * u
ttt = tt * t
x = uuu * p0[0] + 3 * uu * t * p1[0] + 3 * u * tt * p2[0] + ttt * p3[0]
y = uuu * p0[1] + 3 * uu * t * p1[1] + 3 * u * tt * p2[1] + ttt * p3[1]
return (x, y)
def _generate_bezier_path(
self,
start: Tuple[int, int],
end: Tuple[int, int],
num_points: int = 20
) -> List[Tuple[int, int]]:
"""
Génère un chemin de souris avec courbe de Bézier.
Args:
start: Point de départ (x, y)
end: Point d'arrivée (x, y)
num_points: Nombre de points intermédiaires
Returns:
Liste de points (x, y) formant le chemin
"""
# Distance entre les points
dx = end[0] - start[0]
dy = end[1] - start[1]
distance = math.sqrt(dx * dx + dy * dy)
# Points de contrôle avec décalage aléatoire
intensity = self.config.mouse_curve_intensity
offset_range = distance * intensity
# Point de contrôle 1 (près du départ)
ctrl1 = (
start[0] + dx * 0.3 + self._gaussian(0, offset_range * 0.5),
start[1] + dy * 0.3 + self._gaussian(0, offset_range * 0.5)
)
# Point de contrôle 2 (près de l'arrivée)
ctrl2 = (
start[0] + dx * 0.7 + self._gaussian(0, offset_range * 0.3),
start[1] + dy * 0.7 + self._gaussian(0, offset_range * 0.3)
)
# Générer les points sur la courbe
path = []
for i in range(num_points + 1):
t = i / num_points
point = self._bezier_point(t, start, ctrl1, ctrl2, end)
# Ajouter micro-tremblements
if self.config.mouse_micro_movements and i > 0 and i < num_points:
point = (
point[0] + self._gaussian(0, 1),
point[1] + self._gaussian(0, 1)
)
path.append((int(point[0]), int(point[1])))
return path
# =========================================================================
# MOUVEMENT SOURIS
# =========================================================================
def move_to(self, x: int, y: int, duration: Optional[float] = None) -> None:
"""
Déplace la souris vers une position avec mouvement humain.
Args:
x: Position X cible
y: Position Y cible
duration: Durée du mouvement (None = auto)
"""
pyautogui = self._get_pyautogui()
if not self.enabled:
pyautogui.moveTo(x, y)
return
# Position actuelle
current_x, current_y = pyautogui.position()
# Calculer la durée
if duration is None:
base = self.config.mouse_speed_base
variance = self.config.mouse_speed_variance
duration = max(0.05, base + self._gaussian(0, variance))
# Générer le chemin courbe
path = self._generate_bezier_path(
(current_x, current_y),
(x, y),
num_points=max(5, int(duration * 30))
)
# Suivre le chemin
step_duration = duration / len(path)
for point in path:
pyautogui.moveTo(point[0], point[1], _pause=False)
time.sleep(step_duration)
def humanize_position(self, x: int, y: int) -> Tuple[int, int]:
"""
Ajoute un décalage gaussien à une position.
Args:
x: Position X originale
y: Position Y originale
Returns:
Nouvelle position (x, y) avec décalage
"""
if not self.enabled:
return (x, y)
offset_x = int(self._gaussian(0, self.config.click_offset_sigma))
offset_y = int(self._gaussian(0, self.config.click_offset_sigma))
# Limiter le décalage
max_offset = self.config.click_offset_max
offset_x = self._clamp(offset_x, -max_offset, max_offset)
offset_y = self._clamp(offset_y, -max_offset, max_offset)
return (x + int(offset_x), y + int(offset_y))
# =========================================================================
# CLIC
# =========================================================================
def click(
self,
x: int,
y: int,
button: str = 'left',
clicks: int = 1,
move_first: bool = True
) -> Tuple[int, int]:
"""
Effectue un clic avec comportement humain.
Args:
x: Position X
y: Position Y
button: Bouton ('left', 'right', 'middle')
clicks: Nombre de clics
move_first: Déplacer la souris avant de cliquer
Returns:
Position réelle du clic (avec décalage)
"""
pyautogui = self._get_pyautogui()
# Humaniser la position
real_x, real_y = self.humanize_position(x, y)
if not self.enabled:
pyautogui.click(real_x, real_y, clicks=clicks, button=button)
return (real_x, real_y)
# Déplacer la souris d'abord (mouvement naturel)
if move_first:
self.move_to(real_x, real_y)
# Petit délai avant le clic (réflexe humain)
time.sleep(random.uniform(0.02, 0.08))
# Clic
if clicks == 2:
# Double-clic avec intervalle variable
pyautogui.click(real_x, real_y, button=button)
time.sleep(random.uniform(0.08, 0.15))
pyautogui.click(real_x, real_y, button=button)
else:
pyautogui.click(real_x, real_y, clicks=clicks, button=button)
return (real_x, real_y)
def double_click(self, x: int, y: int) -> Tuple[int, int]:
"""Double-clic humanisé."""
return self.click(x, y, clicks=2)
def right_click(self, x: int, y: int) -> Tuple[int, int]:
"""Clic droit humanisé."""
return self.click(x, y, button='right')
# =========================================================================
# FRAPPE CLAVIER
# =========================================================================
def type_text(
self,
text: str,
field_x: Optional[int] = None,
field_y: Optional[int] = None
) -> str:
"""
Tape du texte avec rythme humain.
Args:
text: Texte à taper
field_x: Position X du champ (optionnel, pour cliquer avant)
field_y: Position Y du champ (optionnel)
Returns:
Texte réellement tapé (peut différer si typos non corrigées)
"""
pyautogui = self._get_pyautogui()
if not self.enabled:
if field_x is not None and field_y is not None:
pyautogui.click(field_x, field_y)
pyautogui.typewrite(text, interval=0.05)
return text
# Cliquer sur le champ si position fournie
if field_x is not None and field_y is not None:
self.click(field_x, field_y)
self.wait(100, 200)
# Calculer l'intervalle moyen entre les touches
cpm = self.config.typing_speed_cpm
base_interval = 60.0 / cpm # secondes par caractère
typed_text = ""
i = 0
while i < len(text):
char = text[i]
# Vérifier si on fait une faute de frappe
if random.random() < self.config.typo_probability and char.isalpha():
# Taper une mauvaise lettre
wrong_char = self._get_nearby_key(char)
pyautogui.press(wrong_char)
typed_text += wrong_char
# Pause de réalisation de l'erreur
time.sleep(random.uniform(0.1, 0.3))
if self.config.typo_correct:
# Corriger avec backspace
pyautogui.press('backspace')
typed_text = typed_text[:-1]
time.sleep(random.uniform(0.05, 0.15))
# Taper le bon caractère
pyautogui.press(char) if len(char) == 1 else pyautogui.typewrite(char)
typed_text += char
# Intervalle variable
variance = self.config.typing_speed_variance
interval = base_interval * (1 + self._gaussian(0, variance))
interval = max(0.02, interval)
# Pause occasionnelle (réflexion)
if random.random() < 0.02: # 2% de chance de pause
interval += random.uniform(0.2, 0.5)
time.sleep(interval)
i += 1
return typed_text
def _get_nearby_key(self, char: str) -> str:
"""Retourne une touche proche sur le clavier (pour simuler une typo)."""
keyboard_neighbors = {
'a': 'sqzw', 'b': 'vghn', 'c': 'xdfv', 'd': 'erfcxs',
'e': 'rdsw', 'f': 'rtgvcd', 'g': 'tyhbvf', 'h': 'yujnbg',
'i': 'uojk', 'j': 'uikmnh', 'k': 'iolmj', 'l': 'opkm',
'm': 'njk', 'n': 'bhjm', 'o': 'iplk', 'p': 'ol',
'q': 'wa', 'r': 'etfd', 's': 'wedxa', 't': 'ryfg',
'u': 'yihj', 'v': 'cfgb', 'w': 'qeas', 'x': 'zsdc',
'y': 'tugh', 'z': 'asx'
}
char_lower = char.lower()
if char_lower in keyboard_neighbors:
neighbors = keyboard_neighbors[char_lower]
wrong = random.choice(neighbors)
return wrong.upper() if char.isupper() else wrong
return char
# =========================================================================
# DÉLAIS
# =========================================================================
def wait(
self,
min_ms: Optional[int] = None,
max_ms: Optional[int] = None
) -> float:
"""
Attend avec un délai humanisé.
Args:
min_ms: Délai minimum en ms (None = config)
max_ms: Délai maximum en ms (None = calculé)
Returns:
Durée réelle de l'attente (secondes)
"""
if not self.enabled:
base = min_ms if min_ms else self.config.delay_base_ms
time.sleep(base / 1000)
return base / 1000
# Calculer le délai
base = min_ms if min_ms else self.config.delay_base_ms
variance = self.config.delay_variance_ratio
delay_ms = base + self._gaussian(0, base * variance)
delay_ms = max(50, delay_ms) # Minimum 50ms
if max_ms:
delay_ms = min(delay_ms, max_ms)
# Vérifier si on fait une pause réflexion
if random.random() < self.config.pause_probability:
pause_min, pause_max = self.config.pause_duration_ms
delay_ms += random.uniform(pause_min, pause_max)
delay_sec = delay_ms / 1000
time.sleep(delay_sec)
return delay_sec
def random_pause(self, probability: float = 0.1) -> bool:
"""
Fait une pause aléatoire avec une certaine probabilité.
Args:
probability: Probabilité de pause (0-1)
Returns:
True si une pause a été effectuée
"""
if random.random() < probability:
pause_min, pause_max = self.config.pause_duration_ms
duration = random.uniform(pause_min, pause_max) / 1000
time.sleep(duration)
return True
return False
# =========================================================================
# INSTANCE GLOBALE ET FONCTIONS UTILITAIRES
# =========================================================================
# Instance globale par défaut
_default_humanizer: Optional[Humanizer] = None
def get_humanizer(profile: HumanProfile = HumanProfile.NORMAL) -> Humanizer:
"""Obtient l'instance globale du Humanizer."""
global _default_humanizer
if _default_humanizer is None:
_default_humanizer = Humanizer(profile=profile)
return _default_humanizer
def set_humanizer_profile(profile: HumanProfile) -> None:
"""Change le profil du Humanizer global."""
global _default_humanizer
_default_humanizer = Humanizer(profile=profile)
def set_humanizer_enabled(enabled: bool) -> None:
"""Active/désactive l'humanisation globale."""
humanizer = get_humanizer()
humanizer.enabled = enabled
# Fonctions raccourcies pour usage direct
def humanize_move(x: int, y: int, duration: Optional[float] = None) -> None:
"""Mouvement souris humanisé (fonction raccourcie)."""
get_humanizer().move_to(x, y, duration)
def humanize_click(x: int, y: int, button: str = 'left') -> Tuple[int, int]:
"""Clic humanisé (fonction raccourcie)."""
return get_humanizer().click(x, y, button=button)
def humanize_type(text: str) -> str:
"""Frappe humanisée (fonction raccourcie)."""
return get_humanizer().type_text(text)
def humanize_wait(min_ms: int = 200, max_ms: int = 500) -> float:
"""Attente humanisée (fonction raccourcie)."""
return get_humanizer().wait(min_ms, max_ms)
def humanize_position(x: int, y: int) -> Tuple[int, int]:
"""Position avec décalage gaussien (fonction raccourcie)."""
return get_humanizer().humanize_position(x, y)