Files
rpa_vision_v3/tests/unit/test_excel_importer.py
Dom 9da804bb6e feat: import Excel → SQLite + boucle données → UI dans le VWB
- ExcelImporter : import .xlsx → SQLite auto (détection types, batch insert)
- DBIterator : lecture ligne par ligne avec filtre/tri/limite
- VWB actions : "Importer Excel" + "Pour chaque ligne" dans la palette
- DAG executor : pré-exécution import, boucle foreach avec injection
  ${current_row.colonne} dans les étapes dépendantes
- 36 tests unitaires Excel/DB (tous passent)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 23:10:51 +01:00

521 lines
17 KiB
Python

"""
Tests unitaires pour ExcelImporter et DBIterator.
Crée des fichiers Excel temporaires via openpyxl et vérifie
l'import SQLite, la détection de types et l'itération.
"""
import os
import sqlite3
import tempfile
from datetime import datetime, date
from pathlib import Path
import openpyxl
import pytest
from core.data.excel_importer import ExcelImporter, ImportResult, PreviewResult
from core.data.db_iterator import DBIterator
# ------------------------------------------------------------------
# Fixtures
# ------------------------------------------------------------------
@pytest.fixture
def tmp_dir(tmp_path):
"""Dossier temporaire pour les fichiers de test."""
return tmp_path
@pytest.fixture
def db_path(tmp_dir):
"""Chemin de base SQLite temporaire."""
return str(tmp_dir / "test.db")
@pytest.fixture
def importer(db_path):
"""Instance ExcelImporter avec DB temporaire."""
return ExcelImporter(db_path=db_path)
@pytest.fixture
def iterator(db_path):
"""Instance DBIterator avec DB temporaire."""
return DBIterator(db_path=db_path)
@pytest.fixture
def simple_excel(tmp_dir):
"""Fichier Excel simple : 3 colonnes, 5 lignes."""
path = tmp_dir / "simple.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Données"
# Headers
ws.append(["Nom", "Age", "Ville"])
# Données
ws.append(["Alice", 30, "Paris"])
ws.append(["Bob", 25, "Lyon"])
ws.append(["Charlie", 35, "Marseille"])
ws.append(["Diana", 28, "Toulouse"])
ws.append(["Eve", 42, "Bordeaux"])
wb.save(str(path))
return str(path)
@pytest.fixture
def typed_excel(tmp_dir):
"""Fichier Excel avec différents types : texte, entier, décimal, date."""
path = tmp_dir / "typed.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Types"
ws.append(["Nom", "Quantite", "Prix", "Date_achat"])
ws.append(["Produit A", 10, 19.99, datetime(2026, 1, 15)])
ws.append(["Produit B", 5, 42.50, datetime(2026, 2, 20)])
ws.append(["Produit C", 100, 3.00, datetime(2026, 3, 1)])
ws.append(["Produit D", 1, 999.99, datetime(2026, 3, 10)])
wb.save(str(path))
return str(path)
@pytest.fixture
def multi_sheet_excel(tmp_dir):
"""Fichier Excel avec plusieurs feuilles."""
path = tmp_dir / "multi.xlsx"
wb = openpyxl.Workbook()
# Feuille 1
ws1 = wb.active
ws1.title = "Patients"
ws1.append(["NomPatient", "IPP"])
ws1.append(["Dupont", "12345"])
ws1.append(["Martin", "67890"])
# Feuille 2
ws2 = wb.create_sheet("Séjours")
ws2.append(["NumSejour", "DateEntree", "DateSortie"])
ws2.append(["S001", "2026-01-01", "2026-01-05"])
# Feuille 3
ws3 = wb.create_sheet("Diagnostics")
ws3.append(["Code", "Libelle"])
ws3.append(["A00", "Choléra"])
wb.save(str(path))
return str(path)
# ------------------------------------------------------------------
# Tests ExcelImporter — Import
# ------------------------------------------------------------------
class TestImportSimpleExcel:
"""Test import d'un fichier Excel simple (3 colonnes, 5 lignes)."""
def test_import_simple_excel(self, importer, simple_excel):
"""L'import crée la table et insère les 5 lignes."""
result = importer.import_file(simple_excel)
assert isinstance(result, ImportResult)
assert result.success
assert result.row_count == 5
assert result.column_count == 3
assert result.table_name == "simple"
assert result.sheet_name == "Données"
assert "Nom" in result.columns
assert "Age" in result.columns
assert "Ville" in result.columns
def test_import_with_custom_table_name(self, importer, simple_excel):
"""L'import utilise le nom de table personnalisé."""
result = importer.import_file(simple_excel, table_name="mes_patients")
assert result.table_name == "mes_patients"
assert result.row_count == 5
def test_import_creates_db_file(self, importer, simple_excel, db_path):
"""L'import crée le fichier SQLite."""
importer.import_file(simple_excel)
assert Path(db_path).exists()
def test_import_data_readable(self, importer, simple_excel, db_path):
"""Les données importées sont lisibles en SQL."""
importer.import_file(simple_excel)
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute('SELECT * FROM "simple" ORDER BY _rowid').fetchall()
conn.close()
assert len(rows) == 5
assert rows[0]["Nom"] == "Alice"
assert rows[0]["Age"] == 30
assert rows[0]["Ville"] == "Paris"
assert rows[4]["Nom"] == "Eve"
# ------------------------------------------------------------------
# Tests ExcelImporter — Détection de types
# ------------------------------------------------------------------
class TestAutoDetectTypes:
"""Test la détection automatique des types de colonnes."""
def test_auto_detect_types(self, importer, typed_excel):
"""Détecte correctement texte, entier, décimal et date."""
result = importer.import_file(typed_excel)
assert result.columns["Nom"] == "TEXT"
assert result.columns["Quantite"] == "INTEGER"
assert result.columns["Prix"] == "REAL"
# Les dates datetime sont converties en ISO string → TEXT
assert result.columns["Date_achat"] == "TEXT"
def test_detect_types_with_mixed_numbers(self, importer, tmp_dir):
"""Si une colonne mélange int et float, le type est REAL."""
path = tmp_dir / "mixed.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
ws.append(["Valeur"])
ws.append([10])
ws.append([20.5])
ws.append([30])
wb.save(str(path))
result = importer.import_file(str(path))
assert result.columns["Valeur"] == "REAL"
def test_detect_types_all_none(self, importer, tmp_dir):
"""Colonne entièrement vide → TEXT par défaut."""
path = tmp_dir / "nulls.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
ws.append(["Vide", "Plein"])
ws.append([None, "A"])
ws.append([None, "B"])
wb.save(str(path))
result = importer.import_file(str(path))
assert result.columns["Vide"] == "TEXT"
assert result.columns["Plein"] == "TEXT"
# ------------------------------------------------------------------
# Tests ExcelImporter — Preview
# ------------------------------------------------------------------
class TestPreview:
"""Test l'aperçu avant import."""
def test_preview(self, importer, simple_excel):
"""L'aperçu retourne les headers et quelques lignes."""
preview = importer.preview(simple_excel, max_rows=3)
assert isinstance(preview, PreviewResult)
assert preview.headers == ["Nom", "Age", "Ville"]
assert len(preview.rows) == 3
assert preview.total_rows == 5
assert preview.sheet_name == "Données"
assert "Nom" in preview.detected_types
def test_preview_max_rows(self, importer, simple_excel):
"""L'aperçu respecte la limite max_rows."""
preview = importer.preview(simple_excel, max_rows=2)
assert len(preview.rows) == 2
def test_preview_returns_detected_types(self, importer, typed_excel):
"""L'aperçu inclut les types détectés."""
preview = importer.preview(typed_excel)
assert preview.detected_types["Quantite"] == "INTEGER"
assert preview.detected_types["Prix"] == "REAL"
# ------------------------------------------------------------------
# Tests ExcelImporter — Feuilles multiples
# ------------------------------------------------------------------
class TestListSheets:
"""Test la gestion des feuilles multiples."""
def test_list_sheets(self, importer, multi_sheet_excel):
"""Liste correctement les feuilles."""
sheets = importer.list_sheets(multi_sheet_excel)
assert len(sheets) == 3
assert "Patients" in sheets
assert "Séjours" in sheets
assert "Diagnostics" in sheets
def test_import_specific_sheet(self, importer, multi_sheet_excel):
"""Import d'une feuille spécifique."""
result = importer.import_file(
multi_sheet_excel, sheet_name="Séjours", table_name="sejours"
)
assert result.sheet_name == "Séjours"
assert result.row_count == 1
assert "NumSejour" in result.columns
def test_import_nonexistent_sheet_raises(self, importer, multi_sheet_excel):
"""Feuille inexistante → ValueError."""
with pytest.raises(ValueError, match="introuvable"):
importer.import_file(multi_sheet_excel, sheet_name="Inexistante")
# ------------------------------------------------------------------
# Tests ExcelImporter — Sanitize
# ------------------------------------------------------------------
class TestSanitizeTableName:
"""Test le nettoyage des noms de tables."""
def test_sanitize_table_name(self, importer):
"""Les caractères spéciaux sont remplacés par des underscores."""
assert importer._sanitize_table_name("Mon Fichier (2)") == "mon_fichier_2"
def test_sanitize_numeric_prefix(self, importer):
"""Un nom commençant par un chiffre reçoit un préfixe."""
assert importer._sanitize_table_name("2026_data") == "t_2026_data"
def test_sanitize_empty(self, importer):
"""Un nom vide donne 'import_data'."""
assert importer._sanitize_table_name("") == "import_data"
def test_sanitize_accents(self, importer):
"""Les accents sont conservés (chars alphanumériques en Python)."""
result = importer._sanitize_table_name("données_été")
assert "donn" in result # Le mot est conservé
def test_sanitize_special_chars(self, importer):
"""Les tirets, points, parenthèses sont nettoyés."""
result = importer._sanitize_table_name("fichier-test.v2 (copie)")
assert "_" in result
assert "(" not in result
assert ")" not in result
assert "." not in result
# ------------------------------------------------------------------
# Tests DBIterator — Itération
# ------------------------------------------------------------------
class TestIterateRows:
"""Test l'itération sur les lignes."""
def test_iterate_rows(self, importer, iterator, simple_excel):
"""L'itération retourne toutes les lignes en dicts."""
importer.import_file(simple_excel)
rows = list(iterator.iterate("simple"))
assert len(rows) == 5
assert rows[0]["Nom"] == "Alice"
assert rows[0]["Age"] == 30
assert rows[4]["Nom"] == "Eve"
def test_iterate_with_limit(self, importer, iterator, simple_excel):
"""L'itération respecte la limite."""
importer.import_file(simple_excel)
rows = list(iterator.iterate("simple", limit=2))
assert len(rows) == 2
def test_iterate_with_order(self, importer, iterator, simple_excel):
"""L'itération peut être ordonnée."""
importer.import_file(simple_excel)
rows = list(iterator.iterate("simple", order_by="Age DESC"))
assert rows[0]["Nom"] == "Eve" # 42 ans, le plus âgé
assert rows[-1]["Nom"] == "Bob" # 25 ans, le plus jeune
class TestIterateWithWhere:
"""Test l'itération avec clause WHERE."""
def test_iterate_with_where(self, importer, iterator, simple_excel):
"""Le filtre WHERE fonctionne."""
importer.import_file(simple_excel)
rows = list(iterator.iterate("simple", where="Age > 30"))
assert len(rows) == 2 # Charlie (35) et Eve (42)
noms = {r["Nom"] for r in rows}
assert "Charlie" in noms
assert "Eve" in noms
def test_iterate_with_where_text(self, importer, iterator, simple_excel):
"""Le filtre WHERE fonctionne sur les colonnes texte."""
importer.import_file(simple_excel)
rows = list(iterator.iterate("simple", where="Ville = 'Paris'"))
assert len(rows) == 1
assert rows[0]["Nom"] == "Alice"
# ------------------------------------------------------------------
# Tests DBIterator — Comptage
# ------------------------------------------------------------------
class TestCount:
"""Test le comptage de lignes."""
def test_count(self, importer, iterator, simple_excel):
"""Compte toutes les lignes."""
importer.import_file(simple_excel)
assert iterator.count("simple") == 5
def test_count_with_where(self, importer, iterator, simple_excel):
"""Compte avec filtre."""
importer.import_file(simple_excel)
assert iterator.count("simple", where="Age >= 30") == 3 # Alice, Charlie, Eve
# ------------------------------------------------------------------
# Tests DBIterator — Métadonnées
# ------------------------------------------------------------------
class TestListTables:
"""Test la liste des tables."""
def test_list_tables(self, importer, iterator, simple_excel, typed_excel):
"""Liste les tables créées par les imports."""
importer.import_file(simple_excel, table_name="table_a")
importer.import_file(typed_excel, table_name="table_b")
tables = iterator.list_tables()
assert "table_a" in tables
assert "table_b" in tables
def test_list_tables_empty_db(self, tmp_dir):
"""DB vide → liste vide."""
db_path = str(tmp_dir / "empty.db")
# Créer une DB vide
conn = sqlite3.connect(db_path)
conn.close()
it = DBIterator(db_path=db_path)
assert it.list_tables() == []
def test_list_tables_no_db(self, tmp_dir):
"""DB inexistante → liste vide."""
it = DBIterator(db_path=str(tmp_dir / "nope.db"))
assert it.list_tables() == []
class TestGetColumns:
"""Test la récupération des métadonnées de colonnes."""
def test_get_columns(self, importer, iterator, simple_excel):
"""Retourne les colonnes avec leurs types."""
importer.import_file(simple_excel)
columns = iterator.get_columns("simple")
col_names = [c["name"] for c in columns]
assert "_rowid" in col_names
assert "Nom" in col_names
assert "Age" in col_names
assert "Ville" in col_names
# Vérifier les types
col_map = {c["name"]: c["type"] for c in columns}
assert col_map["Nom"] == "TEXT"
assert col_map["Age"] == "INTEGER"
class TestGetRow:
"""Test la récupération d'une ligne par ID."""
def test_get_row(self, importer, iterator, simple_excel):
"""Récupère une ligne par son _rowid."""
importer.import_file(simple_excel)
row = iterator.get_row("simple", 1)
assert row is not None
assert row["Nom"] == "Alice"
def test_get_row_not_found(self, importer, iterator, simple_excel):
"""Ligne inexistante → None."""
importer.import_file(simple_excel)
row = iterator.get_row("simple", 999)
assert row is None
# ------------------------------------------------------------------
# Tests — Cas limites
# ------------------------------------------------------------------
class TestEdgeCases:
"""Tests de cas limites."""
def test_file_not_found(self, importer):
"""Fichier inexistant → FileNotFoundError."""
with pytest.raises(FileNotFoundError):
importer.import_file("/tmp/inexistant_xyz.xlsx")
def test_empty_file(self, importer, tmp_dir):
"""Fichier vide → ValueError."""
path = tmp_dir / "empty.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
# Pas de données du tout — openpyxl crée une feuille vide
wb.save(str(path))
with pytest.raises(ValueError, match="vide|colonne"):
importer.import_file(str(path))
def test_duplicate_headers(self, importer, iterator, tmp_dir):
"""Les colonnes dupliquées sont dédupliquées."""
path = tmp_dir / "dupes.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
ws.append(["Nom", "Nom", "Valeur"])
ws.append(["A", "B", 1])
wb.save(str(path))
result = importer.import_file(str(path))
assert result.column_count == 3
# La deuxième colonne "Nom" est renommée "Nom_2"
assert "Nom_2" in result.columns
def test_skip_empty_rows(self, importer, iterator, tmp_dir):
"""Les lignes entièrement vides sont ignorées."""
path = tmp_dir / "with_blanks.xlsx"
wb = openpyxl.Workbook()
ws = wb.active
ws.append(["Col"])
ws.append(["A"])
ws.append([None])
ws.append(["B"])
wb.save(str(path))
result = importer.import_file(str(path))
# "A" et "B" sont insérées, la ligne None est ignorée
assert result.row_count == 2
assert result.skipped_rows == 1
def test_imported_at_column(self, importer, iterator, simple_excel):
"""Chaque ligne a un timestamp d'import (_imported_at)."""
importer.import_file(simple_excel)
rows = list(iterator.iterate("simple", limit=1))
assert "_imported_at" in rows[0]
assert rows[0]["_imported_at"] is not None