From 473ca84be54f2450a7b5278ddcfcf8ba96b762bb Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 14 Jan 2026 23:30:31 +0100 Subject: [PATCH] Feat: Actions database sauvegarder/charger_donnees MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Module complet de persistance SQLite pour VWB: GestionnaireDB: - Interface clé-valeur avec typage auto (string, number, bool, json) - Collections pour données structurées avec historique - Requêtes SQL personnalisées (SELECT/modifications) - Thread-safe, singleton par chemin de DB - Statistiques et nettoyage Actions: - sauvegarder_donnees: 3 modes (cle_valeur, collection, sql) - charger_donnees: 4 modes (cle_valeur, collection, sql, lister) Base par défaut: ~/.vwb/data.db Co-Authored-By: Claude Opus 4.5 --- .../backend/actions/database/__init__.py | 33 ++ .../actions/database/charger_donnees.py | 276 ++++++++++ .../actions/database/gestionnaire_db.py | 497 ++++++++++++++++++ .../actions/database/sauvegarder_donnees.py | 242 +++++++++ 4 files changed, 1048 insertions(+) create mode 100644 visual_workflow_builder/backend/actions/database/__init__.py create mode 100644 visual_workflow_builder/backend/actions/database/charger_donnees.py create mode 100644 visual_workflow_builder/backend/actions/database/gestionnaire_db.py create mode 100644 visual_workflow_builder/backend/actions/database/sauvegarder_donnees.py diff --git a/visual_workflow_builder/backend/actions/database/__init__.py b/visual_workflow_builder/backend/actions/database/__init__.py new file mode 100644 index 000000000..a858aeab4 --- /dev/null +++ b/visual_workflow_builder/backend/actions/database/__init__.py @@ -0,0 +1,33 @@ +""" +Actions Database VWB - Module d'initialisation +Auteur : Dom, Claude - 14 janvier 2026 + +Ce module contient les actions de persistance de données +pour le Visual Workflow Builder. + +Actions disponibles : +- VWBSauvegarderDonneesAction : Sauvegarde de données (clé-valeur, collection, SQL) +- VWBChargerDonneesAction : Chargement de données + +Utilitaires : +- GestionnaireDB : Gestionnaire SQLite thread-safe +""" + +from .gestionnaire_db import GestionnaireDB +from .sauvegarder_donnees import VWBSauvegarderDonneesAction, VWBDBSaveDataAction +from .charger_donnees import VWBChargerDonneesAction, VWBDBReadDataAction + +__all__ = [ + # Gestionnaire + 'GestionnaireDB', + # Actions françaises + 'VWBSauvegarderDonneesAction', + 'VWBChargerDonneesAction', + # Alias anglais + 'VWBDBSaveDataAction', + 'VWBDBReadDataAction', +] + +__version__ = '1.0.0' +__author__ = 'Dom, Claude' +__date__ = '14 janvier 2026' diff --git a/visual_workflow_builder/backend/actions/database/charger_donnees.py b/visual_workflow_builder/backend/actions/database/charger_donnees.py new file mode 100644 index 000000000..a11406158 --- /dev/null +++ b/visual_workflow_builder/backend/actions/database/charger_donnees.py @@ -0,0 +1,276 @@ +""" +Action Charger Données - Récupération des données persistantes +Auteur : Dom, Claude - 14 janvier 2026 + +Cette action permet de charger des données sauvegardées précédemment +pour les utiliser dans le workflow courant. + +Cas d'usage : +- Récupérer des résultats d'exécutions précédentes +- Charger une configuration +- Comparer avec des données historiques +- Reprendre un workflow interrompu +""" + +from typing import Dict, Any, List, Optional +from datetime import datetime + +from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus +from ...contracts.error import VWBErrorType, create_vwb_error +from .gestionnaire_db import GestionnaireDB + + +class VWBChargerDonneesAction(BaseVWBAction): + """ + Action de chargement de données. + + Modes de chargement : + - cle_valeur: Récupération par clé + - collection: Récupération d'une collection + - sql: Requête SQL personnalisée + """ + + def __init__( + self, + action_id: str, + parameters: Dict[str, Any], + screen_capturer=None + ): + """ + Initialise l'action de chargement. + + Args: + action_id: Identifiant unique de l'action + parameters: Paramètres de chargement + screen_capturer: Non utilisé, présent pour compatibilité + """ + super().__init__( + action_id=action_id, + name="Charger Données", + description="Charge des données sauvegardées précédemment", + parameters=parameters, + screen_capturer=screen_capturer + ) + + # Mode de chargement + self.mode = parameters.get('mode', 'cle_valeur') # cle_valeur, collection, sql, lister + + # Paramètres clé-valeur + self.cle = parameters.get('cle', parameters.get('key', '')) + self.valeur_defaut = parameters.get('valeur_defaut', parameters.get('default_value')) + + # Paramètres collection + self.nom_collection = parameters.get('nom_collection', parameters.get('collection_name', '')) + self.limite = parameters.get('limite', parameters.get('limit', 100)) + self.dernier_seulement = parameters.get('dernier_seulement', parameters.get('last_only', False)) + + # Paramètres SQL + self.requete_sql = parameters.get('requete_sql', parameters.get('sql_query', '')) + self.parametres_sql = parameters.get('parametres_sql', parameters.get('sql_params', ())) + + # Paramètres liste + self.prefixe_cle = parameters.get('prefixe_cle', parameters.get('key_prefix', '')) + + # Configuration + self.chemin_db = parameters.get('chemin_db', parameters.get('db_path')) + self.workflow_id = parameters.get('workflow_id') + + # Variable de sortie + self.variable_sortie = parameters.get('variable_sortie', parameters.get('output_variable', 'donnees_chargees')) + + def validate_parameters(self) -> List[str]: + """Valide les paramètres de l'action.""" + erreurs = [] + + if self.mode not in ['cle_valeur', 'collection', 'sql', 'lister']: + erreurs.append("Mode doit être 'cle_valeur', 'collection', 'sql' ou 'lister'") + + if self.mode == 'cle_valeur' and not self.cle: + erreurs.append("Clé requise pour le mode clé-valeur") + + if self.mode == 'collection' and not self.nom_collection: + erreurs.append("Nom de collection requis") + + if self.mode == 'sql' and not self.requete_sql: + erreurs.append("Requête SQL requise") + + if self.limite < 1 or self.limite > 10000: + erreurs.append("Limite doit être entre 1 et 10000") + + return erreurs + + def execute_core(self, step_id: str) -> VWBActionResult: + """ + Exécute le chargement des données. + + Args: + step_id: Identifiant de l'étape + + Returns: + Résultat avec les données chargées + """ + start_time = datetime.now() + + try: + # Obtenir le gestionnaire de DB + db = GestionnaireDB.obtenir_instance(self.chemin_db) + + # Exécuter selon le mode + if self.mode == 'cle_valeur': + resultat = self._charger_cle_valeur(db) + elif self.mode == 'collection': + resultat = self._charger_collection(db) + elif self.mode == 'sql': + resultat = self._executer_sql(db) + else: # lister + resultat = self._lister_cles(db) + + end_time = datetime.now() + execution_time = (end_time - start_time).total_seconds() * 1000 + + donnees = resultat.get('donnees') + trouve = resultat.get('trouve', donnees is not None) + + if trouve: + print(f"📂 Données chargées ({self.mode})") + self._afficher_resume(donnees) + else: + print(f"📂 Aucune donnée trouvée ({self.mode})") + + return VWBActionResult( + action_id=self.action_id, + step_id=step_id, + status=VWBActionStatus.SUCCESS, + start_time=start_time, + end_time=end_time, + execution_time_ms=execution_time, + output_data={ + 'donnees': donnees, + 'trouve': trouve, + 'mode': self.mode, + 'variable_sortie': self.variable_sortie, + 'nb_resultats': self._compter_resultats(donnees) + }, + evidence_list=self.evidence_list.copy() + ) + + except Exception as e: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=f"Erreur: {str(e)}", + technical_details={'exception': str(e)} + ) + + def _charger_cle_valeur(self, db: GestionnaireDB) -> Dict[str, Any]: + """Charge une valeur par clé.""" + valeur = db.charger( + cle=self.cle, + defaut=self.valeur_defaut + ) + + trouve = valeur != self.valeur_defaut or db.charger(self.cle) is not None + + return { + 'donnees': valeur, + 'trouve': trouve + } + + def _charger_collection(self, db: GestionnaireDB) -> Dict[str, Any]: + """Charge les données d'une collection.""" + if self.dernier_seulement: + resultat = db.dernier_enregistrement( + nom_collection=self.nom_collection, + workflow_id=self.workflow_id + ) + if resultat: + return { + 'donnees': resultat['donnees'], + 'trouve': True, + 'metadata': { + 'id': resultat['id'], + 'created_at': resultat['created_at'] + } + } + return {'donnees': None, 'trouve': False} + + else: + resultats = db.charger_collection( + nom_collection=self.nom_collection, + limite=self.limite, + workflow_id=self.workflow_id + ) + return { + 'donnees': [r['donnees'] for r in resultats], + 'trouve': len(resultats) > 0 + } + + def _executer_sql(self, db: GestionnaireDB) -> Dict[str, Any]: + """Exécute une requête SQL SELECT.""" + resultats = db.executer_sql( + requete=self.requete_sql, + parametres=tuple(self.parametres_sql) if self.parametres_sql else () + ) + + return { + 'donnees': resultats, + 'trouve': len(resultats) > 0 + } + + def _lister_cles(self, db: GestionnaireDB) -> Dict[str, Any]: + """Liste les clés disponibles.""" + cles = db.lister_cles( + prefixe=self.prefixe_cle if self.prefixe_cle else None, + workflow_id=self.workflow_id + ) + + return { + 'donnees': cles, + 'trouve': len(cles) > 0 + } + + def _compter_resultats(self, donnees: Any) -> int: + """Compte le nombre de résultats.""" + if donnees is None: + return 0 + if isinstance(donnees, list): + return len(donnees) + if isinstance(donnees, dict): + return len(donnees) + return 1 + + def _afficher_resume(self, donnees: Any): + """Affiche un résumé des données chargées.""" + if donnees is None: + return + + if isinstance(donnees, list): + print(f" {len(donnees)} élément(s)") + elif isinstance(donnees, dict): + print(f" {len(donnees)} clé(s)") + else: + valeur_str = str(donnees) + if len(valeur_str) > 50: + valeur_str = valeur_str[:50] + '...' + print(f" Valeur: {valeur_str}") + + def get_action_info(self) -> Dict[str, Any]: + """Retourne les informations de l'action.""" + return { + 'action_id': self.action_id, + 'name': self.name, + 'description': self.description, + 'type': 'charger_donnees', + 'parameters': { + 'mode': self.mode, + 'cle': self.cle if self.mode == 'cle_valeur' else None, + 'collection': self.nom_collection if self.mode == 'collection' else None, + 'variable_sortie': self.variable_sortie + }, + 'status': self.current_status.value + } + + +# Alias anglais +VWBDBReadDataAction = VWBChargerDonneesAction diff --git a/visual_workflow_builder/backend/actions/database/gestionnaire_db.py b/visual_workflow_builder/backend/actions/database/gestionnaire_db.py new file mode 100644 index 000000000..92b8d681e --- /dev/null +++ b/visual_workflow_builder/backend/actions/database/gestionnaire_db.py @@ -0,0 +1,497 @@ +""" +Gestionnaire de Base de Données VWB +Auteur : Dom, Claude - 14 janvier 2026 + +Module utilitaire pour la gestion SQLite des workflows VWB. +Fournit une interface simple pour le stockage persistant. +""" + +import sqlite3 +import json +from pathlib import Path +from datetime import datetime +from typing import Dict, Any, List, Optional, Union +from contextlib import contextmanager +import threading + + +class GestionnaireDB: + """ + Gestionnaire de base de données SQLite pour VWB. + + Fournit : + - Stockage clé-valeur simple + - Tables structurées + - Requêtes SQL avancées + - Thread-safe + """ + + _instances: Dict[str, 'GestionnaireDB'] = {} + _lock = threading.Lock() + + def __init__(self, chemin_db: Optional[str] = None): + """ + Initialise le gestionnaire. + + Args: + chemin_db: Chemin vers le fichier SQLite (défaut: ~/.vwb/data.db) + """ + if chemin_db: + self.chemin_db = Path(chemin_db).expanduser() + else: + self.chemin_db = Path.home() / '.vwb' / 'data.db' + + # Créer le dossier parent si nécessaire + self.chemin_db.parent.mkdir(parents=True, exist_ok=True) + + # Initialiser la base + self._initialiser_schema() + + @classmethod + def obtenir_instance(cls, chemin_db: Optional[str] = None) -> 'GestionnaireDB': + """Obtient une instance singleton par chemin de DB.""" + chemin = chemin_db or str(Path.home() / '.vwb' / 'data.db') + + with cls._lock: + if chemin not in cls._instances: + cls._instances[chemin] = cls(chemin_db) + return cls._instances[chemin] + + @contextmanager + def _connexion(self): + """Context manager pour connexion thread-safe.""" + conn = sqlite3.connect(str(self.chemin_db), timeout=30.0) + conn.row_factory = sqlite3.Row + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + def _initialiser_schema(self): + """Initialise les tables de base.""" + with self._connexion() as conn: + cursor = conn.cursor() + + # Table clé-valeur simple + cursor.execute(''' + CREATE TABLE IF NOT EXISTS kv_store ( + cle TEXT PRIMARY KEY, + valeur TEXT, + type_donnee TEXT DEFAULT 'string', + workflow_id TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + ''') + + # Table pour données structurées (JSON) + cursor.execute(''' + CREATE TABLE IF NOT EXISTS donnees_structurees ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + nom_collection TEXT NOT NULL, + donnees TEXT NOT NULL, + workflow_id TEXT, + execution_id TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + ''') + + # Index pour performances + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_kv_workflow + ON kv_store(workflow_id) + ''') + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_struct_collection + ON donnees_structurees(nom_collection) + ''') + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_struct_workflow + ON donnees_structurees(workflow_id) + ''') + + # ==================== Interface Clé-Valeur ==================== + + def sauvegarder( + self, + cle: str, + valeur: Any, + workflow_id: Optional[str] = None + ) -> bool: + """ + Sauvegarde une valeur avec une clé. + + Args: + cle: Clé unique + valeur: Valeur à sauvegarder (sera sérialisée en JSON si nécessaire) + workflow_id: ID du workflow (optionnel) + + Returns: + True si succès + """ + try: + # Déterminer le type et sérialiser + if isinstance(valeur, (dict, list)): + valeur_str = json.dumps(valeur, ensure_ascii=False, default=str) + type_donnee = 'json' + elif isinstance(valeur, bool): + valeur_str = str(valeur).lower() + type_donnee = 'bool' + elif isinstance(valeur, (int, float)): + valeur_str = str(valeur) + type_donnee = 'number' + else: + valeur_str = str(valeur) + type_donnee = 'string' + + with self._connexion() as conn: + cursor = conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO kv_store + (cle, valeur, type_donnee, workflow_id, updated_at) + VALUES (?, ?, ?, ?, ?) + ''', (cle, valeur_str, type_donnee, workflow_id, datetime.now().isoformat())) + + return True + + except Exception as e: + print(f"❌ Erreur sauvegarde: {e}") + return False + + def charger( + self, + cle: str, + defaut: Any = None + ) -> Any: + """ + Charge une valeur par sa clé. + + Args: + cle: Clé à rechercher + defaut: Valeur par défaut si non trouvée + + Returns: + Valeur désérialisée ou défaut + """ + try: + with self._connexion() as conn: + cursor = conn.cursor() + cursor.execute( + 'SELECT valeur, type_donnee FROM kv_store WHERE cle = ?', + (cle,) + ) + row = cursor.fetchone() + + if not row: + return defaut + + valeur_str, type_donnee = row['valeur'], row['type_donnee'] + + # Désérialiser selon le type + if type_donnee == 'json': + return json.loads(valeur_str) + elif type_donnee == 'bool': + return valeur_str.lower() == 'true' + elif type_donnee == 'number': + return float(valeur_str) if '.' in valeur_str else int(valeur_str) + else: + return valeur_str + + except Exception as e: + print(f"❌ Erreur chargement: {e}") + return defaut + + def supprimer(self, cle: str) -> bool: + """Supprime une entrée par sa clé.""" + try: + with self._connexion() as conn: + cursor = conn.cursor() + cursor.execute('DELETE FROM kv_store WHERE cle = ?', (cle,)) + return True + except Exception: + return False + + def lister_cles( + self, + prefixe: Optional[str] = None, + workflow_id: Optional[str] = None + ) -> List[str]: + """Liste les clés disponibles.""" + try: + with self._connexion() as conn: + cursor = conn.cursor() + + if prefixe and workflow_id: + cursor.execute( + 'SELECT cle FROM kv_store WHERE cle LIKE ? AND workflow_id = ?', + (f'{prefixe}%', workflow_id) + ) + elif prefixe: + cursor.execute( + 'SELECT cle FROM kv_store WHERE cle LIKE ?', + (f'{prefixe}%',) + ) + elif workflow_id: + cursor.execute( + 'SELECT cle FROM kv_store WHERE workflow_id = ?', + (workflow_id,) + ) + else: + cursor.execute('SELECT cle FROM kv_store') + + return [row['cle'] for row in cursor.fetchall()] + + except Exception: + return [] + + # ==================== Interface Collections ==================== + + def sauvegarder_collection( + self, + nom_collection: str, + donnees: Union[Dict, List], + workflow_id: Optional[str] = None, + execution_id: Optional[str] = None + ) -> int: + """ + Sauvegarde des données structurées dans une collection. + + Args: + nom_collection: Nom de la collection + donnees: Données (dict ou list) + workflow_id: ID du workflow + execution_id: ID de l'exécution + + Returns: + ID de l'enregistrement créé + """ + try: + donnees_json = json.dumps(donnees, ensure_ascii=False, default=str) + + with self._connexion() as conn: + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO donnees_structurees + (nom_collection, donnees, workflow_id, execution_id) + VALUES (?, ?, ?, ?) + ''', (nom_collection, donnees_json, workflow_id, execution_id)) + return cursor.lastrowid + + except Exception as e: + print(f"❌ Erreur sauvegarde collection: {e}") + return -1 + + def charger_collection( + self, + nom_collection: str, + limite: int = 100, + workflow_id: Optional[str] = None + ) -> List[Dict]: + """ + Charge les données d'une collection. + + Args: + nom_collection: Nom de la collection + limite: Nombre max d'enregistrements + workflow_id: Filtrer par workflow + + Returns: + Liste des enregistrements + """ + try: + with self._connexion() as conn: + cursor = conn.cursor() + + if workflow_id: + cursor.execute(''' + SELECT id, donnees, workflow_id, execution_id, created_at + FROM donnees_structurees + WHERE nom_collection = ? AND workflow_id = ? + ORDER BY created_at DESC + LIMIT ? + ''', (nom_collection, workflow_id, limite)) + else: + cursor.execute(''' + SELECT id, donnees, workflow_id, execution_id, created_at + FROM donnees_structurees + WHERE nom_collection = ? + ORDER BY created_at DESC + LIMIT ? + ''', (nom_collection, limite)) + + resultats = [] + for row in cursor.fetchall(): + resultats.append({ + 'id': row['id'], + 'donnees': json.loads(row['donnees']), + 'workflow_id': row['workflow_id'], + 'execution_id': row['execution_id'], + 'created_at': row['created_at'] + }) + + return resultats + + except Exception as e: + print(f"❌ Erreur chargement collection: {e}") + return [] + + def dernier_enregistrement( + self, + nom_collection: str, + workflow_id: Optional[str] = None + ) -> Optional[Dict]: + """Récupère le dernier enregistrement d'une collection.""" + resultats = self.charger_collection(nom_collection, limite=1, workflow_id=workflow_id) + return resultats[0] if resultats else None + + # ==================== Interface SQL Avancée ==================== + + def executer_sql( + self, + requete: str, + parametres: tuple = () + ) -> List[Dict]: + """ + Exécute une requête SQL personnalisée (SELECT uniquement pour sécurité). + + Args: + requete: Requête SQL + parametres: Paramètres de la requête + + Returns: + Résultats sous forme de liste de dicts + """ + # Sécurité: n'autoriser que SELECT + requete_upper = requete.strip().upper() + if not requete_upper.startswith('SELECT'): + raise ValueError("Seules les requêtes SELECT sont autorisées") + + try: + with self._connexion() as conn: + cursor = conn.cursor() + cursor.execute(requete, parametres) + + colonnes = [description[0] for description in cursor.description] + resultats = [] + + for row in cursor.fetchall(): + resultats.append(dict(zip(colonnes, row))) + + return resultats + + except Exception as e: + print(f"❌ Erreur SQL: {e}") + return [] + + def executer_modification( + self, + requete: str, + parametres: tuple = () + ) -> int: + """ + Exécute une requête de modification (INSERT, UPDATE, DELETE). + + Args: + requete: Requête SQL + parametres: Paramètres + + Returns: + Nombre de lignes affectées + """ + try: + with self._connexion() as conn: + cursor = conn.cursor() + cursor.execute(requete, parametres) + return cursor.rowcount + + except Exception as e: + print(f"❌ Erreur modification: {e}") + return -1 + + # ==================== Utilitaires ==================== + + def statistiques(self) -> Dict[str, Any]: + """Retourne des statistiques sur la base.""" + try: + with self._connexion() as conn: + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(*) as count FROM kv_store') + nb_cles = cursor.fetchone()['count'] + + cursor.execute('SELECT COUNT(*) as count FROM donnees_structurees') + nb_collections = cursor.fetchone()['count'] + + cursor.execute('SELECT DISTINCT nom_collection FROM donnees_structurees') + collections = [row['nom_collection'] for row in cursor.fetchall()] + + taille = self.chemin_db.stat().st_size if self.chemin_db.exists() else 0 + + return { + 'chemin': str(self.chemin_db), + 'taille_octets': taille, + 'nb_cles': nb_cles, + 'nb_enregistrements': nb_collections, + 'collections': collections + } + + except Exception as e: + return {'erreur': str(e)} + + def nettoyer( + self, + workflow_id: Optional[str] = None, + avant_date: Optional[str] = None + ) -> int: + """ + Nettoie les anciennes données. + + Args: + workflow_id: Supprimer pour ce workflow uniquement + avant_date: Supprimer les données avant cette date (ISO format) + + Returns: + Nombre d'entrées supprimées + """ + total = 0 + + try: + with self._connexion() as conn: + cursor = conn.cursor() + + if workflow_id and avant_date: + cursor.execute( + 'DELETE FROM kv_store WHERE workflow_id = ? AND updated_at < ?', + (workflow_id, avant_date) + ) + total += cursor.rowcount + + cursor.execute( + 'DELETE FROM donnees_structurees WHERE workflow_id = ? AND created_at < ?', + (workflow_id, avant_date) + ) + total += cursor.rowcount + + elif workflow_id: + cursor.execute('DELETE FROM kv_store WHERE workflow_id = ?', (workflow_id,)) + total += cursor.rowcount + + cursor.execute('DELETE FROM donnees_structurees WHERE workflow_id = ?', (workflow_id,)) + total += cursor.rowcount + + elif avant_date: + cursor.execute('DELETE FROM kv_store WHERE updated_at < ?', (avant_date,)) + total += cursor.rowcount + + cursor.execute('DELETE FROM donnees_structurees WHERE created_at < ?', (avant_date,)) + total += cursor.rowcount + + return total + + except Exception as e: + print(f"❌ Erreur nettoyage: {e}") + return -1 diff --git a/visual_workflow_builder/backend/actions/database/sauvegarder_donnees.py b/visual_workflow_builder/backend/actions/database/sauvegarder_donnees.py new file mode 100644 index 000000000..b2cad1c42 --- /dev/null +++ b/visual_workflow_builder/backend/actions/database/sauvegarder_donnees.py @@ -0,0 +1,242 @@ +""" +Action Sauvegarder Données - Persistance des données de workflow +Auteur : Dom, Claude - 14 janvier 2026 + +Cette action permet de sauvegarder des données de manière persistante +pour les réutiliser entre exécutions ou dans d'autres workflows. + +Cas d'usage : +- Sauvegarder les résultats d'extraction +- Stocker l'état du workflow +- Logger les données pour audit +- Partager des données entre workflows +""" + +from typing import Dict, Any, List, Optional +from datetime import datetime + +from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus +from ...contracts.error import VWBErrorType, create_vwb_error +from .gestionnaire_db import GestionnaireDB + + +class VWBSauvegarderDonneesAction(BaseVWBAction): + """ + Action de sauvegarde de données. + + Modes de sauvegarde : + - cle_valeur: Stockage simple clé-valeur + - collection: Ajout à une collection (historique) + - sql: Requête SQL personnalisée + """ + + def __init__( + self, + action_id: str, + parameters: Dict[str, Any], + screen_capturer=None + ): + """ + Initialise l'action de sauvegarde. + + Args: + action_id: Identifiant unique de l'action + parameters: Paramètres de sauvegarde + screen_capturer: Non utilisé, présent pour compatibilité + """ + super().__init__( + action_id=action_id, + name="Sauvegarder Données", + description="Sauvegarde des données de manière persistante", + parameters=parameters, + screen_capturer=screen_capturer + ) + + # Mode de sauvegarde + self.mode = parameters.get('mode', 'cle_valeur') # cle_valeur, collection, sql + + # Paramètres clé-valeur + self.cle = parameters.get('cle', parameters.get('key', '')) + self.valeur = parameters.get('valeur', parameters.get('value')) + + # Paramètres collection + self.nom_collection = parameters.get('nom_collection', parameters.get('collection_name', '')) + self.donnees = parameters.get('donnees', parameters.get('data')) + + # Paramètres SQL avancés + self.requete_sql = parameters.get('requete_sql', parameters.get('sql_query', '')) + self.parametres_sql = parameters.get('parametres_sql', parameters.get('sql_params', ())) + + # Configuration + self.chemin_db = parameters.get('chemin_db', parameters.get('db_path')) + self.workflow_id = parameters.get('workflow_id') + self.execution_id = parameters.get('execution_id') + + # Options + self.ecraser = parameters.get('ecraser', parameters.get('overwrite', True)) + + def validate_parameters(self) -> List[str]: + """Valide les paramètres de l'action.""" + erreurs = [] + + if self.mode not in ['cle_valeur', 'collection', 'sql']: + erreurs.append("Mode doit être 'cle_valeur', 'collection' ou 'sql'") + + if self.mode == 'cle_valeur': + if not self.cle: + erreurs.append("Clé requise pour le mode clé-valeur") + if self.valeur is None: + erreurs.append("Valeur requise pour le mode clé-valeur") + + elif self.mode == 'collection': + if not self.nom_collection: + erreurs.append("Nom de collection requis") + if self.donnees is None: + erreurs.append("Données requises pour la collection") + + elif self.mode == 'sql': + if not self.requete_sql: + erreurs.append("Requête SQL requise") + + return erreurs + + def execute_core(self, step_id: str) -> VWBActionResult: + """ + Exécute la sauvegarde des données. + + Args: + step_id: Identifiant de l'étape + + Returns: + Résultat de l'exécution + """ + start_time = datetime.now() + + try: + # Obtenir le gestionnaire de DB + db = GestionnaireDB.obtenir_instance(self.chemin_db) + + # Exécuter selon le mode + if self.mode == 'cle_valeur': + resultat = self._sauvegarder_cle_valeur(db) + elif self.mode == 'collection': + resultat = self._sauvegarder_collection(db) + else: # sql + resultat = self._executer_sql(db) + + if not resultat['succes']: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=resultat.get('erreur', 'Échec de la sauvegarde') + ) + + end_time = datetime.now() + execution_time = (end_time - start_time).total_seconds() * 1000 + + print(f"💾 Données sauvegardées ({self.mode})") + + return VWBActionResult( + action_id=self.action_id, + step_id=step_id, + status=VWBActionStatus.SUCCESS, + start_time=start_time, + end_time=end_time, + execution_time_ms=execution_time, + output_data={ + 'mode': self.mode, + 'details': resultat.get('details', {}), + 'chemin_db': str(db.chemin_db) + }, + evidence_list=self.evidence_list.copy() + ) + + except Exception as e: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=f"Erreur: {str(e)}", + technical_details={'exception': str(e)} + ) + + def _sauvegarder_cle_valeur(self, db: GestionnaireDB) -> Dict[str, Any]: + """Sauvegarde en mode clé-valeur.""" + succes = db.sauvegarder( + cle=self.cle, + valeur=self.valeur, + workflow_id=self.workflow_id + ) + + if succes: + print(f" Clé: {self.cle}") + return { + 'succes': True, + 'details': { + 'cle': self.cle, + 'type_valeur': type(self.valeur).__name__ + } + } + else: + return {'succes': False, 'erreur': 'Échec sauvegarde clé-valeur'} + + def _sauvegarder_collection(self, db: GestionnaireDB) -> Dict[str, Any]: + """Sauvegarde en mode collection.""" + id_enregistrement = db.sauvegarder_collection( + nom_collection=self.nom_collection, + donnees=self.donnees, + workflow_id=self.workflow_id, + execution_id=self.execution_id + ) + + if id_enregistrement > 0: + print(f" Collection: {self.nom_collection} (ID: {id_enregistrement})") + return { + 'succes': True, + 'details': { + 'collection': self.nom_collection, + 'id': id_enregistrement, + 'nb_elements': len(self.donnees) if isinstance(self.donnees, (list, dict)) else 1 + } + } + else: + return {'succes': False, 'erreur': 'Échec sauvegarde collection'} + + def _executer_sql(self, db: GestionnaireDB) -> Dict[str, Any]: + """Exécute une requête SQL de modification.""" + nb_lignes = db.executer_modification( + requete=self.requete_sql, + parametres=tuple(self.parametres_sql) if self.parametres_sql else () + ) + + if nb_lignes >= 0: + print(f" SQL: {nb_lignes} ligne(s) affectée(s)") + return { + 'succes': True, + 'details': { + 'lignes_affectees': nb_lignes, + 'requete': self.requete_sql[:50] + '...' if len(self.requete_sql) > 50 else self.requete_sql + } + } + else: + return {'succes': False, 'erreur': 'Échec requête SQL'} + + def get_action_info(self) -> Dict[str, Any]: + """Retourne les informations de l'action.""" + return { + 'action_id': self.action_id, + 'name': self.name, + 'description': self.description, + 'type': 'sauvegarder_donnees', + 'parameters': { + 'mode': self.mode, + 'cle': self.cle if self.mode == 'cle_valeur' else None, + 'collection': self.nom_collection if self.mode == 'collection' else None + }, + 'status': self.current_status.value + } + + +# Alias anglais +VWBDBSaveDataAction = VWBSauvegarderDonneesAction