- ExcelImporter : import .xlsx → SQLite auto (détection types, batch insert)
- DBIterator : lecture ligne par ligne avec filtre/tri/limite
- VWB actions : "Importer Excel" + "Pour chaque ligne" dans la palette
- DAG executor : pré-exécution de l'import, boucle foreach avec injection de
  ${current_row.colonne} dans les étapes dépendantes
- 36 tests unitaires Excel/DB (tous passent)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
521 lines
17 KiB
Python
521 lines
17 KiB
Python
"""
|
|
Tests unitaires pour ExcelImporter et DBIterator.
|
|
|
|
Crée des fichiers Excel temporaires via openpyxl et vérifie
|
|
l'import SQLite, la détection de types et l'itération.
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
import tempfile
|
|
from datetime import datetime, date
|
|
from pathlib import Path
|
|
|
|
import openpyxl
|
|
import pytest
|
|
|
|
from core.data.excel_importer import ExcelImporter, ImportResult, PreviewResult
|
|
from core.data.db_iterator import DBIterator
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Fixtures
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def tmp_dir(tmp_path):
    """Expose pytest's per-test ``tmp_path`` under the name ``tmp_dir``.

    All other fixtures in this module write their test files into the
    directory returned here.
    """
    scratch_dir = tmp_path
    return scratch_dir
|
|
|
|
|
|
@pytest.fixture
def db_path(tmp_dir):
    """Return, as a string, the path of a ``test.db`` file inside ``tmp_dir``.

    Only the path is computed here; the fixture itself does not create
    the file.
    """
    database_file = tmp_dir.joinpath("test.db")
    return str(database_file)
|
|
|
|
|
|
@pytest.fixture
def importer(db_path):
    """Build an ``ExcelImporter`` bound to the temporary database path."""
    excel_importer = ExcelImporter(db_path=db_path)
    return excel_importer
|
|
|
|
|
|
@pytest.fixture
def iterator(db_path):
    """Build a ``DBIterator`` bound to the temporary database path."""
    db_iterator = DBIterator(db_path=db_path)
    return db_iterator
|
|
|
|
|
|
@pytest.fixture
def simple_excel(tmp_dir):
    """Write a small workbook (3 columns, 5 data rows) and return its path.

    The only sheet is titled "Données". Its first row holds the headers,
    the following rows hold the sample people used across the tests.
    The path is returned as a string.
    """
    sheet_rows = [
        ["Nom", "Age", "Ville"],  # header row
        ["Alice", 30, "Paris"],
        ["Bob", 25, "Lyon"],
        ["Charlie", 35, "Marseille"],
        ["Diana", 28, "Toulouse"],
        ["Eve", 42, "Bordeaux"],
    ]

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "Données"
    for sheet_row in sheet_rows:
        sheet.append(sheet_row)

    target = tmp_dir / "simple.xlsx"
    workbook.save(str(target))
    return str(target)
|
|
|
|
|
|
@pytest.fixture
def typed_excel(tmp_dir):
    """Write a workbook mixing text, integer, decimal and date cells.

    The sheet is titled "Types" and contains one header row followed by
    four product rows. The path is returned as a string.
    """
    sheet_rows = [
        ["Nom", "Quantite", "Prix", "Date_achat"],  # header row
        ["Produit A", 10, 19.99, datetime(2026, 1, 15)],
        ["Produit B", 5, 42.50, datetime(2026, 2, 20)],
        ["Produit C", 100, 3.00, datetime(2026, 3, 1)],
        ["Produit D", 1, 999.99, datetime(2026, 3, 10)],
    ]

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "Types"
    for sheet_row in sheet_rows:
        sheet.append(sheet_row)

    target = tmp_dir / "typed.xlsx"
    workbook.save(str(target))
    return str(target)
|
|
|
|
|
|
@pytest.fixture
def multi_sheet_excel(tmp_dir):
    """Write a workbook with three sheets and return its path as a string.

    Sheets, in order: "Patients" (2 data rows), "Séjours" (1 data row)
    and "Diagnostics" (1 data row). Each sheet starts with a header row.
    """
    workbook = openpyxl.Workbook()

    # First sheet: reuse the default sheet created with the workbook.
    patients = workbook.active
    patients.title = "Patients"
    for patient_row in (
        ["NomPatient", "IPP"],
        ["Dupont", "12345"],
        ["Martin", "67890"],
    ):
        patients.append(patient_row)

    # Second sheet.
    stays = workbook.create_sheet("Séjours")
    for stay_row in (
        ["NumSejour", "DateEntree", "DateSortie"],
        ["S001", "2026-01-01", "2026-01-05"],
    ):
        stays.append(stay_row)

    # Third sheet.
    diagnostics = workbook.create_sheet("Diagnostics")
    for diagnostic_row in (
        ["Code", "Libelle"],
        ["A00", "Choléra"],
    ):
        diagnostics.append(diagnostic_row)

    target = tmp_dir / "multi.xlsx"
    workbook.save(str(target))
    return str(target)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests ExcelImporter — Import
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestImportSimpleExcel:
    """Import of a simple Excel file (3 columns, 5 data rows)."""

    def test_import_simple_excel(self, importer, simple_excel):
        """Importing creates the table and reports the 5 inserted rows."""
        result = importer.import_file(simple_excel)

        assert isinstance(result, ImportResult)
        assert result.success
        assert result.row_count == 5
        assert result.column_count == 3
        # No table name is passed; the fixture file is "simple.xlsx".
        assert result.table_name == "simple"
        assert result.sheet_name == "Données"
        for header in ("Nom", "Age", "Ville"):
            assert header in result.columns

    def test_import_with_custom_table_name(self, importer, simple_excel):
        """An explicit ``table_name`` is used for the created table."""
        result = importer.import_file(simple_excel, table_name="mes_patients")

        assert result.table_name == "mes_patients"
        assert result.row_count == 5

    def test_import_creates_db_file(self, importer, simple_excel, db_path):
        """Importing creates the SQLite file on disk."""
        importer.import_file(simple_excel)
        assert Path(db_path).exists()

    def test_import_data_readable(self, importer, simple_excel, db_path):
        """Imported rows can be read back with plain SQL."""
        importer.import_file(simple_excel)

        conn = sqlite3.connect(db_path)
        try:
            # sqlite3.Row allows access to the values by column name.
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                'SELECT * FROM "simple" ORDER BY _rowid'
            ).fetchall()
        finally:
            # Close the handle even when the query fails, so a broken
            # import does not leave the temporary database open.
            conn.close()

        assert len(rows) == 5
        assert rows[0]["Nom"] == "Alice"
        assert rows[0]["Age"] == 30
        assert rows[0]["Ville"] == "Paris"
        assert rows[4]["Nom"] == "Eve"
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests ExcelImporter — Détection de types
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestAutoDetectTypes:
    """Automatic detection of column types during import."""

    def test_auto_detect_types(self, importer, typed_excel):
        """Text, integer, decimal and date columns get the expected types."""
        detected = importer.import_file(typed_excel).columns

        assert detected["Nom"] == "TEXT"
        assert detected["Quantite"] == "INTEGER"
        assert detected["Prix"] == "REAL"
        # Per the original test note: datetime cells are converted to an
        # ISO string, so the column is expected to be TEXT.
        assert detected["Date_achat"] == "TEXT"

    def test_detect_types_with_mixed_numbers(self, importer, tmp_dir):
        """A column mixing int and float values is typed REAL."""
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        # Single column: header first, then the int/float mix.
        for cell_value in ("Valeur", 10, 20.5, 30):
            sheet.append([cell_value])
        target = tmp_dir / "mixed.xlsx"
        workbook.save(str(target))

        outcome = importer.import_file(str(target))
        assert outcome.columns["Valeur"] == "REAL"

    def test_detect_types_all_none(self, importer, tmp_dir):
        """A column holding only empty cells is typed TEXT."""
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        for sheet_row in (["Vide", "Plein"], [None, "A"], [None, "B"]):
            sheet.append(sheet_row)
        target = tmp_dir / "nulls.xlsx"
        workbook.save(str(target))

        outcome = importer.import_file(str(target))
        assert outcome.columns["Vide"] == "TEXT"
        assert outcome.columns["Plein"] == "TEXT"
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests ExcelImporter — Preview
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestPreview:
    """Preview of a file before it is imported."""

    def test_preview(self, importer, simple_excel):
        """The preview exposes headers, a few rows and sheet metadata."""
        snapshot = importer.preview(simple_excel, max_rows=3)

        assert isinstance(snapshot, PreviewResult)
        assert snapshot.headers == ["Nom", "Age", "Ville"]
        # Only 3 rows are returned, but the total still counts all 5.
        assert len(snapshot.rows) == 3
        assert snapshot.total_rows == 5
        assert snapshot.sheet_name == "Données"
        assert "Nom" in snapshot.detected_types

    def test_preview_max_rows(self, importer, simple_excel):
        """The preview returns no more than ``max_rows`` rows."""
        previewed_rows = importer.preview(simple_excel, max_rows=2).rows
        assert len(previewed_rows) == 2

    def test_preview_returns_detected_types(self, importer, typed_excel):
        """The preview reports the detected column types."""
        types_by_column = importer.preview(typed_excel).detected_types
        assert types_by_column["Quantite"] == "INTEGER"
        assert types_by_column["Prix"] == "REAL"
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests ExcelImporter — Feuilles multiples
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestListSheets:
    """Handling of workbooks that contain several sheets."""

    def test_list_sheets(self, importer, multi_sheet_excel):
        """All three sheets of the workbook are listed."""
        sheet_names = importer.list_sheets(multi_sheet_excel)

        assert len(sheet_names) == 3
        for expected_name in ("Patients", "Séjours", "Diagnostics"):
            assert expected_name in sheet_names

    def test_import_specific_sheet(self, importer, multi_sheet_excel):
        """A sheet selected by name is the one that gets imported."""
        outcome = importer.import_file(
            multi_sheet_excel,
            sheet_name="Séjours",
            table_name="sejours",
        )

        assert outcome.sheet_name == "Séjours"
        assert outcome.row_count == 1
        assert "NumSejour" in outcome.columns

    def test_import_nonexistent_sheet_raises(self, importer, multi_sheet_excel):
        """Asking for an unknown sheet raises ValueError ("introuvable")."""
        expected_failure = pytest.raises(ValueError, match="introuvable")
        with expected_failure:
            importer.import_file(multi_sheet_excel, sheet_name="Inexistante")
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests ExcelImporter — Sanitize
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestSanitizeTableName:
    """Cleaning of table names by ``_sanitize_table_name``."""

    def test_sanitize_table_name(self, importer):
        """Spaces and parentheses collapse into a lowercase snake name."""
        cleaned = importer._sanitize_table_name("Mon Fichier (2)")
        assert cleaned == "mon_fichier_2"

    def test_sanitize_numeric_prefix(self, importer):
        """A name starting with a digit gets the ``t_`` prefix."""
        cleaned = importer._sanitize_table_name("2026_data")
        assert cleaned == "t_2026_data"

    def test_sanitize_empty(self, importer):
        """An empty name falls back to ``import_data``."""
        cleaned = importer._sanitize_table_name("")
        assert cleaned == "import_data"

    def test_sanitize_accents(self, importer):
        """The leading "donn" of an accented name survives sanitizing."""
        cleaned = importer._sanitize_table_name("données_été")
        # Only the unaccented prefix is checked, so the test passes whether
        # or not the accented characters themselves are kept.
        assert "donn" in cleaned

    def test_sanitize_special_chars(self, importer):
        """Dashes, dots and parentheses are cleaned out of the name."""
        cleaned = importer._sanitize_table_name("fichier-test.v2 (copie)")
        assert "_" in cleaned
        for forbidden in ("(", ")", "."):
            assert forbidden not in cleaned
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests DBIterator — Itération
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestIterateRows:
    """Row iteration over an imported table."""

    def test_iterate_rows(self, importer, iterator, simple_excel):
        """Iterating yields every row, addressable by column name."""
        importer.import_file(simple_excel)

        records = list(iterator.iterate("simple"))
        first_record, last_record = records[0], records[4]

        assert len(records) == 5
        assert first_record["Nom"] == "Alice"
        assert first_record["Age"] == 30
        assert last_record["Nom"] == "Eve"

    def test_iterate_with_limit(self, importer, iterator, simple_excel):
        """``limit`` caps the number of rows yielded."""
        importer.import_file(simple_excel)

        limited = list(iterator.iterate("simple", limit=2))
        assert len(limited) == 2

    def test_iterate_with_order(self, importer, iterator, simple_excel):
        """``order_by`` controls the order of the yielded rows."""
        importer.import_file(simple_excel)

        by_age_desc = list(iterator.iterate("simple", order_by="Age DESC"))
        oldest = by_age_desc[0]
        youngest = by_age_desc[-1]
        assert oldest["Nom"] == "Eve"  # 42, the oldest
        assert youngest["Nom"] == "Bob"  # 25, the youngest
|
|
|
|
|
|
class TestIterateWithWhere:
    """Row iteration filtered by a WHERE clause."""

    def test_iterate_with_where(self, importer, iterator, simple_excel):
        """A numeric WHERE filter keeps only the matching rows."""
        importer.import_file(simple_excel)

        matches = list(iterator.iterate("simple", where="Age > 30"))
        assert len(matches) == 2  # Charlie (35) and Eve (42)

        matched_names = set()
        for record in matches:
            matched_names.add(record["Nom"])
        assert "Charlie" in matched_names
        assert "Eve" in matched_names

    def test_iterate_with_where_text(self, importer, iterator, simple_excel):
        """A WHERE filter on a text column keeps only the matching row."""
        importer.import_file(simple_excel)

        matches = list(iterator.iterate("simple", where="Ville = 'Paris'"))
        assert len(matches) == 1
        only_match = matches[0]
        assert only_match["Nom"] == "Alice"
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests DBIterator — Comptage
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestCount:
    """Row counting on an imported table."""

    def test_count(self, importer, iterator, simple_excel):
        """Without a filter, every imported row is counted."""
        importer.import_file(simple_excel)

        total = iterator.count("simple")
        assert total == 5

    def test_count_with_where(self, importer, iterator, simple_excel):
        """With a WHERE filter, only the matching rows are counted."""
        importer.import_file(simple_excel)

        aged_30_or_more = iterator.count("simple", where="Age >= 30")
        assert aged_30_or_more == 3  # Alice, Charlie, Eve
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests DBIterator — Métadonnées
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestListTables:
    """Listing of the tables present in the database."""

    def test_list_tables(self, importer, iterator, simple_excel, typed_excel):
        """Tables created by two imports both appear in the listing."""
        importer.import_file(simple_excel, table_name="table_a")
        importer.import_file(typed_excel, table_name="table_b")

        known_tables = iterator.list_tables()
        for expected_table in ("table_a", "table_b"):
            assert expected_table in known_tables

    def test_list_tables_empty_db(self, tmp_dir):
        """A database without tables yields an empty list."""
        db_path = str(tmp_dir / "empty.db")
        # Open and immediately close a connection to get an empty database.
        sqlite3.connect(db_path).close()

        empty_db_iterator = DBIterator(db_path=db_path)
        assert empty_db_iterator.list_tables() == []

    def test_list_tables_no_db(self, tmp_dir):
        """A database file that does not exist yields an empty list."""
        missing_db = str(tmp_dir / "nope.db")
        orphan_iterator = DBIterator(db_path=missing_db)
        assert orphan_iterator.list_tables() == []
|
|
|
|
|
|
class TestGetColumns:
    """Retrieval of column metadata for a table."""

    def test_get_columns(self, importer, iterator, simple_excel):
        """Columns are returned with their names and types."""
        importer.import_file(simple_excel)

        column_infos = iterator.get_columns("simple")

        # Collect the names and a name -> type lookup in a single pass.
        seen_names = []
        type_by_name = {}
        for info in column_infos:
            seen_names.append(info["name"])
            type_by_name[info["name"]] = info["type"]

        for expected_name in ("_rowid", "Nom", "Age", "Ville"):
            assert expected_name in seen_names

        # Check the reported types.
        assert type_by_name["Nom"] == "TEXT"
        assert type_by_name["Age"] == "INTEGER"
|
|
|
|
|
|
class TestGetRow:
    """Retrieval of a single row by its identifier."""

    def test_get_row(self, importer, iterator, simple_excel):
        """Row 1 of the imported table is found and is Alice's row."""
        importer.import_file(simple_excel)

        found = iterator.get_row("simple", 1)
        assert found is not None
        assert found["Nom"] == "Alice"

    def test_get_row_not_found(self, importer, iterator, simple_excel):
        """An identifier with no matching row yields ``None``."""
        importer.import_file(simple_excel)

        absent = iterator.get_row("simple", 999)
        assert absent is None
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tests — Cas limites
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
class TestEdgeCases:
    """Edge cases of the import: bad files, odd headers, blank rows."""

    def test_file_not_found(self, importer, tmp_dir):
        """Importing a file that does not exist raises FileNotFoundError."""
        # Use the per-test temporary directory instead of a hard-coded
        # /tmp path: it is portable and the file cannot already exist.
        missing = tmp_dir / "inexistant_xyz.xlsx"
        with pytest.raises(FileNotFoundError):
            importer.import_file(str(missing))

    def test_empty_file(self, importer, tmp_dir):
        """Importing a workbook without any data raises ValueError."""
        path = tmp_dir / "empty.xlsx"
        # No data is written: openpyxl creates the workbook with one
        # empty sheet, which is saved as-is.
        openpyxl.Workbook().save(str(path))

        with pytest.raises(ValueError, match="vide|colonne"):
            importer.import_file(str(path))

    def test_duplicate_headers(self, importer, tmp_dir):
        """Duplicated header names are made unique."""
        path = tmp_dir / "dupes.xlsx"
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.append(["Nom", "Nom", "Valeur"])
        ws.append(["A", "B", 1])
        wb.save(str(path))

        result = importer.import_file(str(path))
        assert result.column_count == 3
        # The second "Nom" column is expected to be renamed "Nom_2".
        assert "Nom_2" in result.columns

    def test_skip_empty_rows(self, importer, tmp_dir):
        """Rows whose cells are all empty are skipped and counted."""
        path = tmp_dir / "with_blanks.xlsx"
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.append(["Col"])
        ws.append(["A"])
        ws.append([None])
        ws.append(["B"])
        wb.save(str(path))

        result = importer.import_file(str(path))
        # "A" and "B" are inserted; the None row is skipped.
        assert result.row_count == 2
        assert result.skipped_rows == 1

    def test_imported_at_column(self, importer, iterator, simple_excel):
        """Every row carries a non-null ``_imported_at`` value."""
        importer.import_file(simple_excel)

        rows = list(iterator.iterate("simple", limit=1))
        assert "_imported_at" in rows[0]
        assert rows[0]["_imported_at"] is not None
|