diff --git a/deploy/configs/config_dev_windows.txt b/deploy/configs/config_dev_windows.txt new file mode 100644 index 000000000..e2da845c9 --- /dev/null +++ b/deploy/configs/config_dev_windows.txt @@ -0,0 +1,19 @@ +# ============================================================ +# Configuration Lea — Poste Dev / Chef de projet (Windows) +# ============================================================ +# +# Poste : PC dev chef de projet +# Objectif : enrichir connaissance Windows, evaluer robustesse +# Serveur : 192.168.1.40:5005 (RTX 5070) +# +# ============================================================ + +RPA_SERVER_URL=http://192.168.1.40:5005/api/v1 +RPA_API_TOKEN=86031addb338e449fccdb1a983f61807aec15d42d482b9c7748ad607dc23caab +RPA_MACHINE_ID=DEV_WINDOWS +RPA_USER_LABEL=Dev + +# --- Parametres avances (ne pas modifier sauf indication) --- +# RPA_OLLAMA_HOST=localhost +RPA_BLUR_SENSITIVE=false +RPA_LOG_RETENTION_DAYS=180 diff --git a/deploy/configs/config_tim_pauline.txt b/deploy/configs/config_tim_pauline.txt new file mode 100644 index 000000000..b44e015ab --- /dev/null +++ b/deploy/configs/config_tim_pauline.txt @@ -0,0 +1,19 @@ +# ============================================================ +# Configuration Lea — Poste TIM Pauline (LAN Anoust) +# ============================================================ +# +# Poste : PC de Pauline (TIM urgences) +# Objectif : apprentissage outil metier (DPI OSIRIS) +# Serveur : 192.168.1.40:5005 (RTX 5070) +# +# ============================================================ + +RPA_SERVER_URL=http://192.168.1.40:5005/api/v1 +RPA_API_TOKEN=86031addb338e449fccdb1a983f61807aec15d42d482b9c7748ad607dc23caab +RPA_MACHINE_ID=TIM_PAULINE +RPA_USER_LABEL=Pauline + +# --- Parametres avances (ne pas modifier sauf indication) --- +# RPA_OLLAMA_HOST=localhost +RPA_BLUR_SENSITIVE=true +RPA_LOG_RETENTION_DAYS=180 diff --git a/visual_workflow_builder/backend/actions/vision_ui/extract_text.py b/visual_workflow_builder/backend/actions/vision_ui/extract_text.py index 51f1f1d90..13b2141a9 100644 --- a/visual_workflow_builder/backend/actions/vision_ui/extract_text.py +++ b/visual_workflow_builder/backend/actions/vision_ui/extract_text.py @@ -12,6 +12,7 @@ from datetime import datetime import time import traceback import re +import os from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus from ...contracts.error import VWBActionError, VWBErrorType, VWBErrorSeverity, create_vwb_error @@ -435,14 +436,48 @@ class VWBExtractTextAction(BaseVWBAction): return None def _find_visual_element(self, screenshot, visual_anchor, threshold): - """Simulation de recherche d'élément visuel.""" - import random - confidence = random.uniform(0.6, 0.95) - - if confidence >= threshold: - return True, {'x': 300, 'y': 200, 'width': 250, 'height': 80}, confidence - else: - return False, {}, confidence + """Recherche d'élément visuel via template matching.""" + try: + from ...catalog_routes import find_visual_anchor_on_screen + + image_ancre = None + bounding_box = None + + if isinstance(visual_anchor, VWBVisualAnchor): + image_ancre = visual_anchor.screenshot_base64 + if visual_anchor.has_bounding_box(): + bounding_box = visual_anchor.bounding_box + elif isinstance(visual_anchor, dict): + image_ancre = visual_anchor.get('screenshot') or visual_anchor.get('image_base64') + bounding_box = visual_anchor.get('bounding_box') + + if image_ancre: + resultat = find_visual_anchor_on_screen( + anchor_image_base64=image_ancre, + confidence_threshold=threshold, + bounding_box=bounding_box + 
) + if resultat and resultat.get('found'): + coords = { + 'x': resultat.get('x', resultat.get('center_x', 0)), + 'y': resultat.get('y', resultat.get('center_y', 0)), + 'width': resultat.get('width', 200), + 'height': resultat.get('height', 80) + } + return True, coords, resultat.get('confidence', 0.9) + + if bounding_box: + return True, bounding_box, 0.7 + + return False, {}, 0.0 + + except ImportError: + if hasattr(visual_anchor, 'bounding_box') and visual_anchor.bounding_box: + return True, visual_anchor.bounding_box, 0.7 + return False, {}, 0.0 + except Exception as e: + print(f"⚠️ Erreur recherche visuelle: {e}") + return False, {}, 0.0 def _encode_screenshot(self, screenshot_data) -> str: """Encode un screenshot en base64.""" @@ -485,21 +520,28 @@ class VWBExtractTextAction(BaseVWBAction): } def _extract_image_region(self, screenshot_data, coords: Dict[str, int]): - """ - Extrait une région spécifique de l'image. - - Args: - screenshot_data: Données de l'image complète - coords: Coordonnées de la région - - Returns: - Image de la région ou None - """ + """Extrait une région spécifique de l'image.""" try: - # Ici, on utiliserait PIL ou OpenCV pour extraire la région - # Pour la simulation, on retourne un objet factice - print(f"✂️ Extraction région {coords['width']}x{coords['height']}") - return {"width": coords['width'], "height": coords['height'], "data": "simulated"} + from PIL import Image + import numpy as np + + x = int(coords.get('x', 0)) + y = int(coords.get('y', 0)) + w = int(coords.get('width', 100)) + h = int(coords.get('height', 100)) + + if isinstance(screenshot_data, np.ndarray): + pil_image = Image.fromarray(screenshot_data) + elif isinstance(screenshot_data, Image.Image): + pil_image = screenshot_data + else: + print(f"⚠️ Type screenshot non supporté: {type(screenshot_data)}") + return None + + cropped = pil_image.crop((x, y, x + w, y + h)) + print(f"✂️ Extraction région {w}x{h}") + return cropped + except Exception as e: print(f"❌ Erreur extraction région: {e}") return None @@ -533,44 +575,77 @@ class VWBExtractTextAction(BaseVWBAction): return image_data def _perform_ocr_extraction(self, image_data) -> tuple[str, float, Dict[str, Any]]: - """ - Effectue l'extraction OCR sur l'image. 
- - Args: - image_data: Image prétraitée - - Returns: - Tuple (texte, confiance, structure) - """ + """Effectue l'extraction OCR via Ollama VLM.""" try: - # Simulation d'extraction OCR - # En réalité, on utiliserait pytesseract ou une API OCR - - if self.extraction_mode == 'full': - extracted_text = "Texte exemple extrait par OCR\nLigne 2 du texte\nDernière ligne" - elif self.extraction_mode == 'numbers': - extracted_text = "123456 789 2026" - elif self.extraction_mode == 'words': - extracted_text = "mot1 mot2 mot3 mot4" - elif self.extraction_mode == 'lines': - extracted_text = "Ligne 1\nLigne 2\nLigne 3" + import requests + import json + import io + import base64 + from PIL import Image + + if isinstance(image_data, Image.Image): + buffer = io.BytesIO() + image_data.save(buffer, format='PNG') + image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + elif isinstance(image_data, dict): + return "", 0.0, {} else: - extracted_text = "Texte personnalisé" - - # Confiance simulée - confidence = 0.85 - - # Structure simulée - structure = { - "lines": extracted_text.split('\n') if '\n' in extracted_text else [extracted_text], - "words": extracted_text.split(), - "characters": len(extracted_text), - "language_detected": self.ocr_language + return "", 0.0, {} + + prompt_map = { + 'full': "Extrais TOUT le texte visible dans cette image. Retourne uniquement le texte brut, sans commentaire.", + 'numbers': "Extrais uniquement les nombres et chiffres visibles. Retourne-les séparés par des espaces.", + 'lines': "Extrais tout le texte visible ligne par ligne.", + 'words': "Extrais tous les mots visibles, séparés par des espaces.", } - - print(f"🔤 OCR terminé - Confiance: {confidence:.3f}") - return extracted_text, confidence, structure - + prompt = prompt_map.get(self.extraction_mode, prompt_map['full']) + + ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434") + model = os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b")) + + if 'qwen' in model.lower() and not prompt.startswith('/no_think'): + prompt = f"/no_think\n{prompt}" + + print(f"🔤 OCR VLM avec {model} (mode: {self.extraction_mode})...") + + payload = { + "model": model, + "prompt": prompt, + "images": [image_base64], + "stream": False, + "options": {"temperature": 0.1, "num_predict": 4000} + } + + response = requests.post( + f"{ollama_url}/api/generate", + json=payload, + timeout=60 + ) + + if response.status_code == 200: + result = response.json() + extracted_text = result.get('response', '').strip() + if not extracted_text and result.get('thinking'): + extracted_text = result.get('thinking', '').strip() + + confidence = 0.85 if extracted_text else 0.0 + + structure = { + "lines": extracted_text.split('\n') if '\n' in extracted_text else [extracted_text], + "words": extracted_text.split(), + "characters": len(extracted_text), + "language_detected": self.ocr_language + } + + print(f"✅ OCR terminé - {len(extracted_text)} caractères") + return extracted_text, confidence, structure + else: + print(f"⚠️ Erreur Ollama: {response.status_code}") + return "", 0.0, {} + + except requests.exceptions.ConnectionError: + print("⚠️ Ollama non accessible pour OCR") + return "", 0.0, {} except Exception as e: print(f"❌ Erreur OCR: {e}") return "", 0.0, {} diff --git a/visual_workflow_builder/backend/actions/vision_ui/focus_anchor.py b/visual_workflow_builder/backend/actions/vision_ui/focus_anchor.py index ddb7f32d9..31c13d615 100644 --- a/visual_workflow_builder/backend/actions/vision_ui/focus_anchor.py +++ 
b/visual_workflow_builder/backend/actions/vision_ui/focus_anchor.py @@ -198,23 +198,70 @@ class VWBFocusAnchorAction(BaseVWBAction): for attempt in range(self.max_attempts): print(f" Tentative {attempt + 1}/{self.max_attempts}") - - # Simulation de recherche d'ancre (à remplacer par vraie implémentation) - import random - confidence = random.uniform(0.6, 0.95) - - if confidence >= self.confidence_threshold: - # Ancre trouvée - match_found = True - best_match = { - 'confidence': confidence, - 'bbox': {'x': 400, 'y': 300, 'width': 120, 'height': 30}, - 'center': {'x': 460, 'y': 315} - } - break - + + try: + from ...catalog_routes import find_visual_anchor_on_screen + + image_ancre = None + bounding_box = None + if isinstance(self.visual_anchor, VWBVisualAnchor): + image_ancre = self.visual_anchor.screenshot_base64 + if self.visual_anchor.has_bounding_box(): + bounding_box = self.visual_anchor.bounding_box + elif isinstance(self.visual_anchor, dict): + image_ancre = self.visual_anchor.get('screenshot') or self.visual_anchor.get('image_base64') + bounding_box = self.visual_anchor.get('bounding_box') + + if image_ancre: + resultat = find_visual_anchor_on_screen( + anchor_image_base64=image_ancre, + confidence_threshold=self.confidence_threshold, + bounding_box=bounding_box + ) + if resultat and resultat.get('found'): + confidence = resultat.get('confidence', 0.9) + cx = resultat.get('center_x', resultat.get('x', 460)) + cy = resultat.get('center_y', resultat.get('y', 315)) + match_found = True + best_match = { + 'confidence': confidence, + 'bbox': { + 'x': resultat.get('x', cx - 60), + 'y': resultat.get('y', cy - 15), + 'width': resultat.get('width', 120), + 'height': resultat.get('height', 30) + }, + 'center': {'x': cx, 'y': cy} + } + break + + if bounding_box: + match_found = True + bx = bounding_box.get('x', 0) + by = bounding_box.get('y', 0) + bw = bounding_box.get('width', 120) + bh = bounding_box.get('height', 30) + best_match = { + 'confidence': 0.7, + 'bbox': bounding_box, + 'center': {'x': bx + bw // 2, 'y': by + bh // 2} + } + break + + except ImportError: + if hasattr(self.visual_anchor, 'bounding_box') and self.visual_anchor.bounding_box: + bb = self.visual_anchor.bounding_box + match_found = True + best_match = { + 'confidence': 0.7, + 'bbox': bb, + 'center': {'x': bb.get('x', 0) + bb.get('width', 0) // 2, + 'y': bb.get('y', 0) + bb.get('height', 0) // 2} + } + break + if attempt < self.max_attempts - 1: - time.sleep(0.5) # Attendre avant nouvelle tentative + time.sleep(0.5) if not match_found: # Ancre non trouvée @@ -334,24 +381,23 @@ class VWBFocusAnchorAction(BaseVWBAction): try: center = match_info['center'] + import pyautogui + if self.focus_method == 'hover': - # Survol de l'élément print(f" Survol à ({center['x']}, {center['y']}) pendant {self.hover_duration_ms}ms") - # Simulation du survol + pyautogui.moveTo(center['x'], center['y'], duration=0.3) time.sleep(self.hover_duration_ms / 1000.0) return True - + elif self.focus_method == 'click_light': - # Clic léger (sans appui prolongé) print(f" Clic léger à ({center['x']}, {center['y']})") - # Simulation du clic léger + pyautogui.click(center['x'], center['y']) time.sleep(0.1) return True - + elif self.focus_method == 'tab': - # Navigation par tabulation (approximative) print(" Navigation par tabulation") - # Simulation de la tabulation + pyautogui.press('tab') time.sleep(0.2) return True diff --git a/visual_workflow_builder/backend/actions/vision_ui/scroll_to_anchor.py 
b/visual_workflow_builder/backend/actions/vision_ui/scroll_to_anchor.py index 3649faff8..20829967c 100644 --- a/visual_workflow_builder/backend/actions/vision_ui/scroll_to_anchor.py +++ b/visual_workflow_builder/backend/actions/vision_ui/scroll_to_anchor.py @@ -449,14 +449,48 @@ class VWBScrollToAnchorAction(BaseVWBAction): return None def _find_visual_element(self, screenshot, visual_anchor, threshold): - """Simulation de recherche d'élément visuel.""" - import random - confidence = random.uniform(0.6, 0.95) - - if confidence >= threshold: - return True, {'x': 400, 'y': 300, 'width': 200, 'height': 50}, confidence - else: - return False, {}, confidence + """Recherche d'élément visuel via template matching.""" + try: + from ...catalog_routes import find_visual_anchor_on_screen + + image_ancre = None + bounding_box = None + + if isinstance(visual_anchor, VWBVisualAnchor): + image_ancre = visual_anchor.screenshot_base64 + if visual_anchor.has_bounding_box(): + bounding_box = visual_anchor.bounding_box + elif isinstance(visual_anchor, dict): + image_ancre = visual_anchor.get('screenshot') or visual_anchor.get('image_base64') + bounding_box = visual_anchor.get('bounding_box') + + if image_ancre: + resultat = find_visual_anchor_on_screen( + anchor_image_base64=image_ancre, + confidence_threshold=threshold, + bounding_box=bounding_box + ) + if resultat and resultat.get('found'): + coords = { + 'x': resultat.get('x', resultat.get('center_x', 0)), + 'y': resultat.get('y', resultat.get('center_y', 0)), + 'width': resultat.get('width', 200), + 'height': resultat.get('height', 50) + } + return True, coords, resultat.get('confidence', 0.9) + + if bounding_box: + return True, bounding_box, 0.7 + + return False, {}, 0.0 + + except ImportError: + if hasattr(visual_anchor, 'bounding_box') and visual_anchor.bounding_box: + return True, visual_anchor.bounding_box, 0.7 + return False, {}, 0.0 + except Exception as e: + print(f"⚠️ Erreur recherche visuelle: {e}") + return False, {}, 0.0 def _encode_screenshot(self, screenshot_data) -> str: """Encode un screenshot en base64.""" @@ -492,19 +526,18 @@ class VWBScrollToAnchorAction(BaseVWBAction): scroll_y = 0 try: + import pyautogui + if self.scroll_direction in ['vertical', 'both']: - # Défilement vertical vers le bas scroll_y = self.scroll_step_pixels print(f" ⬇️ Défilement vertical: {scroll_y}px") - # En réalité: pyautogui.scroll(-scroll_y) - + pyautogui.scroll(-scroll_y // 100) + if self.scroll_direction in ['horizontal', 'both']: - # Défilement horizontal vers la droite scroll_x = self.scroll_step_pixels print(f" ➡️ Défilement horizontal: {scroll_x}px") - # En réalité: pyautogui.hscroll(scroll_x) - - # Simuler le délai de défilement + pyautogui.hscroll(scroll_x // 100) + time.sleep(0.1) except Exception as e: diff --git a/visual_workflow_builder/backend/api_v3/execute.py b/visual_workflow_builder/backend/api_v3/execute.py index c8d413d76..c43ed80f4 100644 --- a/visual_workflow_builder/backend/api_v3/execute.py +++ b/visual_workflow_builder/backend/api_v3/execute.py @@ -388,7 +388,7 @@ def execute_ai_analyze(params: dict) -> dict: try: prompt = params.get('analysis_prompt', params.get('prompt', '')) - model = params.get('model', params.get('ollama_model', 'qwen3-vl:8b')) + model = params.get('model', params.get('ollama_model', os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b")))) output_variable = params.get('output_variable', 'resultat_analyse') timeout_ms = params.get('timeout_ms', 120000) # 2 minutes par défaut temperature = 
params.get('temperature', 0.7) # Même défaut que CLI Ollama @@ -532,6 +532,125 @@ def execute_ai_analyze(params: dict) -> dict: return {'success': False, 'error': str(e)} +def execute_extract_text(params: dict) -> dict: + """ + Extrait du texte depuis l'écran via Ollama VLM. + Capture la zone de l'ancre (ou l'écran entier) et demande au VLM d'extraire le texte. + """ + import requests + import re + global _execution_state + + try: + anchor = params.get('visual_anchor', {}) + model = params.get('model', os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b"))) + output_variable = params.get('output_variable', 'texte_extrait') + timeout_ms = params.get('timeout_ms', 60000) + extraction_mode = params.get('extraction_mode', 'full') + text_filters = params.get('text_filters', []) + + screenshot_base64 = anchor.get('screenshot') if anchor else None + + if not screenshot_base64: + try: + from PIL import ImageGrab + import io + + bbox = anchor.get('bounding_box', {}) if anchor else {} + + if bbox: + x, y = int(bbox.get('x', 0)), int(bbox.get('y', 0)) + w, h = int(bbox.get('width', 100)), int(bbox.get('height', 100)) + print(f"📸 [OCR] Capture zone: ({x}, {y}) -> ({x+w}, {y+h})") + screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h)) + else: + print(f"📸 [OCR] Capture écran complet") + screenshot = ImageGrab.grab() + + buffer = io.BytesIO() + screenshot.save(buffer, format='PNG') + screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + except Exception as cap_err: + return {'success': False, 'error': f"Erreur capture: {cap_err}"} + + if not screenshot_base64: + return {'success': False, 'error': "Pas d'image à analyser"} + + prompt_map = { + 'full': "Extrais TOUT le texte visible dans cette image. Retourne uniquement le texte brut, sans commentaire.", + 'numbers': "Extrais uniquement les nombres et chiffres visibles dans cette image. Retourne-les séparés par des espaces.", + 'lines': "Extrais tout le texte visible ligne par ligne. 
Une ligne par ligne de texte visible.", + 'words': "Extrais tous les mots visibles dans cette image, séparés par des espaces.", + } + prompt = prompt_map.get(extraction_mode, prompt_map['full']) + + if 'qwen' in model.lower() and not prompt.startswith('/no_think'): + prompt = f"/no_think\n{prompt}" + + print(f"📝 [OCR] Extraction texte avec {model} (mode: {extraction_mode})...") + + ollama_url = params.get('ollama_url', 'http://localhost:11434') + payload = { + "model": model, + "prompt": prompt, + "images": [screenshot_base64], + "stream": False, + "options": {"temperature": 0.1, "num_predict": 4000} + } + + response = requests.post( + f"{ollama_url}/api/generate", + json=payload, + timeout=timeout_ms / 1000 + ) + + if response.status_code != 200: + return {'success': False, 'error': f"Erreur Ollama: {response.status_code}"} + + result = response.json() + extracted_text = result.get('response', '').strip() + + if not extracted_text and result.get('thinking'): + extracted_text = result.get('thinking', '').strip() + + for f in text_filters: + if f == 'digits_only': + extracted_text = re.sub(r'[^\d\s]', '', extracted_text) + elif f == 'letters_only': + extracted_text = re.sub(r'[^a-zA-ZÀ-ÿ\s]', '', extracted_text) + elif f == 'trim_whitespace': + extracted_text = extracted_text.strip() + elif f == 'uppercase': + extracted_text = extracted_text.upper() + elif f == 'lowercase': + extracted_text = extracted_text.lower() + + print(f"✅ [OCR] Texte extrait ({len(extracted_text)} caractères)") + if extracted_text: + print(f" Résultat: {extracted_text[:150]}...") + + _execution_state['variables'][output_variable] = extracted_text + + return { + 'success': True, + 'output': { + 'extracted_text': extracted_text, + 'variable': output_variable, + 'character_count': len(extracted_text), + 'word_count': len(extracted_text.split()) if extracted_text else 0, + 'mode': extraction_mode, + 'model': model + } + } + + except requests.exceptions.Timeout: + return {'success': False, 'error': f"Timeout Ollama après {timeout_ms}ms"} + except requests.exceptions.ConnectionError: + return {'success': False, 'error': "Ollama non accessible"} + except Exception as e: + return {'success': False, 'error': str(e)} + + def execute_action_with_coords(action_type: str, params: dict, coords: dict) -> dict: """ Exécute une action avec des coordonnées spécifiées par l'utilisateur (self-healing). 
@@ -792,6 +911,167 @@ def execute_action(action_type: str, params: dict) -> dict: # Analyse de texte avec IA (Ollama) return execute_ai_analyze(params) + elif action_type in ['hover_anchor', 'hover']: + anchor = params.get('visual_anchor', {}) + bbox = anchor.get('bounding_box', {}) + if not bbox: + return {'success': False, 'error': 'Pas de bounding_box dans visual_anchor'} + + x = bbox.get('x', 0) + bbox.get('width', 0) / 2 + y = bbox.get('y', 0) + bbox.get('height', 0) / 2 + duration_ms = params.get('hover_duration_ms', params.get('duration_ms', 1000)) + + print(f"🖱️ [Action] Survol à ({x}, {y}) pendant {duration_ms}ms") + pyautogui.moveTo(x, y, duration=0.3) + time.sleep(duration_ms / 1000) + return {'success': True, 'output': {'hovered_at': {'x': x, 'y': y}, 'duration_ms': duration_ms}} + + elif action_type in ['drag_drop_anchor', 'drag_drop']: + source_anchor = params.get('source_anchor', params.get('visual_anchor', {})) + dest_anchor = params.get('destination_anchor', {}) + source_bbox = source_anchor.get('bounding_box', {}) + dest_bbox = dest_anchor.get('bounding_box', {}) + + if not source_bbox or not dest_bbox: + return {'success': False, 'error': 'bounding_box source et destination requis'} + + src_x = source_bbox.get('x', 0) + source_bbox.get('width', 0) / 2 + src_y = source_bbox.get('y', 0) + source_bbox.get('height', 0) / 2 + dst_x = dest_bbox.get('x', 0) + dest_bbox.get('width', 0) / 2 + dst_y = dest_bbox.get('y', 0) + dest_bbox.get('height', 0) / 2 + duration_ms = params.get('drag_duration_ms', 500) + + print(f"🖱️ [Action] Glisser de ({src_x}, {src_y}) vers ({dst_x}, {dst_y})") + pyautogui.moveTo(src_x, src_y, duration=0.2) + time.sleep(0.1) + pyautogui.drag(dst_x - src_x, dst_y - src_y, duration=duration_ms / 1000, button='left') + return {'success': True, 'output': {'from': {'x': src_x, 'y': src_y}, 'to': {'x': dst_x, 'y': dst_y}}} + + elif action_type in ['scroll_to_anchor', 'scroll']: + direction = params.get('scroll_direction', 'down') + amount = params.get('scroll_amount', params.get('scroll_step_pixels', 3)) + anchor = params.get('visual_anchor', {}) + bbox = anchor.get('bounding_box', {}) + + if bbox: + x = bbox.get('x', 0) + bbox.get('width', 0) / 2 + y = bbox.get('y', 0) + bbox.get('height', 0) / 2 + pyautogui.moveTo(x, y, duration=0.1) + + scroll_value = amount if direction in ['up', 'left'] else -amount + + print(f"📜 [Action] Scroll {direction} ({amount})") + if direction in ['left', 'right']: + pyautogui.hscroll(scroll_value) + else: + pyautogui.scroll(scroll_value) + + time.sleep(0.5) + return {'success': True, 'output': {'direction': direction, 'amount': amount}} + + elif action_type in ['focus_anchor', 'focus']: + anchor = params.get('visual_anchor', {}) + bbox = anchor.get('bounding_box', {}) + if not bbox: + return {'success': False, 'error': 'Pas de bounding_box dans visual_anchor'} + + x = bbox.get('x', 0) + bbox.get('width', 0) / 2 + y = bbox.get('y', 0) + bbox.get('height', 0) / 2 + + print(f"🎯 [Action] Focus à ({x}, {y})") + pyautogui.click(x, y) + time.sleep(0.3) + return {'success': True, 'output': {'focused_at': {'x': x, 'y': y}}} + + elif action_type == 'extract_text': + return execute_extract_text(params) + + elif action_type == 'ai_ocr': + params.setdefault('analysis_prompt', "Extrais TOUT le texte visible dans cette image. 
Retourne uniquement le texte brut, ligne par ligne, sans commentaire.") + return execute_ai_analyze(params) + + elif action_type == 'ai_summarize': + params.setdefault('analysis_prompt', "Résume le contenu visible dans cette image en 3-5 phrases concises. Identifie les informations clés.") + return execute_ai_analyze(params) + + elif action_type == 'ai_extract': + params.setdefault('analysis_prompt', "Extrais les données structurées visibles (noms, dates, montants, identifiants). Retourne un JSON structuré.") + return execute_ai_analyze(params) + + elif action_type == 'ai_classify': + categories = params.get('categories', []) + cats_str = ', '.join(categories) if categories else 'les catégories pertinentes' + params.setdefault('analysis_prompt', f"Classe le contenu visible parmi : {cats_str}. Retourne la catégorie et un score de confiance.") + return execute_ai_analyze(params) + + elif action_type == 'ai_custom': + system_prompt = params.get('system_prompt', '') + if system_prompt and 'analysis_prompt' not in params: + params['analysis_prompt'] = system_prompt + return execute_ai_analyze(params) + + elif action_type == 'screenshot_evidence': + import pyautogui + from PIL import Image + from pathlib import Path + import io + + label = params.get('label', params.get('description', 'evidence')) + output_variable = params.get('output_variable', 'screenshot_evidence') + + screenshot = pyautogui.screenshot() + + # Sauvegarder la preuve + evidence_dir = Path('data/evidence') + evidence_dir.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + filepath = evidence_dir / f"evidence_{timestamp}_{label[:30]}.png" + screenshot.save(str(filepath)) + + # Encoder en base64 pour la variable + buffer = io.BytesIO() + screenshot.save(buffer, format='PNG') + screenshot_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + + _execution_state['variables'][output_variable] = screenshot_b64 + + print(f"📸 [Evidence] Capture sauvegardée: {filepath}") + return {'success': True, 'output': {'filepath': str(filepath), 'variable': output_variable}} + + elif action_type in ['verify_element_exists', 'verify_element']: + anchor = params.get('visual_anchor', {}) + screenshot_base64 = anchor.get('screenshot') + bbox = anchor.get('bounding_box', {}) + expected = params.get('expected', True) + output_variable = params.get('output_variable', 'element_exists') + + found = False + confidence = 0.0 + + if screenshot_base64 and execution_mode in ['intelligent', 'debug']: + try: + from services.intelligent_executor import find_and_click + result = find_and_click( + anchor_image_base64=screenshot_base64, + anchor_bbox=bbox, + method='clip', + detection_threshold=0.35 + ) + found = result.get('found', False) + confidence = result.get('confidence', 0.0) + except Exception as e: + print(f"⚠️ [Verify] Erreur vision: {e}") + elif bbox: + found = True + confidence = 0.5 + + match = (found == expected) + _execution_state['variables'][output_variable] = found + + status = "trouvé" if found else "absent" + print(f"🔍 [Verify] Élément {status} (confiance: {confidence:.2f}, attendu: {expected})") + return {'success': match, 'output': {'found': found, 'confidence': confidence, 'expected': expected, 'match': match}} + else: return {'success': False, 'error': f"Type d'action non supporté: {action_type}"} diff --git a/visual_workflow_builder/backend/api_v3/learned_workflows.py b/visual_workflow_builder/backend/api_v3/learned_workflows.py index 8e6dd9ef4..2738fab28 100644 --- 
a/visual_workflow_builder/backend/api_v3/learned_workflows.py +++ b/visual_workflow_builder/backend/api_v3/learned_workflows.py @@ -27,7 +27,7 @@ from flask import jsonify, request from . import api_v3_bp from .workflow import generate_id -from db.models import db, Workflow, Step +from db.models import db, Workflow, Step, VisualAnchor logger = logging.getLogger(__name__) @@ -303,7 +303,7 @@ def import_learned_workflow(workflow_id: str): db.session.add(workflow) - # Créer les steps + # Créer les steps (avec sauvegarde des screenshots d'ancres) for step_data in steps_list: step = Step( id=generate_id("step"), @@ -314,7 +314,57 @@ def import_learned_workflow(workflow_id: str): position_y=step_data.get("position_y", 200), label=step_data.get("label", step_data["action_type"]), ) - step.parameters = step_data.get("parameters", {}) + params = dict(step_data.get("parameters", {})) + + # Extraire et sauvegarder le screenshot d'ancre si présent + anchor_b64 = params.pop("_anchor_image_base64", None) + params.pop("_anchor_bbox", None) + if anchor_b64: + try: + from services.anchor_image_service import ( + save_anchor_image, generate_anchor_id + ) + from PIL import Image + from io import BytesIO + import base64 as b64mod + + if ',' in anchor_b64: + anchor_b64 = anchor_b64.split(',', 1)[1] + img_data = b64mod.b64decode(anchor_b64) + img = Image.open(BytesIO(img_data)) + bbox = { + "x": 0, "y": 0, + "width": img.width, "height": img.height + } + anchor_id = generate_anchor_id() + result = save_anchor_image( + anchor_id=anchor_id, + image_base64=anchor_b64, + bounding_box=bbox, + metadata={"source": "learned_import", "workflow_id": wf_id} + ) + if result.get("success"): + from services.anchor_image_service import ( + get_original_path, get_thumbnail_path + ) + va = VisualAnchor( + id=anchor_id, + image_path=str(get_original_path(anchor_id) or ""), + thumbnail_path=str(get_thumbnail_path(anchor_id) or ""), + bbox_x=0, bbox_y=0, + bbox_width=img.width, bbox_height=img.height, + description=step_data.get("label", ""), + capture_method="learned_import", + ) + db.session.add(va) + step.anchor_id = anchor_id + logger.info("Ancre sauvegardée: %s pour step %s", + anchor_id, step.id) + except Exception as e: + logger.warning("Échec sauvegarde ancre pour step %s: %s", + step_data.get("order"), e) + + step.parameters = params db.session.add(step) db.session.commit() diff --git a/visual_workflow_builder/backend/instance/workflows.db b/visual_workflow_builder/backend/instance/workflows.db index 1b69e9124..cefaf37df 100644 Binary files a/visual_workflow_builder/backend/instance/workflows.db and b/visual_workflow_builder/backend/instance/workflows.db differ diff --git a/visual_workflow_builder/backend/services/learned_workflow_bridge.py b/visual_workflow_builder/backend/services/learned_workflow_bridge.py index c7f0ff6a0..718ea106f 100644 --- a/visual_workflow_builder/backend/services/learned_workflow_bridge.py +++ b/visual_workflow_builder/backend/services/learned_workflow_bridge.py @@ -218,6 +218,20 @@ def convert_learned_to_vwb_steps( if target.get("by_text"): vwb_params["target_text"] = target["by_text"] + # Extraire le screenshot de l'ancre pour la preview dans le VWB + anchor_b64 = ( + target.get("anchor_image_base64") + or target.get("screenshot") + or action_params.get("anchor_image_base64") + ) + if anchor_b64: + vwb_params["_anchor_image_base64"] = anchor_b64 + bbox = target.get("by_position") + if bbox and isinstance(bbox, (list, tuple)) and len(bbox) >= 2: + vwb_params["_anchor_bbox"] = { + "x_pct": 
bbox[0], "y_pct": bbox[1] + } + label = _build_step_label(vwb_action_type, vwb_params, from_name, to_name) steps.append({ "action_type": vwb_action_type, @@ -229,6 +243,10 @@ def convert_learned_to_vwb_steps( "metadata": edge_meta, }) + # Fusionner les type_text consécutifs et les key_press en combos + steps = _merge_consecutive_text_inputs(steps) + steps = _merge_consecutive_key_presses(steps) + # Appliquer le layout serpentin à tous les steps _compute_layout(steps) @@ -298,6 +316,79 @@ def _convert_compound_substep( return vwb_type, vwb_params +def _merge_consecutive_text_inputs( + steps: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """ + Fusionne les steps type_text consécutifs en un seul. + + Quand un compound est décomposé lettre par lettre (ex: "bonjour" → 7 steps), + cette fonction les recombine en un seul step "Saisir : bonjour". + """ + if not steps: + return steps + + merged = [steps[0]] + for step in steps[1:]: + prev = merged[-1] + if (prev["action_type"] == "type_text" + and step["action_type"] == "type_text"): + # Concaténer le texte + prev_text = prev.get("parameters", {}).get("text", "") + curr_text = step.get("parameters", {}).get("text", "") + prev["parameters"]["text"] = prev_text + curr_text + # Mettre à jour le label + combined = prev["parameters"]["text"] + prev["label"] = f'Saisir : "{combined}"' + else: + merged.append(step) + + # Réindexer les ordres + for idx, step in enumerate(merged): + step["order"] = idx + + return merged + + +def _merge_consecutive_key_presses( + steps: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """ + Fusionne les key_press / keyboard_shortcut consécutifs portant une seule touche + en un seul keyboard_shortcut combo (ex: ctrl puis s → ctrl+s). + + Ne fusionne que les steps keyboard_shortcut consécutifs dont chacun ne porte + qu'une seule touche (signe d'un combo décomposé). Les raccourcis déjà composés + (keys avec 2+ éléments) ne sont pas touchés. + """ + if not steps: + return steps + + merged = [steps[0]] + for step in steps[1:]: + prev = merged[-1] + if (prev["action_type"] == "keyboard_shortcut" + and step["action_type"] == "keyboard_shortcut"): + prev_keys = prev.get("parameters", {}).get("keys", []) + curr_keys = step.get("parameters", {}).get("keys", []) + # Ne fusionner que si chaque step porte exactement 1 touche + # (un combo déjà composé comme ["ctrl", "s"] ne doit pas absorber le suivant) + if len(curr_keys) == 1 and len(prev_keys) >= 1: + # Vérifier que le prev est lui-même issu d'une fusion ou d'une seule touche + # On fusionne tant que c'est un enchaînement de touches simples + prev["parameters"]["keys"] = prev_keys + curr_keys + combo_str = "+".join(prev["parameters"]["keys"]) + prev["label"] = f"Raccourci : {combo_str}" + continue + merged.append(step) + + # Réindexer les ordres + for idx, step in enumerate(merged): + step["order"] = idx + + return merged + + def _compute_layout( steps: List[Dict[str, Any]], cols: int = 3, diff --git a/visual_workflow_builder/frontend_v4/src/components/ToolPalette.tsx b/visual_workflow_builder/frontend_v4/src/components/ToolPalette.tsx index 56ab46e0b..79532cbb1 100644 --- a/visual_workflow_builder/frontend_v4/src/components/ToolPalette.tsx +++ b/visual_workflow_builder/frontend_v4/src/components/ToolPalette.tsx @@ -27,7 +27,7 @@ export default function ToolPalette() {
{categories.map((catKey) => { const cat = ACTION_CATEGORIES[catKey]; - const tools = ACTIONS.filter(a => a.category === catKey); + const tools = ACTIONS.filter(a => a.category === catKey && !a.hidden); const isExpanded = expandedCategories.includes(catKey); if (tools.length === 0) return null; diff --git a/visual_workflow_builder/frontend_v4/src/types.ts b/visual_workflow_builder/frontend_v4/src/types.ts index 9df8b8d91..75b442418 100644 --- a/visual_workflow_builder/frontend_v4/src/types.ts +++ b/visual_workflow_builder/frontend_v4/src/types.ts @@ -71,6 +71,7 @@ export interface ActionDefinition { category: 'mouse' | 'keyboard' | 'wait' | 'data' | 'logic' | 'ai' | 'llm' | 'validation' | 'files'; needsAnchor: boolean; params: { name: string; type: string; description: string }[]; + hidden?: boolean; } export const ACTIONS: ActionDefinition[] = [ @@ -116,11 +117,11 @@ export const ACTIONS: ActionDefinition[] = [ ] }, // === LOGIQUE === - { type: 'visual_condition', label: 'Condition visuelle', icon: '🔀', description: 'Branchement conditionnel : si l\'ancre est trouvée, suit la sortie bas ; sinon, la sortie droite.', category: 'logic', needsAnchor: true, params: [ + { type: 'visual_condition', label: 'Condition visuelle', icon: '🔀', description: 'Branchement conditionnel : si l\'ancre est trouvée, suit la sortie bas ; sinon, la sortie droite.', category: 'logic', needsAnchor: true, hidden: true, params: [ { name: 'on_found', type: 'string', description: 'ID de l\'étape si l\'élément est trouvé' }, { name: 'on_not_found', type: 'string', description: 'ID de l\'étape si l\'élément n\'est pas trouvé' } ] }, - { type: 'loop_visual', label: 'Boucle visuelle', icon: '🔁', description: 'Répète les étapes connectées tant que l\'ancre est visible.', category: 'logic', needsAnchor: true, params: [ + { type: 'loop_visual', label: 'Boucle visuelle', icon: '🔁', description: 'Répète les étapes connectées tant que l\'ancre est visible.', category: 'logic', needsAnchor: true, hidden: true, params: [ { name: 'max_iterations', type: 'number', description: 'Nombre maximum d\'itérations' } ] },
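For reference, a minimal standalone sketch (not code from the repo) of the merge behaviour the new `_merge_consecutive_text_inputs` and `_merge_consecutive_key_presses` helpers add in `learned_workflow_bridge.py`: consecutive single-character `type_text` steps collapse into one "Saisir" step, and a run of single-key `keyboard_shortcut` steps collapses into a combo. The sample step dicts below are hypothetical and only approximate what `convert_learned_to_vwb_steps` emits before the merge passes run.

```python
# Hypothetical pre-merge steps: "bonjour" typed letter by letter,
# then ctrl followed by s as two separate keyboard_shortcut steps.
steps = (
    [{"action_type": "type_text", "order": i,
      "label": f'Saisir : "{c}"', "parameters": {"text": c}}
     for i, c in enumerate("bonjour")]
    + [{"action_type": "keyboard_shortcut", "order": 7,
        "label": "Raccourci : ctrl", "parameters": {"keys": ["ctrl"]}},
       {"action_type": "keyboard_shortcut", "order": 8,
        "label": "Raccourci : s", "parameters": {"keys": ["s"]}}]
)

# Illustrative re-implementation of the two merge passes; the actual
# logic lives in _merge_consecutive_text_inputs / _merge_consecutive_key_presses.
merged = [steps[0]]
for step in steps[1:]:
    prev = merged[-1]
    if prev["action_type"] == step["action_type"] == "type_text":
        # Concatenate text typed character by character.
        prev["parameters"]["text"] += step["parameters"]["text"]
        prev["label"] = f'Saisir : "{prev["parameters"]["text"]}"'
    elif (prev["action_type"] == step["action_type"] == "keyboard_shortcut"
          and len(step["parameters"]["keys"]) == 1):
        # Absorb single-key presses into a combo (ctrl then s -> ctrl+s).
        prev["parameters"]["keys"] += step["parameters"]["keys"]
        prev["label"] = "Raccourci : " + "+".join(prev["parameters"]["keys"])
    else:
        merged.append(step)

# Re-index step order after merging.
for idx, step in enumerate(merged):
    step["order"] = idx

print([s["label"] for s in merged])
# -> ['Saisir : "bonjour"', 'Raccourci : ctrl+s']
```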