diff --git a/visual_workflow_builder/backend/actions/data/__init__.py b/visual_workflow_builder/backend/actions/data/__init__.py new file mode 100644 index 000000000..42e74c90b --- /dev/null +++ b/visual_workflow_builder/backend/actions/data/__init__.py @@ -0,0 +1,21 @@ +""" +Actions Data VWB - Module d'initialisation +Auteur : Dom, Claude - 14 janvier 2026 + +Ce module contient les actions de manipulation de données +pour le Visual Workflow Builder. + +Actions disponibles : +- VWBExtraireTableauAction : Extraction de données tabulaires +""" + +from .extraire_tableau import VWBExtraireTableauAction, VWBExtractTableAction + +__all__ = [ + 'VWBExtraireTableauAction', + 'VWBExtractTableAction', # Alias anglais +] + +__version__ = '1.0.0' +__author__ = 'Dom, Claude' +__date__ = '14 janvier 2026' diff --git a/visual_workflow_builder/backend/actions/data/extraire_tableau.py b/visual_workflow_builder/backend/actions/data/extraire_tableau.py new file mode 100644 index 000000000..d184e76fe --- /dev/null +++ b/visual_workflow_builder/backend/actions/data/extraire_tableau.py @@ -0,0 +1,583 @@ +""" +Action Extraire Tableau - Extraction de données tabulaires depuis l'écran +Auteur : Dom, Claude - 14 janvier 2026 + +Cette action permet d'extraire des données structurées depuis un tableau +visible à l'écran, en utilisant soit l'IA (Ollama) soit l'OCR. + +Cas d'usage : +- Extraire des données de tableaux Excel/Web +- Récupérer des listes de fichiers +- Capturer des grilles de données +- Extraire des rapports tabulaires +""" + +from typing import Dict, Any, List, Optional, Tuple +from datetime import datetime +import time +import base64 +import io +import json +import requests +import re + +from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus +from ...contracts.error import VWBErrorType, create_vwb_error +from ...contracts.visual_anchor import VWBVisualAnchor + + +# Configuration par défaut +OLLAMA_DEFAULT_URL = "http://localhost:11434" +OLLAMA_DEFAULT_MODEL = "qwen2.5-vl:7b" + + +class VWBExtraireTableauAction(BaseVWBAction): + """ + Action d'extraction de tableau. + + Extrait des données tabulaires depuis une région de l'écran + en utilisant l'IA vision (Ollama) ou l'OCR. + + Formats de sortie supportés : + - Liste de dictionnaires (JSON) + - CSV + - Liste de listes + """ + + def __init__( + self, + action_id: str, + parameters: Dict[str, Any], + screen_capturer=None + ): + """ + Initialise l'action d'extraction de tableau. + + Args: + action_id: Identifiant unique de l'action + parameters: Paramètres de l'extraction + screen_capturer: Instance du ScreenCapturer (optionnel) + """ + super().__init__( + action_id=action_id, + name="Extraire Tableau", + description="Extrait des données tabulaires depuis l'écran", + parameters=parameters, + screen_capturer=screen_capturer + ) + + # Zone à analyser + self.ancre_visuelle: Optional[VWBVisualAnchor] = ( + parameters.get('visual_anchor') or + parameters.get('ancre_visuelle') + ) + self.region = parameters.get('region') # {x, y, width, height} + + # Configuration de l'extraction + self.mode_extraction = parameters.get('mode_extraction', parameters.get('extraction_mode', 'ia')) # ia, ocr + self.format_sortie = parameters.get('format_sortie', parameters.get('output_format', 'json')) # json, csv, list + self.inclure_entetes = parameters.get('inclure_entetes', parameters.get('include_headers', True)) + + # Colonnes attendues (optionnel, aide l'IA) + self.colonnes_attendues = parameters.get('colonnes_attendues', parameters.get('expected_columns', [])) + + # Configuration Ollama + self.ollama_url = parameters.get('ollama_url', OLLAMA_DEFAULT_URL) + self.ollama_model = parameters.get('ollama_model', parameters.get('model', OLLAMA_DEFAULT_MODEL)) + + # Options + self.timeout_ms = parameters.get('timeout_ms', 45000) + self.max_lignes = parameters.get('max_lignes', parameters.get('max_rows', 100)) + + # Variable de sortie + self.variable_sortie = parameters.get('variable_sortie', parameters.get('output_variable', 'tableau_extrait')) + + def validate_parameters(self) -> List[str]: + """Valide les paramètres de l'action.""" + erreurs = [] + + if not self.ancre_visuelle and not self.region and not self.screen_capturer: + erreurs.append("Ancre visuelle, région ou screen_capturer requis") + + if self.mode_extraction not in ['ia', 'ocr']: + erreurs.append("Mode extraction doit être 'ia' ou 'ocr'") + + if self.format_sortie not in ['json', 'csv', 'list']: + erreurs.append("Format sortie doit être 'json', 'csv' ou 'list'") + + if self.max_lignes < 1 or self.max_lignes > 1000: + erreurs.append("max_lignes doit être entre 1 et 1000") + + return erreurs + + def execute_core(self, step_id: str) -> VWBActionResult: + """ + Exécute l'extraction du tableau. + + Args: + step_id: Identifiant de l'étape + + Returns: + Résultat avec les données extraites + """ + start_time = datetime.now() + + try: + # Étape 1: Capturer l'image + image_base64 = self._capturer_image() + + if not image_base64: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SCREEN_CAPTURE_FAILED, + message="Impossible de capturer l'image du tableau" + ) + + # Étape 2: Extraire les données + print(f"📊 Extraction du tableau ({self.mode_extraction})...") + + if self.mode_extraction == 'ia': + donnees = self._extraire_avec_ia(image_base64) + else: + donnees = self._extraire_avec_ocr(image_base64) + + if donnees is None: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message="Échec de l'extraction du tableau" + ) + + # Étape 3: Formater la sortie + sortie_formatee = self._formater_sortie(donnees) + + end_time = datetime.now() + execution_time = (end_time - start_time).total_seconds() * 1000 + + nb_lignes = len(donnees) if isinstance(donnees, list) else 0 + nb_colonnes = len(donnees[0]) if donnees and isinstance(donnees[0], (list, dict)) else 0 + + print(f"✅ Tableau extrait: {nb_lignes} lignes x {nb_colonnes} colonnes") + + return VWBActionResult( + action_id=self.action_id, + step_id=step_id, + status=VWBActionStatus.SUCCESS, + start_time=start_time, + end_time=end_time, + execution_time_ms=execution_time, + output_data={ + 'tableau': sortie_formatee, + 'nb_lignes': nb_lignes, + 'nb_colonnes': nb_colonnes, + 'format': self.format_sortie, + 'variable_sortie': self.variable_sortie, + 'mode': self.mode_extraction + }, + evidence_list=self.evidence_list.copy() + ) + + except Exception as e: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=f"Erreur: {str(e)}", + technical_details={'exception': str(e)} + ) + + def _capturer_image(self) -> Optional[str]: + """Capture l'image à analyser.""" + try: + # Option 1: Image depuis l'ancre visuelle + if self.ancre_visuelle: + if isinstance(self.ancre_visuelle, dict): + img = self.ancre_visuelle.get('screenshot') or self.ancre_visuelle.get('image_base64') + if img: + return img + elif isinstance(self.ancre_visuelle, VWBVisualAnchor): + if self.ancre_visuelle.screenshot_base64: + return self.ancre_visuelle.screenshot_base64 + + # Option 2: Capture d'une région + if self.region and self.screen_capturer: + return self._capturer_region(self.region) + + # Option 3: Écran entier + if self.screen_capturer: + return self._capturer_ecran() + + return None + + except Exception as e: + print(f"⚠️ Erreur capture: {e}") + return None + + def _capturer_region(self, region: Dict[str, int]) -> Optional[str]: + """Capture une région spécifique.""" + try: + from PIL import Image + + img_array = self.screen_capturer.capture() + if img_array is None: + return None + + pil_image = Image.fromarray(img_array) + + x = region.get('x', 0) + y = region.get('y', 0) + width = region.get('width', 100) + height = region.get('height', 100) + + cropped = pil_image.crop((x, y, x + width, y + height)) + + buffer = io.BytesIO() + cropped.save(buffer, format='PNG') + return base64.b64encode(buffer.getvalue()).decode('utf-8') + + except Exception as e: + print(f"⚠️ Erreur capture région: {e}") + return None + + def _capturer_ecran(self) -> Optional[str]: + """Capture l'écran entier.""" + try: + from PIL import Image + + img_array = self.screen_capturer.capture() + if img_array is None: + return None + + pil_image = Image.fromarray(img_array) + + buffer = io.BytesIO() + pil_image.save(buffer, format='PNG', optimize=True) + return base64.b64encode(buffer.getvalue()).decode('utf-8') + + except Exception as e: + print(f"⚠️ Erreur capture: {e}") + return None + + def _extraire_avec_ia(self, image_base64: str) -> Optional[List]: + """ + Extrait le tableau avec l'IA (Ollama). + + Args: + image_base64: Image en base64 + + Returns: + Liste de dictionnaires ou liste de listes + """ + try: + # Construire le prompt + prompt = self._construire_prompt_ia() + + payload = { + "model": self.ollama_model, + "prompt": prompt, + "images": [image_base64], + "stream": False, + "options": { + "temperature": 0.1, # Basse température pour extraction précise + "num_predict": 4000, + } + } + + response = requests.post( + f"{self.ollama_url}/api/generate", + json=payload, + timeout=self.timeout_ms / 1000 + ) + + if response.status_code == 200: + result = response.json() + texte_reponse = result.get('response', '').strip() + return self._parser_reponse_ia(texte_reponse) + else: + print(f"⚠️ Erreur Ollama: {response.status_code}") + return None + + except requests.exceptions.Timeout: + print(f"⚠️ Timeout Ollama") + return None + + except requests.exceptions.ConnectionError: + print(f"⚠️ Ollama non accessible, fallback OCR") + return self._extraire_avec_ocr(image_base64) + + except Exception as e: + print(f"⚠️ Erreur IA: {e}") + return None + + def _construire_prompt_ia(self) -> str: + """Construit le prompt pour l'extraction IA.""" + prompt = """Analyse cette image et extrait les données du tableau visible. + +INSTRUCTIONS: +1. Identifie toutes les lignes et colonnes du tableau +2. Retourne les données au format JSON: une liste de dictionnaires +3. Utilise les en-têtes de colonnes comme clés si visibles +4. Si pas d'en-têtes, utilise "col1", "col2", etc. +5. Retourne UNIQUEMENT le JSON, sans explication + +""" + if self.colonnes_attendues: + prompt += f"Colonnes attendues: {', '.join(self.colonnes_attendues)}\n\n" + + if self.max_lignes < 100: + prompt += f"Limite: {self.max_lignes} premières lignes maximum.\n\n" + + prompt += """FORMAT DE SORTIE (exemple): +[ + {"Nom": "Dupont", "Prénom": "Jean", "Age": "35"}, + {"Nom": "Martin", "Prénom": "Marie", "Age": "28"} +] + +Retourne maintenant le JSON du tableau:""" + + return prompt + + def _parser_reponse_ia(self, texte: str) -> Optional[List]: + """ + Parse la réponse de l'IA en données structurées. + + Args: + texte: Réponse texte de l'IA + + Returns: + Liste parsée ou None + """ + try: + # Chercher le JSON dans la réponse + # Parfois l'IA ajoute du texte avant/après + + # Essayer de trouver un array JSON + match = re.search(r'\[[\s\S]*\]', texte) + if match: + json_str = match.group() + return json.loads(json_str) + + # Essayer de parser directement + return json.loads(texte) + + except json.JSONDecodeError: + # Fallback: parser comme texte tabulé + return self._parser_texte_tabule(texte) + + except Exception as e: + print(f"⚠️ Erreur parsing: {e}") + return None + + def _parser_texte_tabule(self, texte: str) -> List[List[str]]: + """ + Parse du texte tabulé en liste de listes. + + Args: + texte: Texte avec lignes et colonnes + + Returns: + Liste de listes + """ + lignes = texte.strip().split('\n') + tableau = [] + + for ligne in lignes[:self.max_lignes]: + ligne = ligne.strip() + if not ligne or ligne.startswith('#'): + continue + + # Essayer différents séparateurs + if '\t' in ligne: + cellules = ligne.split('\t') + elif '|' in ligne: + cellules = [c.strip() for c in ligne.split('|') if c.strip()] + elif ',' in ligne: + cellules = ligne.split(',') + else: + cellules = ligne.split() + + if cellules: + tableau.append(cellules) + + return tableau + + def _extraire_avec_ocr(self, image_base64: str) -> Optional[List]: + """ + Extrait le tableau avec OCR. + + Args: + image_base64: Image en base64 + + Returns: + Liste de listes + """ + try: + # Décoder l'image + from PIL import Image + + image_data = base64.b64decode(image_base64) + pil_image = Image.open(io.BytesIO(image_data)) + + # Essayer EasyOCR + try: + import easyocr + import numpy as np + + reader = easyocr.Reader(['fr', 'en'], gpu=True) + img_array = np.array(pil_image) + + results = reader.readtext(img_array) + + # Grouper par lignes (par coordonnée Y) + return self._grouper_ocr_en_lignes(results) + + except ImportError: + print("⚠️ EasyOCR non disponible") + return None + + except Exception as e: + print(f"⚠️ Erreur OCR: {e}") + return None + + def _grouper_ocr_en_lignes(self, results: List) -> List[List[str]]: + """ + Groupe les résultats OCR en lignes de tableau. + + Args: + results: Résultats EasyOCR [(bbox, text, conf), ...] + + Returns: + Liste de lignes + """ + if not results: + return [] + + # Extraire les positions et textes + items = [] + for bbox, text, conf in results: + if conf < 0.3: # Ignorer basse confiance + continue + # Calculer le centre Y + y_center = (bbox[0][1] + bbox[2][1]) / 2 + x_center = (bbox[0][0] + bbox[2][0]) / 2 + items.append({'text': text, 'x': x_center, 'y': y_center}) + + if not items: + return [] + + # Trier par Y puis X + items.sort(key=lambda i: (i['y'], i['x'])) + + # Grouper par lignes (seuil de 20 pixels) + lignes = [] + ligne_courante = [] + y_precedent = items[0]['y'] + + for item in items: + if abs(item['y'] - y_precedent) > 20: + # Nouvelle ligne + if ligne_courante: + ligne_courante.sort(key=lambda i: i['x']) + lignes.append([i['text'] for i in ligne_courante]) + ligne_courante = [item] + y_precedent = item['y'] + else: + ligne_courante.append(item) + + # Ajouter la dernière ligne + if ligne_courante: + ligne_courante.sort(key=lambda i: i['x']) + lignes.append([i['text'] for i in ligne_courante]) + + return lignes[:self.max_lignes] + + def _formater_sortie(self, donnees: List) -> Any: + """ + Formate les données selon le format de sortie demandé. + + Args: + donnees: Données brutes (liste de dicts ou liste de listes) + + Returns: + Données formatées + """ + if not donnees: + return [] if self.format_sortie in ['json', 'list'] else '' + + # Si c'est déjà des dictionnaires + if isinstance(donnees[0], dict): + if self.format_sortie == 'json': + return donnees + elif self.format_sortie == 'csv': + return self._dicts_vers_csv(donnees) + else: # list + return [list(d.values()) for d in donnees] + + # Si c'est des listes + if isinstance(donnees[0], list): + if self.format_sortie == 'list': + return donnees + elif self.format_sortie == 'csv': + return self._listes_vers_csv(donnees) + else: # json + return self._listes_vers_dicts(donnees) + + return donnees + + def _dicts_vers_csv(self, donnees: List[Dict]) -> str: + """Convertit une liste de dicts en CSV.""" + if not donnees: + return '' + + lignes = [] + + # En-têtes + if self.inclure_entetes: + lignes.append(','.join(donnees[0].keys())) + + # Données + for row in donnees: + lignes.append(','.join(str(v) for v in row.values())) + + return '\n'.join(lignes) + + def _listes_vers_csv(self, donnees: List[List]) -> str: + """Convertit une liste de listes en CSV.""" + return '\n'.join(','.join(str(c) for c in ligne) for ligne in donnees) + + def _listes_vers_dicts(self, donnees: List[List]) -> List[Dict]: + """Convertit une liste de listes en liste de dicts.""" + if not donnees: + return [] + + # Utiliser la première ligne comme en-têtes si demandé + if self.inclure_entetes and len(donnees) > 1: + entetes = donnees[0] + return [dict(zip(entetes, ligne)) for ligne in donnees[1:]] + else: + # Générer des noms de colonnes + nb_cols = max(len(ligne) for ligne in donnees) + entetes = [f'col{i+1}' for i in range(nb_cols)] + return [dict(zip(entetes, ligne)) for ligne in donnees] + + def get_action_info(self) -> Dict[str, Any]: + """Retourne les informations de l'action.""" + return { + 'action_id': self.action_id, + 'name': self.name, + 'description': self.description, + 'type': 'extraire_tableau', + 'parameters': { + 'mode': self.mode_extraction, + 'format_sortie': self.format_sortie, + 'max_lignes': self.max_lignes, + 'variable_sortie': self.variable_sortie + }, + 'status': self.current_status.value + } + + +# Alias anglais pour compatibilité +VWBExtractTableAction = VWBExtraireTableauAction