From 9da804bb6e103fa0564f806825855a4916cd4d2a Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 16 Mar 2026 23:10:51 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20import=20Excel=20=E2=86=92=20SQLite=20+?= =?UTF-8?q?=20boucle=20donn=C3=A9es=20=E2=86=92=20UI=20dans=20le=20VWB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ExcelImporter : import .xlsx → SQLite auto (détection types, batch insert) - DBIterator : lecture ligne par ligne avec filtre/tri/limite - VWB actions : "Importer Excel" + "Pour chaque ligne" dans la palette - DAG executor : pré-exécution import, boucle foreach avec injection ${current_row.colonne} dans les étapes dépendantes - 36 tests unitaires Excel/DB (tous passent) Co-Authored-By: Claude Opus 4.6 (1M context) --- core/data/__init__.py | 17 + core/data/db_iterator.py | 219 +++++++ core/data/excel_importer.py | 555 ++++++++++++++++++ tests/unit/test_excel_importer.py | 520 ++++++++++++++++ .../backend/api_v3/dag_execute.py | 394 ++++++++++++- .../backend/contracts/action_contracts.py | 17 + .../src/components/PropertiesPanel.tsx | 88 +++ .../frontend_v4/src/components/StepNode.tsx | 20 +- .../frontend_v4/src/types.ts | 6 + 9 files changed, 1832 insertions(+), 4 deletions(-) create mode 100644 core/data/__init__.py create mode 100644 core/data/db_iterator.py create mode 100644 core/data/excel_importer.py create mode 100644 tests/unit/test_excel_importer.py diff --git a/core/data/__init__.py b/core/data/__init__.py new file mode 100644 index 000000000..f80b3f013 --- /dev/null +++ b/core/data/__init__.py @@ -0,0 +1,17 @@ +""" +core.data — Import Excel et itération SQLite pour l'injection UI. + +Modules : +- ExcelImporter : import Excel → SQLite (auto-détection colonnes/types) +- DBIterator : itération sur tables SQLite pour le DAGExecutor +""" + +from .excel_importer import ExcelImporter, ImportResult, PreviewResult +from .db_iterator import DBIterator + +__all__ = [ + "ExcelImporter", + "ImportResult", + "PreviewResult", + "DBIterator", +] diff --git a/core/data/db_iterator.py b/core/data/db_iterator.py new file mode 100644 index 000000000..2d36d64d9 --- /dev/null +++ b/core/data/db_iterator.py @@ -0,0 +1,219 @@ +""" +DBIterator — Itération sur les lignes d'une table SQLite pour l'injection UI. + +Utilisé par le DAGExecutor pour lire chaque ligne et remplir +les champs d'un logiciel via les actions UI. + +Auteur : Dom, Claude — mars 2026 +""" + +import logging +import sqlite3 +from pathlib import Path +from typing import Dict, Iterator, List, Optional + +logger = logging.getLogger(__name__) + +# Chemin par défaut (même que ExcelImporter) +DEFAULT_DB_PATH = "data/databases/rpa_data.db" + + +class DBIterator: + """Itère sur les lignes d'une table SQLite pour l'injection UI. + + Fournit une interface simple pour : + - Itérer sur les lignes (comme dictionnaires) + - Compter les lignes + - Lister les tables et colonnes + - Récupérer une ligne par son rowid + + Thread-safe : chaque appel ouvre sa propre connexion. + """ + + def __init__(self, db_path: str = DEFAULT_DB_PATH): + """ + Initialise l'itérateur. + + Args: + db_path: Chemin vers la base SQLite + """ + self.db_path = Path(db_path) + + # ------------------------------------------------------------------ + # Connexion + # ------------------------------------------------------------------ + + def _connect(self) -> sqlite3.Connection: + """Ouvre une connexion SQLite en mode WAL.""" + if not self.db_path.exists(): + raise FileNotFoundError( + f"Base de données introuvable : {self.db_path}" + ) + conn = sqlite3.connect(str(self.db_path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + return conn + + # ------------------------------------------------------------------ + # Itération + # ------------------------------------------------------------------ + + def iterate( + self, + table_name: str, + where: Optional[str] = None, + order_by: Optional[str] = None, + limit: Optional[int] = None, + ) -> Iterator[Dict]: + """Itère sur les lignes d'une table comme dictionnaires. + + Chaque ligne est un dict {nom_colonne: valeur}. + Les colonnes internes (_rowid, _imported_at) sont incluses. + + Args: + table_name: Nom de la table + where: Clause WHERE (sans le mot-clé WHERE), ex: "age > 18" + order_by: Clause ORDER BY, ex: "nom ASC" + limit: Nombre max de lignes + + Yields: + Dict pour chaque ligne + + Exemple: + for row in iterator.iterate("patients", where="age > 30"): + print(row["nom"], row["age"]) + """ + sql = f'SELECT * FROM "{table_name}"' + params: list = [] + + if where: + sql += f" WHERE {where}" + + if order_by: + sql += f" ORDER BY {order_by}" + + if limit is not None: + sql += " LIMIT ?" + params.append(limit) + + conn = self._connect() + try: + cursor = conn.execute(sql, params) + for row in cursor: + yield dict(row) + finally: + conn.close() + + # ------------------------------------------------------------------ + # Comptage + # ------------------------------------------------------------------ + + def count(self, table_name: str, where: Optional[str] = None) -> int: + """Compte les lignes d'une table. + + Args: + table_name: Nom de la table + where: Clause WHERE optionnelle + + Returns: + Nombre de lignes + """ + sql = f'SELECT COUNT(*) as cnt FROM "{table_name}"' + + if where: + sql += f" WHERE {where}" + + conn = self._connect() + try: + row = conn.execute(sql).fetchone() + return row["cnt"] if row else 0 + finally: + conn.close() + + # ------------------------------------------------------------------ + # Métadonnées + # ------------------------------------------------------------------ + + def get_columns(self, table_name: str) -> List[Dict]: + """Retourne les colonnes et leurs types. + + Args: + table_name: Nom de la table + + Returns: + Liste de dicts avec : name, type, notnull, default_value, pk + """ + conn = self._connect() + try: + cursor = conn.execute(f'PRAGMA table_info("{table_name}")') + columns = [] + for row in cursor: + columns.append({ + "name": row["name"], + "type": row["type"], + "notnull": bool(row["notnull"]), + "default_value": row["dflt_value"], + "pk": bool(row["pk"]), + }) + return columns + finally: + conn.close() + + def list_tables(self, db_path: Optional[str] = None) -> List[str]: + """Liste les tables disponibles. + + Args: + db_path: Chemin alternatif (utilise self.db_path par défaut) + + Returns: + Liste des noms de tables (hors tables système sqlite_*) + """ + path = Path(db_path) if db_path else self.db_path + + if not path.exists(): + return [] + + conn = sqlite3.connect(str(path)) + try: + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' " + "AND name NOT LIKE 'sqlite_%' ORDER BY name" + ) + return [row[0] for row in cursor.fetchall()] + finally: + conn.close() + + # ------------------------------------------------------------------ + # Accès par identifiant + # ------------------------------------------------------------------ + + def get_row(self, table_name: str, row_id: int) -> Optional[Dict]: + """Récupère une ligne par son _rowid (ou rowid SQLite). + + Args: + table_name: Nom de la table + row_id: ID de la ligne (_rowid ou rowid) + + Returns: + Dict de la ligne ou None si introuvable + """ + # Essayer d'abord avec _rowid (colonne explicite créée par ExcelImporter) + # puis avec le rowid implicite de SQLite + conn = self._connect() + try: + # Vérifier si la table a une colonne _rowid explicite + columns = [ + row[1] for row in conn.execute( + f'PRAGMA table_info("{table_name}")' + ).fetchall() + ] + + if "_rowid" in columns: + sql = f'SELECT * FROM "{table_name}" WHERE _rowid = ?' + else: + sql = f'SELECT * FROM "{table_name}" WHERE rowid = ?' + + row = conn.execute(sql, (row_id,)).fetchone() + return dict(row) if row else None + finally: + conn.close() diff --git a/core/data/excel_importer.py b/core/data/excel_importer.py new file mode 100644 index 000000000..ac6507070 --- /dev/null +++ b/core/data/excel_importer.py @@ -0,0 +1,555 @@ +""" +ExcelImporter — Import de fichiers Excel dans une base SQLite. + +Détecte automatiquement les colonnes, types, et crée la table. +Supporte .xlsx et .xls (via openpyxl). + +Auteur : Dom, Claude — mars 2026 +""" + +import logging +import re +import sqlite3 +from dataclasses import dataclass, field +from datetime import datetime, date +from pathlib import Path +from typing import Any, Dict, List, Optional + +import openpyxl + +logger = logging.getLogger(__name__) + +# Chemin par défaut de la base de données +DEFAULT_DB_PATH = "data/databases/rpa_data.db" + +# Nombre de lignes analysées pour la détection de types +TYPE_DETECTION_SAMPLE_SIZE = 100 + + +@dataclass +class ImportResult: + """Résultat d'un import Excel.""" + table_name: str + row_count: int + column_count: int + columns: Dict[str, str] # nom_colonne → type_sqlite + db_path: str + sheet_name: str + skipped_rows: int = 0 + errors: List[str] = field(default_factory=list) + + @property + def success(self) -> bool: + return self.row_count > 0 and not self.errors + + +@dataclass +class PreviewResult: + """Aperçu d'un fichier Excel avant import.""" + headers: List[str] + rows: List[List[Any]] + total_rows: int + sheet_name: str + detected_types: Dict[str, str] + + +class ExcelImporter: + """Importe un fichier Excel dans une base SQLite. + + Détecte automatiquement les colonnes, types, et crée la table. + Supporte .xlsx et .xls (via openpyxl). + """ + + def __init__(self, db_path: str = DEFAULT_DB_PATH): + """ + Initialise l'importeur. + + Args: + db_path: Chemin vers la base SQLite (créée si inexistante) + """ + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + # ------------------------------------------------------------------ + # API publique + # ------------------------------------------------------------------ + + def import_file( + self, + excel_path: str, + table_name: Optional[str] = None, + sheet_name: Optional[str] = None, + ) -> ImportResult: + """Importe un fichier Excel complet. + + 1. Lit les headers (première ligne) + 2. Détecte les types (text, integer, real, date) + 3. Crée la table SQLite (CREATE TABLE IF NOT EXISTS) + 4. Insère toutes les lignes en batch (INSERT) + 5. Retourne un résumé + + Args: + excel_path: Chemin du fichier .xlsx + table_name: Nom de la table (déduit du fichier si absent) + sheet_name: Feuille à importer (première par défaut) + + Returns: + ImportResult avec le résumé de l'import + """ + excel_path = Path(excel_path) + if not excel_path.exists(): + raise FileNotFoundError(f"Fichier introuvable : {excel_path}") + + # Ouvrir le classeur + wb = openpyxl.load_workbook(str(excel_path), read_only=True, data_only=True) + try: + ws = self._get_sheet(wb, sheet_name) + actual_sheet_name = ws.title + + # Lire toutes les lignes + all_rows = list(ws.iter_rows(values_only=True)) + finally: + wb.close() + + if len(all_rows) < 1: + raise ValueError(f"Le fichier est vide : {excel_path}") + + # Extraire les headers de la première ligne + raw_headers = all_rows[0] + headers = self._clean_headers(raw_headers) + data_rows = all_rows[1:] + + if not headers: + raise ValueError("Aucune colonne détectée dans la première ligne") + + # Déterminer le nom de la table + if table_name is None: + table_name = self._sanitize_table_name(excel_path.stem) + else: + table_name = self._sanitize_table_name(table_name) + + # Détecter les types de colonnes + col_types = self._detect_column_types(headers, data_rows) + + # Créer la table et insérer les données + row_count, skipped, errors = self._create_and_insert( + table_name, headers, col_types, data_rows + ) + + result = ImportResult( + table_name=table_name, + row_count=row_count, + column_count=len(headers), + columns=col_types, + db_path=str(self.db_path), + sheet_name=actual_sheet_name, + skipped_rows=skipped, + errors=errors, + ) + + logger.info( + "Import terminé : %s → table '%s' (%d lignes, %d colonnes)", + excel_path.name, + table_name, + row_count, + len(headers), + ) + return result + + def preview( + self, + excel_path: str, + max_rows: int = 5, + sheet_name: Optional[str] = None, + ) -> PreviewResult: + """Aperçu des données avant import (headers + quelques lignes). + + Args: + excel_path: Chemin du fichier .xlsx + max_rows: Nombre max de lignes à prévisualiser + sheet_name: Feuille à lire (première par défaut) + + Returns: + PreviewResult avec les headers, quelques lignes et types détectés + """ + excel_path = Path(excel_path) + if not excel_path.exists(): + raise FileNotFoundError(f"Fichier introuvable : {excel_path}") + + wb = openpyxl.load_workbook(str(excel_path), read_only=True, data_only=True) + try: + ws = self._get_sheet(wb, sheet_name) + actual_sheet_name = ws.title + all_rows = list(ws.iter_rows(values_only=True)) + finally: + wb.close() + + if len(all_rows) < 1: + raise ValueError(f"Le fichier est vide : {excel_path}") + + raw_headers = all_rows[0] + headers = self._clean_headers(raw_headers) + data_rows = all_rows[1:] + + # Convertir les lignes d'aperçu en listes (pas tuples) + preview_rows = [list(row) for row in data_rows[:max_rows]] + + # Détecter les types sur l'ensemble des données (pas juste l'aperçu) + detected_types = self._detect_column_types(headers, data_rows) + + return PreviewResult( + headers=headers, + rows=preview_rows, + total_rows=len(data_rows), + sheet_name=actual_sheet_name, + detected_types=detected_types, + ) + + def list_sheets(self, excel_path: str) -> List[str]: + """Liste les feuilles d'un fichier Excel. + + Args: + excel_path: Chemin du fichier .xlsx + + Returns: + Liste des noms de feuilles + """ + excel_path = Path(excel_path) + if not excel_path.exists(): + raise FileNotFoundError(f"Fichier introuvable : {excel_path}") + + wb = openpyxl.load_workbook(str(excel_path), read_only=True) + try: + return wb.sheetnames + finally: + wb.close() + + # ------------------------------------------------------------------ + # Détection de types + # ------------------------------------------------------------------ + + def _detect_column_types( + self, + headers: List[str], + data_rows: List[tuple], + ) -> Dict[str, str]: + """Détecte les types SQLite à partir des données. + + Analyse un échantillon de lignes et détermine le meilleur type + SQLite pour chaque colonne : TEXT, INTEGER, REAL ou TEXT (pour dates). + + Args: + headers: Liste des noms de colonnes + data_rows: Lignes de données (tuples) + + Returns: + Dict nom_colonne → type SQLite + """ + sample = data_rows[:TYPE_DETECTION_SAMPLE_SIZE] + col_types: Dict[str, str] = {} + + for col_idx, header in enumerate(headers): + # Collecter les valeurs non-nulles de cette colonne + values = [] + for row in sample: + if col_idx < len(row) and row[col_idx] is not None: + values.append(row[col_idx]) + + if not values: + col_types[header] = "TEXT" + continue + + col_types[header] = self._infer_type(values) + + return col_types + + def _infer_type(self, values: List[Any]) -> str: + """Infère le type SQLite d'une colonne à partir de ses valeurs. + + Priorité : INTEGER > REAL > TEXT + Les dates sont stockées en TEXT (format ISO). + """ + has_int = False + has_float = False + has_date = False + has_text = False + + for val in values: + if isinstance(val, bool): + # bool est un sous-type de int en Python, traiter avant int + has_int = True + elif isinstance(val, int): + has_int = True + elif isinstance(val, float): + has_float = True + elif isinstance(val, (datetime, date)): + has_date = True + elif isinstance(val, str): + # Essayer de parser comme nombre + stripped = val.strip() + if self._is_integer(stripped): + has_int = True + elif self._is_float(stripped): + has_float = True + elif self._is_date_string(stripped): + has_date = True + else: + has_text = True + else: + has_text = True + + # Si on a du texte pur, c'est TEXT + if has_text: + return "TEXT" + + # Si mélange int/float, c'est REAL + if has_float: + return "REAL" + + # Si que des int (ou bools) + if has_int and not has_date: + return "INTEGER" + + # Dates pures → TEXT (format ISO) + if has_date: + return "TEXT" + + return "TEXT" + + @staticmethod + def _is_integer(s: str) -> bool: + """Vérifie si une chaîne représente un entier.""" + if not s: + return False + try: + int(s) + return True + except ValueError: + return False + + @staticmethod + def _is_float(s: str) -> bool: + """Vérifie si une chaîne représente un nombre décimal.""" + if not s: + return False + try: + float(s) + # Rejeter les cas déjà gérés par is_integer (pas de point) + return "." in s or "e" in s.lower() + except ValueError: + return False + + @staticmethod + def _is_date_string(s: str) -> bool: + """Vérifie si une chaîne ressemble à une date (formats courants).""" + # Patterns courants : YYYY-MM-DD, DD/MM/YYYY, DD-MM-YYYY + date_patterns = [ + r"^\d{4}-\d{2}-\d{2}$", + r"^\d{2}/\d{2}/\d{4}$", + r"^\d{2}-\d{2}-\d{4}$", + r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}", + ] + return any(re.match(p, s) for p in date_patterns) + + # ------------------------------------------------------------------ + # Création de table et insertion + # ------------------------------------------------------------------ + + def _create_and_insert( + self, + table_name: str, + headers: List[str], + col_types: Dict[str, str], + data_rows: List[tuple], + ) -> tuple: + """Crée la table SQLite et insère les données en batch. + + Args: + table_name: Nom de la table + headers: Noms de colonnes + col_types: Types SQLite par colonne + data_rows: Lignes de données + + Returns: + Tuple (nombre_insérées, nombre_ignorées, liste_erreurs) + """ + # Construire la requête CREATE TABLE + col_defs = [] + for h in headers: + sqlite_type = col_types.get(h, "TEXT") + # Noms de colonnes échappés avec des guillemets doubles + col_defs.append(f'"{h}" {sqlite_type}') + + create_sql = ( + f'CREATE TABLE IF NOT EXISTS "{table_name}" (\n' + f" _rowid INTEGER PRIMARY KEY AUTOINCREMENT,\n" + f" {', '.join(col_defs)},\n" + f" _imported_at TEXT DEFAULT CURRENT_TIMESTAMP\n" + f")" + ) + + # Requête d'insertion + placeholders = ", ".join(["?"] * len(headers)) + col_names = ", ".join(f'"{h}"' for h in headers) + insert_sql = f'INSERT INTO "{table_name}" ({col_names}) VALUES ({placeholders})' + + row_count = 0 + skipped = 0 + errors: List[str] = [] + + conn = sqlite3.connect(str(self.db_path)) + try: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + + # Créer la table + conn.execute(create_sql) + + # Préparer les données pour l'insertion batch + batch: List[tuple] = [] + for row_idx, row in enumerate(data_rows): + # Ignorer les lignes entièrement vides + if all(v is None for v in row): + skipped += 1 + continue + + # Aligner la longueur de la ligne sur le nombre de colonnes + values = [] + for col_idx, header in enumerate(headers): + if col_idx < len(row): + val = row[col_idx] + val = self._convert_value(val, col_types.get(header, "TEXT")) + else: + val = None + values.append(val) + + batch.append(tuple(values)) + + # Insertion en batch + if batch: + conn.executemany(insert_sql, batch) + row_count = len(batch) + + conn.commit() + + except Exception as e: + conn.rollback() + errors.append(f"Erreur lors de l'insertion : {e}") + logger.error("Erreur import : %s", e) + finally: + conn.close() + + return row_count, skipped, errors + + @staticmethod + def _convert_value(val: Any, target_type: str) -> Any: + """Convertit une valeur Python pour SQLite. + + - datetime/date → str ISO + - int/float → tel quel + - str → nettoyé (strip) + - None → None + """ + if val is None: + return None + + if isinstance(val, (datetime, date)): + return val.isoformat() + + if isinstance(val, str): + val = val.strip() + if not val: + return None + + if target_type == "INTEGER": + try: + return int(val) + except (ValueError, TypeError): + return val + elif target_type == "REAL": + try: + return float(val) + except (ValueError, TypeError): + return val + + return val + + return val + + # ------------------------------------------------------------------ + # Utilitaires + # ------------------------------------------------------------------ + + def _sanitize_table_name(self, name: str) -> str: + """Nettoie le nom pour SQLite. + + - Remplace les caractères spéciaux par des underscores + - Supprime les espaces en début/fin + - Ajoute un préfixe si le nom commence par un chiffre + - Convertit en minuscules + """ + if not name: + return "import_data" + + # Supprimer les accents/caractères spéciaux (garder alphanum + underscore) + clean = re.sub(r"[^\w]", "_", name.strip()) + + # Supprimer les underscores multiples + clean = re.sub(r"_+", "_", clean) + + # Supprimer les underscores en début/fin + clean = clean.strip("_") + + # Minuscules + clean = clean.lower() + + # Préfixer si commence par un chiffre + if clean and clean[0].isdigit(): + clean = f"t_{clean}" + + # Fallback + if not clean: + clean = "import_data" + + return clean + + def _clean_headers(self, raw_headers: tuple) -> List[str]: + """Nettoie les noms de colonnes. + + - Supprime les None + - Strip les espaces + - Déduplique les noms identiques (ajoute un suffixe _2, _3, etc.) + """ + headers: List[str] = [] + seen: Dict[str, int] = {} + + for h in raw_headers: + if h is None: + continue + + name = str(h).strip() + if not name: + continue + + # Dédupliquer + if name in seen: + seen[name] += 1 + name = f"{name}_{seen[name]}" + else: + seen[name] = 1 + + headers.append(name) + + return headers + + @staticmethod + def _get_sheet(wb: openpyxl.Workbook, sheet_name: Optional[str]) -> Any: + """Récupère la feuille demandée ou la première par défaut.""" + if sheet_name: + if sheet_name not in wb.sheetnames: + raise ValueError( + f"Feuille '{sheet_name}' introuvable. " + f"Feuilles disponibles : {wb.sheetnames}" + ) + return wb[sheet_name] + return wb.active diff --git a/tests/unit/test_excel_importer.py b/tests/unit/test_excel_importer.py new file mode 100644 index 000000000..a0dd52e67 --- /dev/null +++ b/tests/unit/test_excel_importer.py @@ -0,0 +1,520 @@ +""" +Tests unitaires pour ExcelImporter et DBIterator. + +Crée des fichiers Excel temporaires via openpyxl et vérifie +l'import SQLite, la détection de types et l'itération. +""" + +import os +import sqlite3 +import tempfile +from datetime import datetime, date +from pathlib import Path + +import openpyxl +import pytest + +from core.data.excel_importer import ExcelImporter, ImportResult, PreviewResult +from core.data.db_iterator import DBIterator + + +# ------------------------------------------------------------------ +# Fixtures +# ------------------------------------------------------------------ + + +@pytest.fixture +def tmp_dir(tmp_path): + """Dossier temporaire pour les fichiers de test.""" + return tmp_path + + +@pytest.fixture +def db_path(tmp_dir): + """Chemin de base SQLite temporaire.""" + return str(tmp_dir / "test.db") + + +@pytest.fixture +def importer(db_path): + """Instance ExcelImporter avec DB temporaire.""" + return ExcelImporter(db_path=db_path) + + +@pytest.fixture +def iterator(db_path): + """Instance DBIterator avec DB temporaire.""" + return DBIterator(db_path=db_path) + + +@pytest.fixture +def simple_excel(tmp_dir): + """Fichier Excel simple : 3 colonnes, 5 lignes.""" + path = tmp_dir / "simple.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.title = "Données" + + # Headers + ws.append(["Nom", "Age", "Ville"]) + + # Données + ws.append(["Alice", 30, "Paris"]) + ws.append(["Bob", 25, "Lyon"]) + ws.append(["Charlie", 35, "Marseille"]) + ws.append(["Diana", 28, "Toulouse"]) + ws.append(["Eve", 42, "Bordeaux"]) + + wb.save(str(path)) + return str(path) + + +@pytest.fixture +def typed_excel(tmp_dir): + """Fichier Excel avec différents types : texte, entier, décimal, date.""" + path = tmp_dir / "typed.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.title = "Types" + + ws.append(["Nom", "Quantite", "Prix", "Date_achat"]) + ws.append(["Produit A", 10, 19.99, datetime(2026, 1, 15)]) + ws.append(["Produit B", 5, 42.50, datetime(2026, 2, 20)]) + ws.append(["Produit C", 100, 3.00, datetime(2026, 3, 1)]) + ws.append(["Produit D", 1, 999.99, datetime(2026, 3, 10)]) + + wb.save(str(path)) + return str(path) + + +@pytest.fixture +def multi_sheet_excel(tmp_dir): + """Fichier Excel avec plusieurs feuilles.""" + path = tmp_dir / "multi.xlsx" + wb = openpyxl.Workbook() + + # Feuille 1 + ws1 = wb.active + ws1.title = "Patients" + ws1.append(["NomPatient", "IPP"]) + ws1.append(["Dupont", "12345"]) + ws1.append(["Martin", "67890"]) + + # Feuille 2 + ws2 = wb.create_sheet("Séjours") + ws2.append(["NumSejour", "DateEntree", "DateSortie"]) + ws2.append(["S001", "2026-01-01", "2026-01-05"]) + + # Feuille 3 + ws3 = wb.create_sheet("Diagnostics") + ws3.append(["Code", "Libelle"]) + ws3.append(["A00", "Choléra"]) + + wb.save(str(path)) + return str(path) + + +# ------------------------------------------------------------------ +# Tests ExcelImporter — Import +# ------------------------------------------------------------------ + + +class TestImportSimpleExcel: + """Test import d'un fichier Excel simple (3 colonnes, 5 lignes).""" + + def test_import_simple_excel(self, importer, simple_excel): + """L'import crée la table et insère les 5 lignes.""" + result = importer.import_file(simple_excel) + + assert isinstance(result, ImportResult) + assert result.success + assert result.row_count == 5 + assert result.column_count == 3 + assert result.table_name == "simple" + assert result.sheet_name == "Données" + assert "Nom" in result.columns + assert "Age" in result.columns + assert "Ville" in result.columns + + def test_import_with_custom_table_name(self, importer, simple_excel): + """L'import utilise le nom de table personnalisé.""" + result = importer.import_file(simple_excel, table_name="mes_patients") + + assert result.table_name == "mes_patients" + assert result.row_count == 5 + + def test_import_creates_db_file(self, importer, simple_excel, db_path): + """L'import crée le fichier SQLite.""" + importer.import_file(simple_excel) + assert Path(db_path).exists() + + def test_import_data_readable(self, importer, simple_excel, db_path): + """Les données importées sont lisibles en SQL.""" + importer.import_file(simple_excel) + + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + rows = conn.execute('SELECT * FROM "simple" ORDER BY _rowid').fetchall() + conn.close() + + assert len(rows) == 5 + assert rows[0]["Nom"] == "Alice" + assert rows[0]["Age"] == 30 + assert rows[0]["Ville"] == "Paris" + assert rows[4]["Nom"] == "Eve" + + +# ------------------------------------------------------------------ +# Tests ExcelImporter — Détection de types +# ------------------------------------------------------------------ + + +class TestAutoDetectTypes: + """Test la détection automatique des types de colonnes.""" + + def test_auto_detect_types(self, importer, typed_excel): + """Détecte correctement texte, entier, décimal et date.""" + result = importer.import_file(typed_excel) + + assert result.columns["Nom"] == "TEXT" + assert result.columns["Quantite"] == "INTEGER" + assert result.columns["Prix"] == "REAL" + # Les dates datetime sont converties en ISO string → TEXT + assert result.columns["Date_achat"] == "TEXT" + + def test_detect_types_with_mixed_numbers(self, importer, tmp_dir): + """Si une colonne mélange int et float, le type est REAL.""" + path = tmp_dir / "mixed.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.append(["Valeur"]) + ws.append([10]) + ws.append([20.5]) + ws.append([30]) + wb.save(str(path)) + + result = importer.import_file(str(path)) + assert result.columns["Valeur"] == "REAL" + + def test_detect_types_all_none(self, importer, tmp_dir): + """Colonne entièrement vide → TEXT par défaut.""" + path = tmp_dir / "nulls.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.append(["Vide", "Plein"]) + ws.append([None, "A"]) + ws.append([None, "B"]) + wb.save(str(path)) + + result = importer.import_file(str(path)) + assert result.columns["Vide"] == "TEXT" + assert result.columns["Plein"] == "TEXT" + + +# ------------------------------------------------------------------ +# Tests ExcelImporter — Preview +# ------------------------------------------------------------------ + + +class TestPreview: + """Test l'aperçu avant import.""" + + def test_preview(self, importer, simple_excel): + """L'aperçu retourne les headers et quelques lignes.""" + preview = importer.preview(simple_excel, max_rows=3) + + assert isinstance(preview, PreviewResult) + assert preview.headers == ["Nom", "Age", "Ville"] + assert len(preview.rows) == 3 + assert preview.total_rows == 5 + assert preview.sheet_name == "Données" + assert "Nom" in preview.detected_types + + def test_preview_max_rows(self, importer, simple_excel): + """L'aperçu respecte la limite max_rows.""" + preview = importer.preview(simple_excel, max_rows=2) + assert len(preview.rows) == 2 + + def test_preview_returns_detected_types(self, importer, typed_excel): + """L'aperçu inclut les types détectés.""" + preview = importer.preview(typed_excel) + assert preview.detected_types["Quantite"] == "INTEGER" + assert preview.detected_types["Prix"] == "REAL" + + +# ------------------------------------------------------------------ +# Tests ExcelImporter — Feuilles multiples +# ------------------------------------------------------------------ + + +class TestListSheets: + """Test la gestion des feuilles multiples.""" + + def test_list_sheets(self, importer, multi_sheet_excel): + """Liste correctement les feuilles.""" + sheets = importer.list_sheets(multi_sheet_excel) + + assert len(sheets) == 3 + assert "Patients" in sheets + assert "Séjours" in sheets + assert "Diagnostics" in sheets + + def test_import_specific_sheet(self, importer, multi_sheet_excel): + """Import d'une feuille spécifique.""" + result = importer.import_file( + multi_sheet_excel, sheet_name="Séjours", table_name="sejours" + ) + + assert result.sheet_name == "Séjours" + assert result.row_count == 1 + assert "NumSejour" in result.columns + + def test_import_nonexistent_sheet_raises(self, importer, multi_sheet_excel): + """Feuille inexistante → ValueError.""" + with pytest.raises(ValueError, match="introuvable"): + importer.import_file(multi_sheet_excel, sheet_name="Inexistante") + + +# ------------------------------------------------------------------ +# Tests ExcelImporter — Sanitize +# ------------------------------------------------------------------ + + +class TestSanitizeTableName: + """Test le nettoyage des noms de tables.""" + + def test_sanitize_table_name(self, importer): + """Les caractères spéciaux sont remplacés par des underscores.""" + assert importer._sanitize_table_name("Mon Fichier (2)") == "mon_fichier_2" + + def test_sanitize_numeric_prefix(self, importer): + """Un nom commençant par un chiffre reçoit un préfixe.""" + assert importer._sanitize_table_name("2026_data") == "t_2026_data" + + def test_sanitize_empty(self, importer): + """Un nom vide donne 'import_data'.""" + assert importer._sanitize_table_name("") == "import_data" + + def test_sanitize_accents(self, importer): + """Les accents sont conservés (chars alphanumériques en Python).""" + result = importer._sanitize_table_name("données_été") + assert "donn" in result # Le mot est conservé + + def test_sanitize_special_chars(self, importer): + """Les tirets, points, parenthèses sont nettoyés.""" + result = importer._sanitize_table_name("fichier-test.v2 (copie)") + assert "_" in result + assert "(" not in result + assert ")" not in result + assert "." not in result + + +# ------------------------------------------------------------------ +# Tests DBIterator — Itération +# ------------------------------------------------------------------ + + +class TestIterateRows: + """Test l'itération sur les lignes.""" + + def test_iterate_rows(self, importer, iterator, simple_excel): + """L'itération retourne toutes les lignes en dicts.""" + importer.import_file(simple_excel) + + rows = list(iterator.iterate("simple")) + assert len(rows) == 5 + assert rows[0]["Nom"] == "Alice" + assert rows[0]["Age"] == 30 + assert rows[4]["Nom"] == "Eve" + + def test_iterate_with_limit(self, importer, iterator, simple_excel): + """L'itération respecte la limite.""" + importer.import_file(simple_excel) + + rows = list(iterator.iterate("simple", limit=2)) + assert len(rows) == 2 + + def test_iterate_with_order(self, importer, iterator, simple_excel): + """L'itération peut être ordonnée.""" + importer.import_file(simple_excel) + + rows = list(iterator.iterate("simple", order_by="Age DESC")) + assert rows[0]["Nom"] == "Eve" # 42 ans, le plus âgé + assert rows[-1]["Nom"] == "Bob" # 25 ans, le plus jeune + + +class TestIterateWithWhere: + """Test l'itération avec clause WHERE.""" + + def test_iterate_with_where(self, importer, iterator, simple_excel): + """Le filtre WHERE fonctionne.""" + importer.import_file(simple_excel) + + rows = list(iterator.iterate("simple", where="Age > 30")) + assert len(rows) == 2 # Charlie (35) et Eve (42) + noms = {r["Nom"] for r in rows} + assert "Charlie" in noms + assert "Eve" in noms + + def test_iterate_with_where_text(self, importer, iterator, simple_excel): + """Le filtre WHERE fonctionne sur les colonnes texte.""" + importer.import_file(simple_excel) + + rows = list(iterator.iterate("simple", where="Ville = 'Paris'")) + assert len(rows) == 1 + assert rows[0]["Nom"] == "Alice" + + +# ------------------------------------------------------------------ +# Tests DBIterator — Comptage +# ------------------------------------------------------------------ + + +class TestCount: + """Test le comptage de lignes.""" + + def test_count(self, importer, iterator, simple_excel): + """Compte toutes les lignes.""" + importer.import_file(simple_excel) + assert iterator.count("simple") == 5 + + def test_count_with_where(self, importer, iterator, simple_excel): + """Compte avec filtre.""" + importer.import_file(simple_excel) + assert iterator.count("simple", where="Age >= 30") == 3 # Alice, Charlie, Eve + + +# ------------------------------------------------------------------ +# Tests DBIterator — Métadonnées +# ------------------------------------------------------------------ + + +class TestListTables: + """Test la liste des tables.""" + + def test_list_tables(self, importer, iterator, simple_excel, typed_excel): + """Liste les tables créées par les imports.""" + importer.import_file(simple_excel, table_name="table_a") + importer.import_file(typed_excel, table_name="table_b") + + tables = iterator.list_tables() + assert "table_a" in tables + assert "table_b" in tables + + def test_list_tables_empty_db(self, tmp_dir): + """DB vide → liste vide.""" + db_path = str(tmp_dir / "empty.db") + # Créer une DB vide + conn = sqlite3.connect(db_path) + conn.close() + + it = DBIterator(db_path=db_path) + assert it.list_tables() == [] + + def test_list_tables_no_db(self, tmp_dir): + """DB inexistante → liste vide.""" + it = DBIterator(db_path=str(tmp_dir / "nope.db")) + assert it.list_tables() == [] + + +class TestGetColumns: + """Test la récupération des métadonnées de colonnes.""" + + def test_get_columns(self, importer, iterator, simple_excel): + """Retourne les colonnes avec leurs types.""" + importer.import_file(simple_excel) + + columns = iterator.get_columns("simple") + col_names = [c["name"] for c in columns] + + assert "_rowid" in col_names + assert "Nom" in col_names + assert "Age" in col_names + assert "Ville" in col_names + + # Vérifier les types + col_map = {c["name"]: c["type"] for c in columns} + assert col_map["Nom"] == "TEXT" + assert col_map["Age"] == "INTEGER" + + +class TestGetRow: + """Test la récupération d'une ligne par ID.""" + + def test_get_row(self, importer, iterator, simple_excel): + """Récupère une ligne par son _rowid.""" + importer.import_file(simple_excel) + + row = iterator.get_row("simple", 1) + assert row is not None + assert row["Nom"] == "Alice" + + def test_get_row_not_found(self, importer, iterator, simple_excel): + """Ligne inexistante → None.""" + importer.import_file(simple_excel) + + row = iterator.get_row("simple", 999) + assert row is None + + +# ------------------------------------------------------------------ +# Tests — Cas limites +# ------------------------------------------------------------------ + + +class TestEdgeCases: + """Tests de cas limites.""" + + def test_file_not_found(self, importer): + """Fichier inexistant → FileNotFoundError.""" + with pytest.raises(FileNotFoundError): + importer.import_file("/tmp/inexistant_xyz.xlsx") + + def test_empty_file(self, importer, tmp_dir): + """Fichier vide → ValueError.""" + path = tmp_dir / "empty.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + # Pas de données du tout — openpyxl crée une feuille vide + wb.save(str(path)) + + with pytest.raises(ValueError, match="vide|colonne"): + importer.import_file(str(path)) + + def test_duplicate_headers(self, importer, iterator, tmp_dir): + """Les colonnes dupliquées sont dédupliquées.""" + path = tmp_dir / "dupes.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.append(["Nom", "Nom", "Valeur"]) + ws.append(["A", "B", 1]) + wb.save(str(path)) + + result = importer.import_file(str(path)) + assert result.column_count == 3 + # La deuxième colonne "Nom" est renommée "Nom_2" + assert "Nom_2" in result.columns + + def test_skip_empty_rows(self, importer, iterator, tmp_dir): + """Les lignes entièrement vides sont ignorées.""" + path = tmp_dir / "with_blanks.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.append(["Col"]) + ws.append(["A"]) + ws.append([None]) + ws.append(["B"]) + wb.save(str(path)) + + result = importer.import_file(str(path)) + # "A" et "B" sont insérées, la ligne None est ignorée + assert result.row_count == 2 + assert result.skipped_rows == 1 + + def test_imported_at_column(self, importer, iterator, simple_excel): + """Chaque ligne a un timestamp d'import (_imported_at).""" + importer.import_file(simple_excel) + + rows = list(iterator.iterate("simple", limit=1)) + assert "_imported_at" in rows[0] + assert rows[0]["_imported_at"] is not None diff --git a/visual_workflow_builder/backend/api_v3/dag_execute.py b/visual_workflow_builder/backend/api_v3/dag_execute.py index cdac32ad3..b9854e74f 100644 --- a/visual_workflow_builder/backend/api_v3/dag_execute.py +++ b/visual_workflow_builder/backend/api_v3/dag_execute.py @@ -12,6 +12,7 @@ Auteur : Dom, Claude — 16 mars 2026 import json import logging +import re import sys import traceback from pathlib import Path @@ -37,6 +38,15 @@ from core.execution.dag_executor import ( ) from core.execution.llm_actions import LLMActionHandler +# Data loop — import Excel et itération sur tables +try: + from core.data.excel_importer import ExcelImporter + from core.data.db_iterator import DBIterator + _DATA_LOOP_AVAILABLE = True +except ImportError as _e: + logger.warning("Module core.data indisponible : %s", _e) + _DATA_LOOP_AVAILABLE = False + # --------------------------------------------------------------------------- # Types d'actions VWB → StepType du DAGExecutor # --------------------------------------------------------------------------- @@ -63,6 +73,9 @@ _WAIT_ACTION_TYPES = {"wait_for_anchor"} # Actions VWB de type condition _CONDITION_ACTION_TYPES = {"visual_condition"} +# Actions VWB de type data loop +_DATA_LOOP_ACTION_TYPES = {"import_excel", "db_foreach"} + def _classify_step_type(action_type: str) -> StepType: """Détermine le StepType DAG à partir du action_type VWB.""" @@ -72,6 +85,7 @@ def _classify_step_type(action_type: str) -> StepType: return StepType.WAIT if action_type in _CONDITION_ACTION_TYPES: return StepType.CONDITION + # import_excel et db_foreach sont des UI_ACTION traitées spécialement return StepType.UI_ACTION @@ -157,6 +171,270 @@ def _convert_vwb_to_dag_steps( return dag_steps +# --------------------------------------------------------------------------- +# Exécution des étapes Data Loop (import_excel, db_foreach) +# --------------------------------------------------------------------------- + +_CURRENT_ROW_PATTERN = re.compile(r"\$\{current_row\.(\w+)\}") + + +def _execute_import_excel(parameters: Dict[str, Any]) -> Dict[str, Any]: + """Exécute l'import d'un fichier Excel dans la base SQLite. + + Args: + parameters: Doit contenir 'file_path', optionnellement 'table_name', 'sheet_name' + + Returns: + Dict avec les infos d'import (table_name, row_count, columns) + """ + if not _DATA_LOOP_AVAILABLE: + raise RuntimeError( + "Module core.data non disponible. " + "Vérifiez que core/data/excel_importer.py existe." + ) + + file_path = parameters.get("file_path", "") + if not file_path: + raise ValueError("Paramètre 'file_path' requis pour import_excel") + + table_name = parameters.get("table_name") or None + sheet_name = parameters.get("sheet_name") or None + + # Convertir sheet_name en int si c'est un index numérique + if sheet_name and sheet_name.isdigit(): + sheet_name = int(sheet_name) + + importer = ExcelImporter() + result = importer.import_file( + file_path=file_path, + table_name=table_name, + sheet_name=sheet_name, + ) + + logger.info( + "Import Excel terminé : %s → table '%s' (%d lignes, colonnes: %s)", + file_path, + result.table_name, + result.row_count, + result.columns, + ) + + return { + "table_name": result.table_name, + "row_count": result.row_count, + "columns": result.columns, + "db_path": str(result.db_path) if hasattr(result, "db_path") else None, + } + + +def _inject_current_row( + action: Dict[str, Any], row: Dict[str, Any] +) -> Dict[str, Any]: + """Remplace les références ${current_row.column} dans les paramètres d'une action. + + Args: + action: Dict des paramètres de l'étape + row: Dict représentant la ligne courante (colonne → valeur) + + Returns: + Copie de l'action avec les références remplacées + """ + import copy + resolved = copy.deepcopy(action) + + def _resolve(obj: Any) -> Any: + if isinstance(obj, str): + # Cas spécial : la chaîne entière est une seule référence + m = _CURRENT_ROW_PATTERN.fullmatch(obj) + if m: + col = m.group(1) + return row.get(col, obj) + # Cas général : remplacement inline + def _replacer(match: re.Match) -> str: + col = match.group(1) + val = row.get(col) + return str(val) if val is not None else match.group(0) + return _CURRENT_ROW_PATTERN.sub(_replacer, obj) + elif isinstance(obj, dict): + return {k: _resolve(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [_resolve(item) for item in obj] + return obj + + return _resolve(resolved) + + +def _execute_db_foreach( + foreach_step_data: Dict[str, Any], + all_steps_data: List[Dict[str, Any]], + edges_data: List[Dict[str, Any]], + executor_kwargs: Dict[str, Any], +) -> Dict[str, Any]: + """Exécute une boucle db_foreach : lit les lignes puis ré-exécute les étapes dépendantes. + + Algorithme : + 1. Lire toutes les lignes de la table (avec filtres optionnels) + 2. Identifier les étapes qui dépendent du foreach (via les edges) + 3. Pour chaque ligne, injecter ${current_row.column} et exécuter ces étapes + + Args: + foreach_step_data: Les données de l'étape db_foreach + all_steps_data: Toutes les étapes du workflow + edges_data: Les edges du workflow + executor_kwargs: Paramètres pour le DAGExecutor (model, ollama_endpoint, timeout) + + Returns: + Dict avec le résumé de l'exécution de la boucle + """ + if not _DATA_LOOP_AVAILABLE: + raise RuntimeError( + "Module core.data non disponible. " + "Vérifiez que core/data/db_iterator.py existe." + ) + + parameters = foreach_step_data.get("parameters", {}) + table_name = parameters.get("table_name", "") + if not table_name: + raise ValueError("Paramètre 'table_name' requis pour db_foreach") + + where_clause = parameters.get("where_clause") or None + order_by = parameters.get("order_by") or None + limit = parameters.get("limit") + if limit is not None: + try: + limit = int(limit) + except (ValueError, TypeError): + limit = None + + # 1. Lire les lignes + iterator = DBIterator() + rows = list(iterator.iterate( + table_name=table_name, + where=where_clause, + order_by=order_by, + limit=limit, + )) + + if not rows: + logger.info("db_foreach : table '%s' vide, aucune itération", table_name) + return {"row_count": 0, "iterations": [], "table_name": table_name} + + logger.info( + "db_foreach : %d lignes à traiter dans '%s'", len(rows), table_name + ) + + # 2. Identifier les étapes dépendantes du foreach + foreach_id = foreach_step_data["id"] + dependent_ids = set() + for edge in edges_data: + if edge.get("source") == foreach_id: + dependent_ids.add(edge["target"]) + + # Aussi inclure les étapes qui dépendent des dépendants directs (chaîne) + changed = True + while changed: + changed = False + for edge in edges_data: + if edge.get("source") in dependent_ids and edge["target"] not in dependent_ids: + dependent_ids.add(edge["target"]) + changed = True + + if not dependent_ids: + logger.warning("db_foreach '%s' : aucune étape dépendante trouvée", foreach_id) + return {"row_count": len(rows), "iterations": [], "table_name": table_name} + + # Filtrer les étapes et edges pour ne garder que le sous-graphe + sub_steps = [s for s in all_steps_data if s["id"] in dependent_ids] + sub_edges = [ + e for e in edges_data + if e.get("source") in dependent_ids and e.get("target") in dependent_ids + ] + + # 3. Pour chaque ligne, injecter et exécuter + iteration_results = [] + model = executor_kwargs.get("model", "qwen3-vl:8b") + ollama_endpoint = executor_kwargs.get("ollama_endpoint", "http://localhost:11434") + timeout = executor_kwargs.get("timeout", 300) + + for row_idx, row in enumerate(rows): + logger.info( + "db_foreach iteration %d/%d : %s", + row_idx + 1, len(rows), {k: str(v)[:50] for k, v in row.items()} + ) + + # Injecter ${current_row.xxx} dans les paramètres de chaque étape + injected_steps = [] + for step_data in sub_steps: + injected = dict(step_data) + injected["parameters"] = _inject_current_row( + step_data.get("parameters", {}), row + ) + injected_steps.append(injected) + + # Reconstruire les edges internes (sans la dépendance vers foreach) + internal_edges = [] + for e in sub_edges: + internal_edges.append(e) + + # Pour les étapes qui dépendaient du foreach, supprimer cette dépendance + # (elles deviennent des racines du sous-DAG) + root_ids = set() + for edge in edges_data: + if edge.get("source") == foreach_id and edge["target"] in dependent_ids: + root_ids.add(edge["target"]) + + # Convertir en DAG steps + dag_steps = _convert_vwb_to_dag_steps(injected_steps, internal_edges) + + # Supprimer la dépendance vers foreach_id dans les étapes racines + for ds in dag_steps: + ds.depends_on = [d for d in ds.depends_on if d != foreach_id] + + # Vérifier s'il y a des étapes LLM + has_llm = any(s.step_type == StepType.LLM_CALL for s in dag_steps) + + llm_handler = None + if has_llm: + llm_handler = LLMActionHandler( + ollama_endpoint=ollama_endpoint, + model=model, + ) + + executor = DAGExecutor( + max_llm_workers=2, + max_ui_workers=1, + llm_handler=llm_handler, + ) + executor.load_workflow(dag_steps) + + result = executor.execute(timeout=timeout) + + iteration_results.append({ + "row_index": row_idx, + "row_data": row, + "success": result.success, + "results": result.results, + "errors": result.errors, + "duration": result.duration_seconds, + }) + + if not result.success: + logger.warning( + "db_foreach iteration %d échouée : %s", + row_idx, result.errors, + ) + + # Résumé + success_count = sum(1 for r in iteration_results if r["success"]) + return { + "table_name": table_name, + "row_count": len(rows), + "success_count": success_count, + "error_count": len(rows) - success_count, + "iterations": iteration_results, + } + + # --------------------------------------------------------------------------- # Instance globale du dernier exécuteur (pour le status polling) # --------------------------------------------------------------------------- @@ -232,8 +510,107 @@ def execute_dag(workflow_id: str): model = data.get("model", "qwen3-vl:8b") ollama_endpoint = data.get("ollama_endpoint", "http://localhost:11434") + executor_kwargs = { + "timeout": timeout, + "model": model, + "ollama_endpoint": ollama_endpoint, + } + + # --------------------------------------------------------------- + # Pré-traitement des étapes Data Loop (import_excel, db_foreach) + # --------------------------------------------------------------- + data_loop_results: Dict[str, Any] = {} + + # 1. Exécuter les imports Excel en premier (avant le DAG principal) + import_steps = [s for s in steps_data if s["action_type"] == "import_excel"] + for imp_step in import_steps: + try: + imp_result = _execute_import_excel(imp_step.get("parameters", {})) + data_loop_results[imp_step["id"]] = imp_result + logger.info( + "Import Excel '%s' terminé : %s", + imp_step["id"], imp_result, + ) + except Exception as exc: + logger.error("Import Excel '%s' échoué : %s", imp_step["id"], exc) + return jsonify({ + 'success': False, + 'error': f"Import Excel échoué : {exc}", + }), 500 + + # 2. Traiter les étapes db_foreach séparément + foreach_steps = [s for s in steps_data if s["action_type"] == "db_foreach"] + for fe_step in foreach_steps: + try: + fe_result = _execute_db_foreach( + fe_step, steps_data, edges_data, executor_kwargs, + ) + data_loop_results[fe_step["id"]] = fe_result + logger.info( + "db_foreach '%s' terminé : %d lignes, %d succès", + fe_step["id"], + fe_result.get("row_count", 0), + fe_result.get("success_count", 0), + ) + except Exception as exc: + logger.error("db_foreach '%s' échoué : %s", fe_step["id"], exc) + return jsonify({ + 'success': False, + 'error': f"Boucle données échouée : {exc}", + }), 500 + + # 3. Filtrer les étapes data loop + leurs dépendants (déjà exécutés) + # pour le DAG principal + foreach_ids = {s["id"] for s in foreach_steps} + already_executed_ids = set() + for fe_id in foreach_ids: + already_executed_ids.add(fe_id) + # Trouver récursivement tous les dépendants + changed = True + while changed: + changed = False + for edge in edges_data: + if (edge.get("source") in already_executed_ids + and edge["target"] not in already_executed_ids): + already_executed_ids.add(edge["target"]) + changed = True + + # Ajouter les import_excel aux déjà exécutés + for imp_step in import_steps: + already_executed_ids.add(imp_step["id"]) + + # Filtrer les étapes restantes pour le DAG principal + remaining_steps = [s for s in steps_data if s["id"] not in already_executed_ids] + remaining_edges = [ + e for e in edges_data + if e.get("source") not in already_executed_ids + and e.get("target") not in already_executed_ids + ] + + # Si toutes les étapes sont des data loop, retourner directement + if not remaining_steps: + return jsonify({ + 'success': True, + 'execution': { + "success": True, + "data_loop_results": data_loop_results, + "steps": {}, + "results": {}, + "errors": [], + "duration_seconds": 0, + }, + }) + + # --------------------------------------------------------------- + # Exécution DAG normale pour les étapes restantes + # --------------------------------------------------------------- + # Convertir en étapes DAG - dag_steps = _convert_vwb_to_dag_steps(steps_data, edges_data) + dag_steps = _convert_vwb_to_dag_steps(remaining_steps, remaining_edges) + + # Supprimer les dépendances vers des étapes data loop déjà exécutées + for ds in dag_steps: + ds.depends_on = [d for d in ds.depends_on if d not in already_executed_ids] # Vérifier s'il y a des étapes LLM has_llm_steps = any(s.step_type == StepType.LLM_CALL for s in dag_steps) @@ -253,6 +630,11 @@ def execute_dag(workflow_id: str): llm_handler=llm_handler, ) + # Injecter les résultats des étapes data loop dans l'exécuteur + # pour qu'ils soient disponibles via ${step_id.result} + for step_id, dl_result in data_loop_results.items(): + executor._results[step_id] = dl_result + # Charger le workflow dans le DAG executor.load_workflow(dag_steps) @@ -261,16 +643,22 @@ def execute_dag(workflow_id: str): _last_result = None logger.info( - "Lancement exécution DAG pour workflow '%s' : %d étapes (%d LLM)", + "Lancement exécution DAG pour workflow '%s' : %d étapes (%d LLM, %d data loop pré-traités)", workflow_id, len(dag_steps), sum(1 for s in dag_steps if s.step_type == StepType.LLM_CALL), + len(data_loop_results), ) # Exécuter (bloquant — le timeout protège) result = executor.execute(timeout=timeout) _last_result = result + # Fusionner les résultats data loop + result_dict = result.to_dict() + if data_loop_results: + result_dict["data_loop_results"] = data_loop_results + logger.info( "Exécution DAG terminée : success=%s, durée=%.2fs", result.success, @@ -279,7 +667,7 @@ def execute_dag(workflow_id: str): return jsonify({ 'success': True, - 'execution': result.to_dict(), + 'execution': result_dict, }) except ValueError as e: diff --git a/visual_workflow_builder/backend/contracts/action_contracts.py b/visual_workflow_builder/backend/contracts/action_contracts.py index 7b687a4b8..9c62d9f0f 100644 --- a/visual_workflow_builder/backend/contracts/action_contracts.py +++ b/visual_workflow_builder/backend/contracts/action_contracts.py @@ -315,6 +315,23 @@ VWB_ACTION_CONTRACTS: Dict[str, ActionContract] = { param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})} ), + # --- BOUCLE DONNÉES (Data Loop) --- + "import_excel": ActionContract( + action_type="import_excel", + description="Importer un fichier Excel dans la base SQLite", + required_params=["file_path"], + optional_params=["table_name", "sheet_name"], + param_validators={"file_path": lambda p: bool(p and isinstance(p, str) and p.strip())} + ), + + "db_foreach": ActionContract( + action_type="db_foreach", + description="Boucle sur chaque ligne d'une table et exécute les étapes dépendantes", + required_params=["table_name"], + optional_params=["where_clause", "order_by", "limit"], + param_validators={"table_name": lambda p: bool(p and isinstance(p, str) and p.strip())} + ), + # --- ACTIONS DAG LLM — Exécutées via le DAGExecutor --- "llm_analyze": ActionContract( action_type="llm_analyze", diff --git a/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx b/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx index 0e36d73f9..dac634ec4 100644 --- a/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx +++ b/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx @@ -1025,6 +1025,94 @@ export default function PropertiesPanel({ step, onUpdateParams, onDelete }: Prop ); + // === BOUCLE DONNÉES (Data Loop) === + case 'import_excel': + return ( + <> +
+ 📥 Import Excel +
+
+ + updateParam('file_path', e.target.value)} + placeholder="/chemin/vers/fichier.xlsx" + /> +
+
+ + updateParam('table_name', e.target.value)} + placeholder="Auto-detect depuis le nom du fichier" + /> + Si vide, le nom de la table sera derive du nom du fichier Excel +
+
+ + updateParam('sheet_name', e.target.value)} + placeholder="Premiere feuille par defaut" + /> + Nom ou index (0, 1, 2...) de la feuille a importer +
+ + ); + + case 'db_foreach': + return ( + <> +
+ 🔄 Boucle sur table +
+
+ + updateParam('table_name', e.target.value)} + placeholder="nom_table" + /> +
+
+ + updateParam('where_clause', e.target.value)} + placeholder="Ex: statut = 'actif' AND age > 18" + /> +
+
+ + updateParam('order_by', e.target.value)} + placeholder="Ex: nom ASC, date DESC" + /> +
+
+ + updateParam('limit', e.target.value ? Number(e.target.value) : null)} + min="1" + placeholder="Toutes les lignes" + /> +
+
+ Les colonnes sont accessibles via {'${current_row.nom_colonne}'} dans les etapes suivantes. +
+ + ); + // === VALIDATION === case 'verify_element_exists': return ( diff --git a/visual_workflow_builder/frontend_v4/src/components/StepNode.tsx b/visual_workflow_builder/frontend_v4/src/components/StepNode.tsx index e18827ec4..9e1e3ac12 100644 --- a/visual_workflow_builder/frontend_v4/src/components/StepNode.tsx +++ b/visual_workflow_builder/frontend_v4/src/components/StepNode.tsx @@ -13,9 +13,11 @@ function StepNode({ data, selected }: StepNodeProps) { const step = data.step; const action = ACTIONS.find(a => a.type === step.action_type); const isConditional = step.action_type === 'visual_condition' || step.action_type === 'loop_visual'; + const isDataLoop = step.action_type === 'db_foreach'; + const isImport = step.action_type === 'import_excel'; return ( -
+
{/* Entrée: haut */} )} + {/* Aperçu import Excel */} + {step.action_type === 'import_excel' && typeof step.parameters?.file_path === 'string' && step.parameters.file_path.length > 0 && ( +
+ {`📄 ${String(step.parameters.file_path).split('/').pop()?.split('\\').pop() || String(step.parameters.file_path)}`} + {step.parameters.table_name ? ` → ${String(step.parameters.table_name)}` : ''} +
+ )} + + {/* Aperçu boucle db_foreach */} + {step.action_type === 'db_foreach' && typeof step.parameters?.table_name === 'string' && step.parameters.table_name.length > 0 && ( +
+ {`🗃️ ${String(step.parameters.table_name)}`} + {step.parameters.limit ? ` (max ${String(step.parameters.limit)})` : ''} +
+ )} + {!step.anchor_id && action?.needsAnchor && (
Ancre requise diff --git a/visual_workflow_builder/frontend_v4/src/types.ts b/visual_workflow_builder/frontend_v4/src/types.ts index 30184f780..cbfbeb3b9 100644 --- a/visual_workflow_builder/frontend_v4/src/types.ts +++ b/visual_workflow_builder/frontend_v4/src/types.ts @@ -47,6 +47,8 @@ export type ActionType = | 'ai_custom' | 'db_save_data' | 'db_read_data' + | 'import_excel' + | 'db_foreach' | 'verify_element_exists' | 'verify_text_content' // === DAG LLM — étapes IA exécutées via le DAGExecutor === @@ -104,6 +106,10 @@ export const ACTIONS: ActionDefinition[] = [ { type: 'db_save_data', label: 'Sauvegarder en BDD', icon: '💿', category: 'data', needsAnchor: false, params: ['table', 'data'] }, { type: 'db_read_data', label: 'Lire depuis BDD', icon: '📖', category: 'data', needsAnchor: false, params: ['query', 'variable_name'] }, + // === BOUCLE DONNÉES (Data Loop) === + { type: 'import_excel', label: 'Importer Excel', icon: '📥', category: 'data', needsAnchor: false, params: ['file_path', 'table_name', 'sheet_name'] }, + { type: 'db_foreach', label: 'Pour chaque ligne', icon: '🔄', category: 'data', needsAnchor: false, params: ['table_name', 'where_clause', 'order_by', 'limit'] }, + // === DAG LLM — Actions IA via DAGExecutor (parallèle, Ollama) === { type: 'llm_analyze', label: 'Analyser texte', icon: '🔬', category: 'llm', needsAnchor: false, params: ['text', 'instruction', 'model'] }, { type: 'llm_translate', label: 'Traduire', icon: '🌐', category: 'llm', needsAnchor: false, params: ['text', 'target_lang', 'model'] },