Feat: Actions database sauvegarder/charger_donnees

Module complet de persistance SQLite pour VWB:

GestionnaireDB:
- Interface clé-valeur avec typage auto (string, number, bool, json)
- Collections pour données structurées avec historique
- Requêtes SQL personnalisées (SELECT/modifications)
- Thread-safe, singleton par chemin de DB
- Statistiques et nettoyage

Actions:
- sauvegarder_donnees: 3 modes (cle_valeur, collection, sql)
- charger_donnees: 4 modes (cle_valeur, collection, sql, lister)

Base par défaut: ~/.vwb/data.db

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-14 23:30:31 +01:00
parent f62e8e7852
commit 473ca84be5
4 changed files with 1048 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
"""
Actions Database VWB - Module d'initialisation
Auteur : Dom, Claude - 14 janvier 2026
Ce module contient les actions de persistance de données
pour le Visual Workflow Builder.
Actions disponibles :
- VWBSauvegarderDonneesAction : Sauvegarde de données (clé-valeur, collection, SQL)
- VWBChargerDonneesAction : Chargement de données
Utilitaires :
- GestionnaireDB : Gestionnaire SQLite thread-safe
"""
from .gestionnaire_db import GestionnaireDB
from .sauvegarder_donnees import VWBSauvegarderDonneesAction, VWBDBSaveDataAction
from .charger_donnees import VWBChargerDonneesAction, VWBDBReadDataAction
__all__ = [
# Gestionnaire
'GestionnaireDB',
# Actions françaises
'VWBSauvegarderDonneesAction',
'VWBChargerDonneesAction',
# Alias anglais
'VWBDBSaveDataAction',
'VWBDBReadDataAction',
]
__version__ = '1.0.0'
__author__ = 'Dom, Claude'
__date__ = '14 janvier 2026'

View File

@@ -0,0 +1,276 @@
"""
Action Charger Données - Récupération des données persistantes
Auteur : Dom, Claude - 14 janvier 2026
Cette action permet de charger des données sauvegardées précédemment
pour les utiliser dans le workflow courant.
Cas d'usage :
- Récupérer des résultats d'exécutions précédentes
- Charger une configuration
- Comparer avec des données historiques
- Reprendre un workflow interrompu
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
from ...contracts.error import VWBErrorType, create_vwb_error
from .gestionnaire_db import GestionnaireDB
class VWBChargerDonneesAction(BaseVWBAction):
"""
Action de chargement de données.
Modes de chargement :
- cle_valeur: Récupération par clé
- collection: Récupération d'une collection
- sql: Requête SQL personnalisée
"""
def __init__(
self,
action_id: str,
parameters: Dict[str, Any],
screen_capturer=None
):
"""
Initialise l'action de chargement.
Args:
action_id: Identifiant unique de l'action
parameters: Paramètres de chargement
screen_capturer: Non utilisé, présent pour compatibilité
"""
super().__init__(
action_id=action_id,
name="Charger Données",
description="Charge des données sauvegardées précédemment",
parameters=parameters,
screen_capturer=screen_capturer
)
# Mode de chargement
self.mode = parameters.get('mode', 'cle_valeur') # cle_valeur, collection, sql, lister
# Paramètres clé-valeur
self.cle = parameters.get('cle', parameters.get('key', ''))
self.valeur_defaut = parameters.get('valeur_defaut', parameters.get('default_value'))
# Paramètres collection
self.nom_collection = parameters.get('nom_collection', parameters.get('collection_name', ''))
self.limite = parameters.get('limite', parameters.get('limit', 100))
self.dernier_seulement = parameters.get('dernier_seulement', parameters.get('last_only', False))
# Paramètres SQL
self.requete_sql = parameters.get('requete_sql', parameters.get('sql_query', ''))
self.parametres_sql = parameters.get('parametres_sql', parameters.get('sql_params', ()))
# Paramètres liste
self.prefixe_cle = parameters.get('prefixe_cle', parameters.get('key_prefix', ''))
# Configuration
self.chemin_db = parameters.get('chemin_db', parameters.get('db_path'))
self.workflow_id = parameters.get('workflow_id')
# Variable de sortie
self.variable_sortie = parameters.get('variable_sortie', parameters.get('output_variable', 'donnees_chargees'))
def validate_parameters(self) -> List[str]:
"""Valide les paramètres de l'action."""
erreurs = []
if self.mode not in ['cle_valeur', 'collection', 'sql', 'lister']:
erreurs.append("Mode doit être 'cle_valeur', 'collection', 'sql' ou 'lister'")
if self.mode == 'cle_valeur' and not self.cle:
erreurs.append("Clé requise pour le mode clé-valeur")
if self.mode == 'collection' and not self.nom_collection:
erreurs.append("Nom de collection requis")
if self.mode == 'sql' and not self.requete_sql:
erreurs.append("Requête SQL requise")
if self.limite < 1 or self.limite > 10000:
erreurs.append("Limite doit être entre 1 et 10000")
return erreurs
def execute_core(self, step_id: str) -> VWBActionResult:
"""
Exécute le chargement des données.
Args:
step_id: Identifiant de l'étape
Returns:
Résultat avec les données chargées
"""
start_time = datetime.now()
try:
# Obtenir le gestionnaire de DB
db = GestionnaireDB.obtenir_instance(self.chemin_db)
# Exécuter selon le mode
if self.mode == 'cle_valeur':
resultat = self._charger_cle_valeur(db)
elif self.mode == 'collection':
resultat = self._charger_collection(db)
elif self.mode == 'sql':
resultat = self._executer_sql(db)
else: # lister
resultat = self._lister_cles(db)
end_time = datetime.now()
execution_time = (end_time - start_time).total_seconds() * 1000
donnees = resultat.get('donnees')
trouve = resultat.get('trouve', donnees is not None)
if trouve:
print(f"📂 Données chargées ({self.mode})")
self._afficher_resume(donnees)
else:
print(f"📂 Aucune donnée trouvée ({self.mode})")
return VWBActionResult(
action_id=self.action_id,
step_id=step_id,
status=VWBActionStatus.SUCCESS,
start_time=start_time,
end_time=end_time,
execution_time_ms=execution_time,
output_data={
'donnees': donnees,
'trouve': trouve,
'mode': self.mode,
'variable_sortie': self.variable_sortie,
'nb_resultats': self._compter_resultats(donnees)
},
evidence_list=self.evidence_list.copy()
)
except Exception as e:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message=f"Erreur: {str(e)}",
technical_details={'exception': str(e)}
)
def _charger_cle_valeur(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Charge une valeur par clé."""
valeur = db.charger(
cle=self.cle,
defaut=self.valeur_defaut
)
trouve = valeur != self.valeur_defaut or db.charger(self.cle) is not None
return {
'donnees': valeur,
'trouve': trouve
}
def _charger_collection(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Charge les données d'une collection."""
if self.dernier_seulement:
resultat = db.dernier_enregistrement(
nom_collection=self.nom_collection,
workflow_id=self.workflow_id
)
if resultat:
return {
'donnees': resultat['donnees'],
'trouve': True,
'metadata': {
'id': resultat['id'],
'created_at': resultat['created_at']
}
}
return {'donnees': None, 'trouve': False}
else:
resultats = db.charger_collection(
nom_collection=self.nom_collection,
limite=self.limite,
workflow_id=self.workflow_id
)
return {
'donnees': [r['donnees'] for r in resultats],
'trouve': len(resultats) > 0
}
def _executer_sql(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Exécute une requête SQL SELECT."""
resultats = db.executer_sql(
requete=self.requete_sql,
parametres=tuple(self.parametres_sql) if self.parametres_sql else ()
)
return {
'donnees': resultats,
'trouve': len(resultats) > 0
}
def _lister_cles(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Liste les clés disponibles."""
cles = db.lister_cles(
prefixe=self.prefixe_cle if self.prefixe_cle else None,
workflow_id=self.workflow_id
)
return {
'donnees': cles,
'trouve': len(cles) > 0
}
def _compter_resultats(self, donnees: Any) -> int:
"""Compte le nombre de résultats."""
if donnees is None:
return 0
if isinstance(donnees, list):
return len(donnees)
if isinstance(donnees, dict):
return len(donnees)
return 1
def _afficher_resume(self, donnees: Any):
"""Affiche un résumé des données chargées."""
if donnees is None:
return
if isinstance(donnees, list):
print(f" {len(donnees)} élément(s)")
elif isinstance(donnees, dict):
print(f" {len(donnees)} clé(s)")
else:
valeur_str = str(donnees)
if len(valeur_str) > 50:
valeur_str = valeur_str[:50] + '...'
print(f" Valeur: {valeur_str}")
def get_action_info(self) -> Dict[str, Any]:
"""Retourne les informations de l'action."""
return {
'action_id': self.action_id,
'name': self.name,
'description': self.description,
'type': 'charger_donnees',
'parameters': {
'mode': self.mode,
'cle': self.cle if self.mode == 'cle_valeur' else None,
'collection': self.nom_collection if self.mode == 'collection' else None,
'variable_sortie': self.variable_sortie
},
'status': self.current_status.value
}
# Alias anglais
VWBDBReadDataAction = VWBChargerDonneesAction

View File

@@ -0,0 +1,497 @@
"""
Gestionnaire de Base de Données VWB
Auteur : Dom, Claude - 14 janvier 2026
Module utilitaire pour la gestion SQLite des workflows VWB.
Fournit une interface simple pour le stockage persistant.
"""
import sqlite3
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
from contextlib import contextmanager
import threading
class GestionnaireDB:
"""
Gestionnaire de base de données SQLite pour VWB.
Fournit :
- Stockage clé-valeur simple
- Tables structurées
- Requêtes SQL avancées
- Thread-safe
"""
_instances: Dict[str, 'GestionnaireDB'] = {}
_lock = threading.Lock()
def __init__(self, chemin_db: Optional[str] = None):
"""
Initialise le gestionnaire.
Args:
chemin_db: Chemin vers le fichier SQLite (défaut: ~/.vwb/data.db)
"""
if chemin_db:
self.chemin_db = Path(chemin_db).expanduser()
else:
self.chemin_db = Path.home() / '.vwb' / 'data.db'
# Créer le dossier parent si nécessaire
self.chemin_db.parent.mkdir(parents=True, exist_ok=True)
# Initialiser la base
self._initialiser_schema()
@classmethod
def obtenir_instance(cls, chemin_db: Optional[str] = None) -> 'GestionnaireDB':
"""Obtient une instance singleton par chemin de DB."""
chemin = chemin_db or str(Path.home() / '.vwb' / 'data.db')
with cls._lock:
if chemin not in cls._instances:
cls._instances[chemin] = cls(chemin_db)
return cls._instances[chemin]
@contextmanager
def _connexion(self):
"""Context manager pour connexion thread-safe."""
conn = sqlite3.connect(str(self.chemin_db), timeout=30.0)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _initialiser_schema(self):
"""Initialise les tables de base."""
with self._connexion() as conn:
cursor = conn.cursor()
# Table clé-valeur simple
cursor.execute('''
CREATE TABLE IF NOT EXISTS kv_store (
cle TEXT PRIMARY KEY,
valeur TEXT,
type_donnee TEXT DEFAULT 'string',
workflow_id TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Table pour données structurées (JSON)
cursor.execute('''
CREATE TABLE IF NOT EXISTS donnees_structurees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nom_collection TEXT NOT NULL,
donnees TEXT NOT NULL,
workflow_id TEXT,
execution_id TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Index pour performances
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_kv_workflow
ON kv_store(workflow_id)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_struct_collection
ON donnees_structurees(nom_collection)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_struct_workflow
ON donnees_structurees(workflow_id)
''')
# ==================== Interface Clé-Valeur ====================
def sauvegarder(
self,
cle: str,
valeur: Any,
workflow_id: Optional[str] = None
) -> bool:
"""
Sauvegarde une valeur avec une clé.
Args:
cle: Clé unique
valeur: Valeur à sauvegarder (sera sérialisée en JSON si nécessaire)
workflow_id: ID du workflow (optionnel)
Returns:
True si succès
"""
try:
# Déterminer le type et sérialiser
if isinstance(valeur, (dict, list)):
valeur_str = json.dumps(valeur, ensure_ascii=False, default=str)
type_donnee = 'json'
elif isinstance(valeur, bool):
valeur_str = str(valeur).lower()
type_donnee = 'bool'
elif isinstance(valeur, (int, float)):
valeur_str = str(valeur)
type_donnee = 'number'
else:
valeur_str = str(valeur)
type_donnee = 'string'
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO kv_store
(cle, valeur, type_donnee, workflow_id, updated_at)
VALUES (?, ?, ?, ?, ?)
''', (cle, valeur_str, type_donnee, workflow_id, datetime.now().isoformat()))
return True
except Exception as e:
print(f"❌ Erreur sauvegarde: {e}")
return False
def charger(
self,
cle: str,
defaut: Any = None
) -> Any:
"""
Charge une valeur par sa clé.
Args:
cle: Clé à rechercher
defaut: Valeur par défaut si non trouvée
Returns:
Valeur désérialisée ou défaut
"""
try:
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute(
'SELECT valeur, type_donnee FROM kv_store WHERE cle = ?',
(cle,)
)
row = cursor.fetchone()
if not row:
return defaut
valeur_str, type_donnee = row['valeur'], row['type_donnee']
# Désérialiser selon le type
if type_donnee == 'json':
return json.loads(valeur_str)
elif type_donnee == 'bool':
return valeur_str.lower() == 'true'
elif type_donnee == 'number':
return float(valeur_str) if '.' in valeur_str else int(valeur_str)
else:
return valeur_str
except Exception as e:
print(f"❌ Erreur chargement: {e}")
return defaut
def supprimer(self, cle: str) -> bool:
"""Supprime une entrée par sa clé."""
try:
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM kv_store WHERE cle = ?', (cle,))
return True
except Exception:
return False
def lister_cles(
self,
prefixe: Optional[str] = None,
workflow_id: Optional[str] = None
) -> List[str]:
"""Liste les clés disponibles."""
try:
with self._connexion() as conn:
cursor = conn.cursor()
if prefixe and workflow_id:
cursor.execute(
'SELECT cle FROM kv_store WHERE cle LIKE ? AND workflow_id = ?',
(f'{prefixe}%', workflow_id)
)
elif prefixe:
cursor.execute(
'SELECT cle FROM kv_store WHERE cle LIKE ?',
(f'{prefixe}%',)
)
elif workflow_id:
cursor.execute(
'SELECT cle FROM kv_store WHERE workflow_id = ?',
(workflow_id,)
)
else:
cursor.execute('SELECT cle FROM kv_store')
return [row['cle'] for row in cursor.fetchall()]
except Exception:
return []
# ==================== Interface Collections ====================
def sauvegarder_collection(
self,
nom_collection: str,
donnees: Union[Dict, List],
workflow_id: Optional[str] = None,
execution_id: Optional[str] = None
) -> int:
"""
Sauvegarde des données structurées dans une collection.
Args:
nom_collection: Nom de la collection
donnees: Données (dict ou list)
workflow_id: ID du workflow
execution_id: ID de l'exécution
Returns:
ID de l'enregistrement créé
"""
try:
donnees_json = json.dumps(donnees, ensure_ascii=False, default=str)
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO donnees_structurees
(nom_collection, donnees, workflow_id, execution_id)
VALUES (?, ?, ?, ?)
''', (nom_collection, donnees_json, workflow_id, execution_id))
return cursor.lastrowid
except Exception as e:
print(f"❌ Erreur sauvegarde collection: {e}")
return -1
def charger_collection(
self,
nom_collection: str,
limite: int = 100,
workflow_id: Optional[str] = None
) -> List[Dict]:
"""
Charge les données d'une collection.
Args:
nom_collection: Nom de la collection
limite: Nombre max d'enregistrements
workflow_id: Filtrer par workflow
Returns:
Liste des enregistrements
"""
try:
with self._connexion() as conn:
cursor = conn.cursor()
if workflow_id:
cursor.execute('''
SELECT id, donnees, workflow_id, execution_id, created_at
FROM donnees_structurees
WHERE nom_collection = ? AND workflow_id = ?
ORDER BY created_at DESC
LIMIT ?
''', (nom_collection, workflow_id, limite))
else:
cursor.execute('''
SELECT id, donnees, workflow_id, execution_id, created_at
FROM donnees_structurees
WHERE nom_collection = ?
ORDER BY created_at DESC
LIMIT ?
''', (nom_collection, limite))
resultats = []
for row in cursor.fetchall():
resultats.append({
'id': row['id'],
'donnees': json.loads(row['donnees']),
'workflow_id': row['workflow_id'],
'execution_id': row['execution_id'],
'created_at': row['created_at']
})
return resultats
except Exception as e:
print(f"❌ Erreur chargement collection: {e}")
return []
def dernier_enregistrement(
self,
nom_collection: str,
workflow_id: Optional[str] = None
) -> Optional[Dict]:
"""Récupère le dernier enregistrement d'une collection."""
resultats = self.charger_collection(nom_collection, limite=1, workflow_id=workflow_id)
return resultats[0] if resultats else None
# ==================== Interface SQL Avancée ====================
def executer_sql(
self,
requete: str,
parametres: tuple = ()
) -> List[Dict]:
"""
Exécute une requête SQL personnalisée (SELECT uniquement pour sécurité).
Args:
requete: Requête SQL
parametres: Paramètres de la requête
Returns:
Résultats sous forme de liste de dicts
"""
# Sécurité: n'autoriser que SELECT
requete_upper = requete.strip().upper()
if not requete_upper.startswith('SELECT'):
raise ValueError("Seules les requêtes SELECT sont autorisées")
try:
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute(requete, parametres)
colonnes = [description[0] for description in cursor.description]
resultats = []
for row in cursor.fetchall():
resultats.append(dict(zip(colonnes, row)))
return resultats
except Exception as e:
print(f"❌ Erreur SQL: {e}")
return []
def executer_modification(
self,
requete: str,
parametres: tuple = ()
) -> int:
"""
Exécute une requête de modification (INSERT, UPDATE, DELETE).
Args:
requete: Requête SQL
parametres: Paramètres
Returns:
Nombre de lignes affectées
"""
try:
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute(requete, parametres)
return cursor.rowcount
except Exception as e:
print(f"❌ Erreur modification: {e}")
return -1
# ==================== Utilitaires ====================
def statistiques(self) -> Dict[str, Any]:
"""Retourne des statistiques sur la base."""
try:
with self._connexion() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) as count FROM kv_store')
nb_cles = cursor.fetchone()['count']
cursor.execute('SELECT COUNT(*) as count FROM donnees_structurees')
nb_collections = cursor.fetchone()['count']
cursor.execute('SELECT DISTINCT nom_collection FROM donnees_structurees')
collections = [row['nom_collection'] for row in cursor.fetchall()]
taille = self.chemin_db.stat().st_size if self.chemin_db.exists() else 0
return {
'chemin': str(self.chemin_db),
'taille_octets': taille,
'nb_cles': nb_cles,
'nb_enregistrements': nb_collections,
'collections': collections
}
except Exception as e:
return {'erreur': str(e)}
def nettoyer(
self,
workflow_id: Optional[str] = None,
avant_date: Optional[str] = None
) -> int:
"""
Nettoie les anciennes données.
Args:
workflow_id: Supprimer pour ce workflow uniquement
avant_date: Supprimer les données avant cette date (ISO format)
Returns:
Nombre d'entrées supprimées
"""
total = 0
try:
with self._connexion() as conn:
cursor = conn.cursor()
if workflow_id and avant_date:
cursor.execute(
'DELETE FROM kv_store WHERE workflow_id = ? AND updated_at < ?',
(workflow_id, avant_date)
)
total += cursor.rowcount
cursor.execute(
'DELETE FROM donnees_structurees WHERE workflow_id = ? AND created_at < ?',
(workflow_id, avant_date)
)
total += cursor.rowcount
elif workflow_id:
cursor.execute('DELETE FROM kv_store WHERE workflow_id = ?', (workflow_id,))
total += cursor.rowcount
cursor.execute('DELETE FROM donnees_structurees WHERE workflow_id = ?', (workflow_id,))
total += cursor.rowcount
elif avant_date:
cursor.execute('DELETE FROM kv_store WHERE updated_at < ?', (avant_date,))
total += cursor.rowcount
cursor.execute('DELETE FROM donnees_structurees WHERE created_at < ?', (avant_date,))
total += cursor.rowcount
return total
except Exception as e:
print(f"❌ Erreur nettoyage: {e}")
return -1

View File

@@ -0,0 +1,242 @@
"""
Action Sauvegarder Données - Persistance des données de workflow
Auteur : Dom, Claude - 14 janvier 2026
Cette action permet de sauvegarder des données de manière persistante
pour les réutiliser entre exécutions ou dans d'autres workflows.
Cas d'usage :
- Sauvegarder les résultats d'extraction
- Stocker l'état du workflow
- Logger les données pour audit
- Partager des données entre workflows
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
from ...contracts.error import VWBErrorType, create_vwb_error
from .gestionnaire_db import GestionnaireDB
class VWBSauvegarderDonneesAction(BaseVWBAction):
"""
Action de sauvegarde de données.
Modes de sauvegarde :
- cle_valeur: Stockage simple clé-valeur
- collection: Ajout à une collection (historique)
- sql: Requête SQL personnalisée
"""
def __init__(
self,
action_id: str,
parameters: Dict[str, Any],
screen_capturer=None
):
"""
Initialise l'action de sauvegarde.
Args:
action_id: Identifiant unique de l'action
parameters: Paramètres de sauvegarde
screen_capturer: Non utilisé, présent pour compatibilité
"""
super().__init__(
action_id=action_id,
name="Sauvegarder Données",
description="Sauvegarde des données de manière persistante",
parameters=parameters,
screen_capturer=screen_capturer
)
# Mode de sauvegarde
self.mode = parameters.get('mode', 'cle_valeur') # cle_valeur, collection, sql
# Paramètres clé-valeur
self.cle = parameters.get('cle', parameters.get('key', ''))
self.valeur = parameters.get('valeur', parameters.get('value'))
# Paramètres collection
self.nom_collection = parameters.get('nom_collection', parameters.get('collection_name', ''))
self.donnees = parameters.get('donnees', parameters.get('data'))
# Paramètres SQL avancés
self.requete_sql = parameters.get('requete_sql', parameters.get('sql_query', ''))
self.parametres_sql = parameters.get('parametres_sql', parameters.get('sql_params', ()))
# Configuration
self.chemin_db = parameters.get('chemin_db', parameters.get('db_path'))
self.workflow_id = parameters.get('workflow_id')
self.execution_id = parameters.get('execution_id')
# Options
self.ecraser = parameters.get('ecraser', parameters.get('overwrite', True))
def validate_parameters(self) -> List[str]:
"""Valide les paramètres de l'action."""
erreurs = []
if self.mode not in ['cle_valeur', 'collection', 'sql']:
erreurs.append("Mode doit être 'cle_valeur', 'collection' ou 'sql'")
if self.mode == 'cle_valeur':
if not self.cle:
erreurs.append("Clé requise pour le mode clé-valeur")
if self.valeur is None:
erreurs.append("Valeur requise pour le mode clé-valeur")
elif self.mode == 'collection':
if not self.nom_collection:
erreurs.append("Nom de collection requis")
if self.donnees is None:
erreurs.append("Données requises pour la collection")
elif self.mode == 'sql':
if not self.requete_sql:
erreurs.append("Requête SQL requise")
return erreurs
def execute_core(self, step_id: str) -> VWBActionResult:
"""
Exécute la sauvegarde des données.
Args:
step_id: Identifiant de l'étape
Returns:
Résultat de l'exécution
"""
start_time = datetime.now()
try:
# Obtenir le gestionnaire de DB
db = GestionnaireDB.obtenir_instance(self.chemin_db)
# Exécuter selon le mode
if self.mode == 'cle_valeur':
resultat = self._sauvegarder_cle_valeur(db)
elif self.mode == 'collection':
resultat = self._sauvegarder_collection(db)
else: # sql
resultat = self._executer_sql(db)
if not resultat['succes']:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message=resultat.get('erreur', 'Échec de la sauvegarde')
)
end_time = datetime.now()
execution_time = (end_time - start_time).total_seconds() * 1000
print(f"💾 Données sauvegardées ({self.mode})")
return VWBActionResult(
action_id=self.action_id,
step_id=step_id,
status=VWBActionStatus.SUCCESS,
start_time=start_time,
end_time=end_time,
execution_time_ms=execution_time,
output_data={
'mode': self.mode,
'details': resultat.get('details', {}),
'chemin_db': str(db.chemin_db)
},
evidence_list=self.evidence_list.copy()
)
except Exception as e:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message=f"Erreur: {str(e)}",
technical_details={'exception': str(e)}
)
def _sauvegarder_cle_valeur(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Sauvegarde en mode clé-valeur."""
succes = db.sauvegarder(
cle=self.cle,
valeur=self.valeur,
workflow_id=self.workflow_id
)
if succes:
print(f" Clé: {self.cle}")
return {
'succes': True,
'details': {
'cle': self.cle,
'type_valeur': type(self.valeur).__name__
}
}
else:
return {'succes': False, 'erreur': 'Échec sauvegarde clé-valeur'}
def _sauvegarder_collection(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Sauvegarde en mode collection."""
id_enregistrement = db.sauvegarder_collection(
nom_collection=self.nom_collection,
donnees=self.donnees,
workflow_id=self.workflow_id,
execution_id=self.execution_id
)
if id_enregistrement > 0:
print(f" Collection: {self.nom_collection} (ID: {id_enregistrement})")
return {
'succes': True,
'details': {
'collection': self.nom_collection,
'id': id_enregistrement,
'nb_elements': len(self.donnees) if isinstance(self.donnees, (list, dict)) else 1
}
}
else:
return {'succes': False, 'erreur': 'Échec sauvegarde collection'}
def _executer_sql(self, db: GestionnaireDB) -> Dict[str, Any]:
"""Exécute une requête SQL de modification."""
nb_lignes = db.executer_modification(
requete=self.requete_sql,
parametres=tuple(self.parametres_sql) if self.parametres_sql else ()
)
if nb_lignes >= 0:
print(f" SQL: {nb_lignes} ligne(s) affectée(s)")
return {
'succes': True,
'details': {
'lignes_affectees': nb_lignes,
'requete': self.requete_sql[:50] + '...' if len(self.requete_sql) > 50 else self.requete_sql
}
}
else:
return {'succes': False, 'erreur': 'Échec requête SQL'}
def get_action_info(self) -> Dict[str, Any]:
"""Retourne les informations de l'action."""
return {
'action_id': self.action_id,
'name': self.name,
'description': self.description,
'type': 'sauvegarder_donnees',
'parameters': {
'mode': self.mode,
'cle': self.cle if self.mode == 'cle_valeur' else None,
'collection': self.nom_collection if self.mode == 'collection' else None
},
'status': self.current_status.value
}
# Alias anglais
VWBDBSaveDataAction = VWBSauvegarderDonneesAction