Feat: Action extraire_tableau (IA + OCR)
Nouvelle action d'extraction de données tabulaires: - Mode IA: Ollama qwen2.5-vl pour extraction intelligente - Mode OCR: EasyOCR avec groupement par lignes - Formats de sortie: JSON, CSV, liste - Support colonnes attendues pour guider l'IA - Parsing de texte tabulé (tab, pipe, virgule) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
21
visual_workflow_builder/backend/actions/data/__init__.py
Normal file
21
visual_workflow_builder/backend/actions/data/__init__.py
Normal file
@@ -0,0 +1,21 @@
|
||||
"""
|
||||
Actions Data VWB - Module d'initialisation
|
||||
Auteur : Dom, Claude - 14 janvier 2026
|
||||
|
||||
Ce module contient les actions de manipulation de données
|
||||
pour le Visual Workflow Builder.
|
||||
|
||||
Actions disponibles :
|
||||
- VWBExtraireTableauAction : Extraction de données tabulaires
|
||||
"""
|
||||
|
||||
from .extraire_tableau import VWBExtraireTableauAction, VWBExtractTableAction
|
||||
|
||||
__all__ = [
|
||||
'VWBExtraireTableauAction',
|
||||
'VWBExtractTableAction', # Alias anglais
|
||||
]
|
||||
|
||||
__version__ = '1.0.0'
|
||||
__author__ = 'Dom, Claude'
|
||||
__date__ = '14 janvier 2026'
|
||||
583
visual_workflow_builder/backend/actions/data/extraire_tableau.py
Normal file
583
visual_workflow_builder/backend/actions/data/extraire_tableau.py
Normal file
@@ -0,0 +1,583 @@
|
||||
"""
|
||||
Action Extraire Tableau - Extraction de données tabulaires depuis l'écran
|
||||
Auteur : Dom, Claude - 14 janvier 2026
|
||||
|
||||
Cette action permet d'extraire des données structurées depuis un tableau
|
||||
visible à l'écran, en utilisant soit l'IA (Ollama) soit l'OCR.
|
||||
|
||||
Cas d'usage :
|
||||
- Extraire des données de tableaux Excel/Web
|
||||
- Récupérer des listes de fichiers
|
||||
- Capturer des grilles de données
|
||||
- Extraire des rapports tabulaires
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
from datetime import datetime
|
||||
import time
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
import requests
|
||||
import re
|
||||
|
||||
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
|
||||
from ...contracts.error import VWBErrorType, create_vwb_error
|
||||
from ...contracts.visual_anchor import VWBVisualAnchor
|
||||
|
||||
|
||||
# Configuration par défaut
|
||||
OLLAMA_DEFAULT_URL = "http://localhost:11434"
|
||||
OLLAMA_DEFAULT_MODEL = "qwen2.5-vl:7b"
|
||||
|
||||
|
||||
class VWBExtraireTableauAction(BaseVWBAction):
|
||||
"""
|
||||
Action d'extraction de tableau.
|
||||
|
||||
Extrait des données tabulaires depuis une région de l'écran
|
||||
en utilisant l'IA vision (Ollama) ou l'OCR.
|
||||
|
||||
Formats de sortie supportés :
|
||||
- Liste de dictionnaires (JSON)
|
||||
- CSV
|
||||
- Liste de listes
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
action_id: str,
|
||||
parameters: Dict[str, Any],
|
||||
screen_capturer=None
|
||||
):
|
||||
"""
|
||||
Initialise l'action d'extraction de tableau.
|
||||
|
||||
Args:
|
||||
action_id: Identifiant unique de l'action
|
||||
parameters: Paramètres de l'extraction
|
||||
screen_capturer: Instance du ScreenCapturer (optionnel)
|
||||
"""
|
||||
super().__init__(
|
||||
action_id=action_id,
|
||||
name="Extraire Tableau",
|
||||
description="Extrait des données tabulaires depuis l'écran",
|
||||
parameters=parameters,
|
||||
screen_capturer=screen_capturer
|
||||
)
|
||||
|
||||
# Zone à analyser
|
||||
self.ancre_visuelle: Optional[VWBVisualAnchor] = (
|
||||
parameters.get('visual_anchor') or
|
||||
parameters.get('ancre_visuelle')
|
||||
)
|
||||
self.region = parameters.get('region') # {x, y, width, height}
|
||||
|
||||
# Configuration de l'extraction
|
||||
self.mode_extraction = parameters.get('mode_extraction', parameters.get('extraction_mode', 'ia')) # ia, ocr
|
||||
self.format_sortie = parameters.get('format_sortie', parameters.get('output_format', 'json')) # json, csv, list
|
||||
self.inclure_entetes = parameters.get('inclure_entetes', parameters.get('include_headers', True))
|
||||
|
||||
# Colonnes attendues (optionnel, aide l'IA)
|
||||
self.colonnes_attendues = parameters.get('colonnes_attendues', parameters.get('expected_columns', []))
|
||||
|
||||
# Configuration Ollama
|
||||
self.ollama_url = parameters.get('ollama_url', OLLAMA_DEFAULT_URL)
|
||||
self.ollama_model = parameters.get('ollama_model', parameters.get('model', OLLAMA_DEFAULT_MODEL))
|
||||
|
||||
# Options
|
||||
self.timeout_ms = parameters.get('timeout_ms', 45000)
|
||||
self.max_lignes = parameters.get('max_lignes', parameters.get('max_rows', 100))
|
||||
|
||||
# Variable de sortie
|
||||
self.variable_sortie = parameters.get('variable_sortie', parameters.get('output_variable', 'tableau_extrait'))
|
||||
|
||||
def validate_parameters(self) -> List[str]:
|
||||
"""Valide les paramètres de l'action."""
|
||||
erreurs = []
|
||||
|
||||
if not self.ancre_visuelle and not self.region and not self.screen_capturer:
|
||||
erreurs.append("Ancre visuelle, région ou screen_capturer requis")
|
||||
|
||||
if self.mode_extraction not in ['ia', 'ocr']:
|
||||
erreurs.append("Mode extraction doit être 'ia' ou 'ocr'")
|
||||
|
||||
if self.format_sortie not in ['json', 'csv', 'list']:
|
||||
erreurs.append("Format sortie doit être 'json', 'csv' ou 'list'")
|
||||
|
||||
if self.max_lignes < 1 or self.max_lignes > 1000:
|
||||
erreurs.append("max_lignes doit être entre 1 et 1000")
|
||||
|
||||
return erreurs
|
||||
|
||||
def execute_core(self, step_id: str) -> VWBActionResult:
|
||||
"""
|
||||
Exécute l'extraction du tableau.
|
||||
|
||||
Args:
|
||||
step_id: Identifiant de l'étape
|
||||
|
||||
Returns:
|
||||
Résultat avec les données extraites
|
||||
"""
|
||||
start_time = datetime.now()
|
||||
|
||||
try:
|
||||
# Étape 1: Capturer l'image
|
||||
image_base64 = self._capturer_image()
|
||||
|
||||
if not image_base64:
|
||||
return self._create_error_result(
|
||||
step_id=step_id,
|
||||
start_time=start_time,
|
||||
error_type=VWBErrorType.SCREEN_CAPTURE_FAILED,
|
||||
message="Impossible de capturer l'image du tableau"
|
||||
)
|
||||
|
||||
# Étape 2: Extraire les données
|
||||
print(f"📊 Extraction du tableau ({self.mode_extraction})...")
|
||||
|
||||
if self.mode_extraction == 'ia':
|
||||
donnees = self._extraire_avec_ia(image_base64)
|
||||
else:
|
||||
donnees = self._extraire_avec_ocr(image_base64)
|
||||
|
||||
if donnees is None:
|
||||
return self._create_error_result(
|
||||
step_id=step_id,
|
||||
start_time=start_time,
|
||||
error_type=VWBErrorType.SYSTEM_ERROR,
|
||||
message="Échec de l'extraction du tableau"
|
||||
)
|
||||
|
||||
# Étape 3: Formater la sortie
|
||||
sortie_formatee = self._formater_sortie(donnees)
|
||||
|
||||
end_time = datetime.now()
|
||||
execution_time = (end_time - start_time).total_seconds() * 1000
|
||||
|
||||
nb_lignes = len(donnees) if isinstance(donnees, list) else 0
|
||||
nb_colonnes = len(donnees[0]) if donnees and isinstance(donnees[0], (list, dict)) else 0
|
||||
|
||||
print(f"✅ Tableau extrait: {nb_lignes} lignes x {nb_colonnes} colonnes")
|
||||
|
||||
return VWBActionResult(
|
||||
action_id=self.action_id,
|
||||
step_id=step_id,
|
||||
status=VWBActionStatus.SUCCESS,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
execution_time_ms=execution_time,
|
||||
output_data={
|
||||
'tableau': sortie_formatee,
|
||||
'nb_lignes': nb_lignes,
|
||||
'nb_colonnes': nb_colonnes,
|
||||
'format': self.format_sortie,
|
||||
'variable_sortie': self.variable_sortie,
|
||||
'mode': self.mode_extraction
|
||||
},
|
||||
evidence_list=self.evidence_list.copy()
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return self._create_error_result(
|
||||
step_id=step_id,
|
||||
start_time=start_time,
|
||||
error_type=VWBErrorType.SYSTEM_ERROR,
|
||||
message=f"Erreur: {str(e)}",
|
||||
technical_details={'exception': str(e)}
|
||||
)
|
||||
|
||||
def _capturer_image(self) -> Optional[str]:
|
||||
"""Capture l'image à analyser."""
|
||||
try:
|
||||
# Option 1: Image depuis l'ancre visuelle
|
||||
if self.ancre_visuelle:
|
||||
if isinstance(self.ancre_visuelle, dict):
|
||||
img = self.ancre_visuelle.get('screenshot') or self.ancre_visuelle.get('image_base64')
|
||||
if img:
|
||||
return img
|
||||
elif isinstance(self.ancre_visuelle, VWBVisualAnchor):
|
||||
if self.ancre_visuelle.screenshot_base64:
|
||||
return self.ancre_visuelle.screenshot_base64
|
||||
|
||||
# Option 2: Capture d'une région
|
||||
if self.region and self.screen_capturer:
|
||||
return self._capturer_region(self.region)
|
||||
|
||||
# Option 3: Écran entier
|
||||
if self.screen_capturer:
|
||||
return self._capturer_ecran()
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Erreur capture: {e}")
|
||||
return None
|
||||
|
||||
def _capturer_region(self, region: Dict[str, int]) -> Optional[str]:
|
||||
"""Capture une région spécifique."""
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
img_array = self.screen_capturer.capture()
|
||||
if img_array is None:
|
||||
return None
|
||||
|
||||
pil_image = Image.fromarray(img_array)
|
||||
|
||||
x = region.get('x', 0)
|
||||
y = region.get('y', 0)
|
||||
width = region.get('width', 100)
|
||||
height = region.get('height', 100)
|
||||
|
||||
cropped = pil_image.crop((x, y, x + width, y + height))
|
||||
|
||||
buffer = io.BytesIO()
|
||||
cropped.save(buffer, format='PNG')
|
||||
return base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Erreur capture région: {e}")
|
||||
return None
|
||||
|
||||
def _capturer_ecran(self) -> Optional[str]:
|
||||
"""Capture l'écran entier."""
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
img_array = self.screen_capturer.capture()
|
||||
if img_array is None:
|
||||
return None
|
||||
|
||||
pil_image = Image.fromarray(img_array)
|
||||
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format='PNG', optimize=True)
|
||||
return base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Erreur capture: {e}")
|
||||
return None
|
||||
|
||||
def _extraire_avec_ia(self, image_base64: str) -> Optional[List]:
|
||||
"""
|
||||
Extrait le tableau avec l'IA (Ollama).
|
||||
|
||||
Args:
|
||||
image_base64: Image en base64
|
||||
|
||||
Returns:
|
||||
Liste de dictionnaires ou liste de listes
|
||||
"""
|
||||
try:
|
||||
# Construire le prompt
|
||||
prompt = self._construire_prompt_ia()
|
||||
|
||||
payload = {
|
||||
"model": self.ollama_model,
|
||||
"prompt": prompt,
|
||||
"images": [image_base64],
|
||||
"stream": False,
|
||||
"options": {
|
||||
"temperature": 0.1, # Basse température pour extraction précise
|
||||
"num_predict": 4000,
|
||||
}
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f"{self.ollama_url}/api/generate",
|
||||
json=payload,
|
||||
timeout=self.timeout_ms / 1000
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
texte_reponse = result.get('response', '').strip()
|
||||
return self._parser_reponse_ia(texte_reponse)
|
||||
else:
|
||||
print(f"⚠️ Erreur Ollama: {response.status_code}")
|
||||
return None
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
print(f"⚠️ Timeout Ollama")
|
||||
return None
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
print(f"⚠️ Ollama non accessible, fallback OCR")
|
||||
return self._extraire_avec_ocr(image_base64)
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Erreur IA: {e}")
|
||||
return None
|
||||
|
||||
def _construire_prompt_ia(self) -> str:
|
||||
"""Construit le prompt pour l'extraction IA."""
|
||||
prompt = """Analyse cette image et extrait les données du tableau visible.
|
||||
|
||||
INSTRUCTIONS:
|
||||
1. Identifie toutes les lignes et colonnes du tableau
|
||||
2. Retourne les données au format JSON: une liste de dictionnaires
|
||||
3. Utilise les en-têtes de colonnes comme clés si visibles
|
||||
4. Si pas d'en-têtes, utilise "col1", "col2", etc.
|
||||
5. Retourne UNIQUEMENT le JSON, sans explication
|
||||
|
||||
"""
|
||||
if self.colonnes_attendues:
|
||||
prompt += f"Colonnes attendues: {', '.join(self.colonnes_attendues)}\n\n"
|
||||
|
||||
if self.max_lignes < 100:
|
||||
prompt += f"Limite: {self.max_lignes} premières lignes maximum.\n\n"
|
||||
|
||||
prompt += """FORMAT DE SORTIE (exemple):
|
||||
[
|
||||
{"Nom": "Dupont", "Prénom": "Jean", "Age": "35"},
|
||||
{"Nom": "Martin", "Prénom": "Marie", "Age": "28"}
|
||||
]
|
||||
|
||||
Retourne maintenant le JSON du tableau:"""
|
||||
|
||||
return prompt
|
||||
|
||||
def _parser_reponse_ia(self, texte: str) -> Optional[List]:
|
||||
"""
|
||||
Parse la réponse de l'IA en données structurées.
|
||||
|
||||
Args:
|
||||
texte: Réponse texte de l'IA
|
||||
|
||||
Returns:
|
||||
Liste parsée ou None
|
||||
"""
|
||||
try:
|
||||
# Chercher le JSON dans la réponse
|
||||
# Parfois l'IA ajoute du texte avant/après
|
||||
|
||||
# Essayer de trouver un array JSON
|
||||
match = re.search(r'\[[\s\S]*\]', texte)
|
||||
if match:
|
||||
json_str = match.group()
|
||||
return json.loads(json_str)
|
||||
|
||||
# Essayer de parser directement
|
||||
return json.loads(texte)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
# Fallback: parser comme texte tabulé
|
||||
return self._parser_texte_tabule(texte)
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Erreur parsing: {e}")
|
||||
return None
|
||||
|
||||
def _parser_texte_tabule(self, texte: str) -> List[List[str]]:
|
||||
"""
|
||||
Parse du texte tabulé en liste de listes.
|
||||
|
||||
Args:
|
||||
texte: Texte avec lignes et colonnes
|
||||
|
||||
Returns:
|
||||
Liste de listes
|
||||
"""
|
||||
lignes = texte.strip().split('\n')
|
||||
tableau = []
|
||||
|
||||
for ligne in lignes[:self.max_lignes]:
|
||||
ligne = ligne.strip()
|
||||
if not ligne or ligne.startswith('#'):
|
||||
continue
|
||||
|
||||
# Essayer différents séparateurs
|
||||
if '\t' in ligne:
|
||||
cellules = ligne.split('\t')
|
||||
elif '|' in ligne:
|
||||
cellules = [c.strip() for c in ligne.split('|') if c.strip()]
|
||||
elif ',' in ligne:
|
||||
cellules = ligne.split(',')
|
||||
else:
|
||||
cellules = ligne.split()
|
||||
|
||||
if cellules:
|
||||
tableau.append(cellules)
|
||||
|
||||
return tableau
|
||||
|
||||
def _extraire_avec_ocr(self, image_base64: str) -> Optional[List]:
|
||||
"""
|
||||
Extrait le tableau avec OCR.
|
||||
|
||||
Args:
|
||||
image_base64: Image en base64
|
||||
|
||||
Returns:
|
||||
Liste de listes
|
||||
"""
|
||||
try:
|
||||
# Décoder l'image
|
||||
from PIL import Image
|
||||
|
||||
image_data = base64.b64decode(image_base64)
|
||||
pil_image = Image.open(io.BytesIO(image_data))
|
||||
|
||||
# Essayer EasyOCR
|
||||
try:
|
||||
import easyocr
|
||||
import numpy as np
|
||||
|
||||
reader = easyocr.Reader(['fr', 'en'], gpu=True)
|
||||
img_array = np.array(pil_image)
|
||||
|
||||
results = reader.readtext(img_array)
|
||||
|
||||
# Grouper par lignes (par coordonnée Y)
|
||||
return self._grouper_ocr_en_lignes(results)
|
||||
|
||||
except ImportError:
|
||||
print("⚠️ EasyOCR non disponible")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Erreur OCR: {e}")
|
||||
return None
|
||||
|
||||
def _grouper_ocr_en_lignes(self, results: List) -> List[List[str]]:
|
||||
"""
|
||||
Groupe les résultats OCR en lignes de tableau.
|
||||
|
||||
Args:
|
||||
results: Résultats EasyOCR [(bbox, text, conf), ...]
|
||||
|
||||
Returns:
|
||||
Liste de lignes
|
||||
"""
|
||||
if not results:
|
||||
return []
|
||||
|
||||
# Extraire les positions et textes
|
||||
items = []
|
||||
for bbox, text, conf in results:
|
||||
if conf < 0.3: # Ignorer basse confiance
|
||||
continue
|
||||
# Calculer le centre Y
|
||||
y_center = (bbox[0][1] + bbox[2][1]) / 2
|
||||
x_center = (bbox[0][0] + bbox[2][0]) / 2
|
||||
items.append({'text': text, 'x': x_center, 'y': y_center})
|
||||
|
||||
if not items:
|
||||
return []
|
||||
|
||||
# Trier par Y puis X
|
||||
items.sort(key=lambda i: (i['y'], i['x']))
|
||||
|
||||
# Grouper par lignes (seuil de 20 pixels)
|
||||
lignes = []
|
||||
ligne_courante = []
|
||||
y_precedent = items[0]['y']
|
||||
|
||||
for item in items:
|
||||
if abs(item['y'] - y_precedent) > 20:
|
||||
# Nouvelle ligne
|
||||
if ligne_courante:
|
||||
ligne_courante.sort(key=lambda i: i['x'])
|
||||
lignes.append([i['text'] for i in ligne_courante])
|
||||
ligne_courante = [item]
|
||||
y_precedent = item['y']
|
||||
else:
|
||||
ligne_courante.append(item)
|
||||
|
||||
# Ajouter la dernière ligne
|
||||
if ligne_courante:
|
||||
ligne_courante.sort(key=lambda i: i['x'])
|
||||
lignes.append([i['text'] for i in ligne_courante])
|
||||
|
||||
return lignes[:self.max_lignes]
|
||||
|
||||
def _formater_sortie(self, donnees: List) -> Any:
|
||||
"""
|
||||
Formate les données selon le format de sortie demandé.
|
||||
|
||||
Args:
|
||||
donnees: Données brutes (liste de dicts ou liste de listes)
|
||||
|
||||
Returns:
|
||||
Données formatées
|
||||
"""
|
||||
if not donnees:
|
||||
return [] if self.format_sortie in ['json', 'list'] else ''
|
||||
|
||||
# Si c'est déjà des dictionnaires
|
||||
if isinstance(donnees[0], dict):
|
||||
if self.format_sortie == 'json':
|
||||
return donnees
|
||||
elif self.format_sortie == 'csv':
|
||||
return self._dicts_vers_csv(donnees)
|
||||
else: # list
|
||||
return [list(d.values()) for d in donnees]
|
||||
|
||||
# Si c'est des listes
|
||||
if isinstance(donnees[0], list):
|
||||
if self.format_sortie == 'list':
|
||||
return donnees
|
||||
elif self.format_sortie == 'csv':
|
||||
return self._listes_vers_csv(donnees)
|
||||
else: # json
|
||||
return self._listes_vers_dicts(donnees)
|
||||
|
||||
return donnees
|
||||
|
||||
def _dicts_vers_csv(self, donnees: List[Dict]) -> str:
|
||||
"""Convertit une liste de dicts en CSV."""
|
||||
if not donnees:
|
||||
return ''
|
||||
|
||||
lignes = []
|
||||
|
||||
# En-têtes
|
||||
if self.inclure_entetes:
|
||||
lignes.append(','.join(donnees[0].keys()))
|
||||
|
||||
# Données
|
||||
for row in donnees:
|
||||
lignes.append(','.join(str(v) for v in row.values()))
|
||||
|
||||
return '\n'.join(lignes)
|
||||
|
||||
def _listes_vers_csv(self, donnees: List[List]) -> str:
|
||||
"""Convertit une liste de listes en CSV."""
|
||||
return '\n'.join(','.join(str(c) for c in ligne) for ligne in donnees)
|
||||
|
||||
def _listes_vers_dicts(self, donnees: List[List]) -> List[Dict]:
|
||||
"""Convertit une liste de listes en liste de dicts."""
|
||||
if not donnees:
|
||||
return []
|
||||
|
||||
# Utiliser la première ligne comme en-têtes si demandé
|
||||
if self.inclure_entetes and len(donnees) > 1:
|
||||
entetes = donnees[0]
|
||||
return [dict(zip(entetes, ligne)) for ligne in donnees[1:]]
|
||||
else:
|
||||
# Générer des noms de colonnes
|
||||
nb_cols = max(len(ligne) for ligne in donnees)
|
||||
entetes = [f'col{i+1}' for i in range(nb_cols)]
|
||||
return [dict(zip(entetes, ligne)) for ligne in donnees]
|
||||
|
||||
def get_action_info(self) -> Dict[str, Any]:
|
||||
"""Retourne les informations de l'action."""
|
||||
return {
|
||||
'action_id': self.action_id,
|
||||
'name': self.name,
|
||||
'description': self.description,
|
||||
'type': 'extraire_tableau',
|
||||
'parameters': {
|
||||
'mode': self.mode_extraction,
|
||||
'format_sortie': self.format_sortie,
|
||||
'max_lignes': self.max_lignes,
|
||||
'variable_sortie': self.variable_sortie
|
||||
},
|
||||
'status': self.current_status.value
|
||||
}
|
||||
|
||||
|
||||
# Alias anglais pour compatibilité
|
||||
VWBExtractTableAction = VWBExtraireTableauAction
|
||||
Reference in New Issue
Block a user