From f62e8e78528edfc27ebec855849bdc916fa8f420 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 14 Jan 2026 23:25:32 +0100 Subject: [PATCH] Feat: Action telecharger_vers_dossier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gestion complète des téléchargements de fichiers: - Surveillance du dossier source pour nouveaux fichiers - Attente fin de téléchargement (fichier stable) - Validation: taille min, extensions autorisées - Déplacement/renommage avec templates: {original}, {date}, {datetime}, {annee}, etc. - Gestion conflits de noms - Détection fichiers temporaires (.part, .crdownload) Co-Authored-By: Claude Opus 4.5 --- .../backend/actions/data/__init__.py | 8 +- .../actions/data/telecharger_vers_dossier.py | 470 ++++++++++++++++++ 2 files changed, 476 insertions(+), 2 deletions(-) create mode 100644 visual_workflow_builder/backend/actions/data/telecharger_vers_dossier.py diff --git a/visual_workflow_builder/backend/actions/data/__init__.py b/visual_workflow_builder/backend/actions/data/__init__.py index 42e74c90b..4772f098a 100644 --- a/visual_workflow_builder/backend/actions/data/__init__.py +++ b/visual_workflow_builder/backend/actions/data/__init__.py @@ -7,15 +7,19 @@ pour le Visual Workflow Builder. Actions disponibles : - VWBExtraireTableauAction : Extraction de données tabulaires +- VWBTelechargerVersDossierAction : Gestion des téléchargements """ from .extraire_tableau import VWBExtraireTableauAction, VWBExtractTableAction +from .telecharger_vers_dossier import VWBTelechargerVersDossierAction, VWBDownloadToFolderAction __all__ = [ 'VWBExtraireTableauAction', - 'VWBExtractTableAction', # Alias anglais + 'VWBExtractTableAction', + 'VWBTelechargerVersDossierAction', + 'VWBDownloadToFolderAction', ] -__version__ = '1.0.0' +__version__ = '1.1.0' __author__ = 'Dom, Claude' __date__ = '14 janvier 2026' diff --git a/visual_workflow_builder/backend/actions/data/telecharger_vers_dossier.py b/visual_workflow_builder/backend/actions/data/telecharger_vers_dossier.py new file mode 100644 index 000000000..6730a1fd7 --- /dev/null +++ b/visual_workflow_builder/backend/actions/data/telecharger_vers_dossier.py @@ -0,0 +1,470 @@ +""" +Action Télécharger vers Dossier - Gestion des téléchargements de fichiers +Auteur : Dom, Claude - 14 janvier 2026 + +Cette action permet de gérer les téléchargements de fichiers : +- Surveiller l'apparition d'un nouveau fichier +- Attendre la fin du téléchargement +- Déplacer/renommer le fichier +- Valider l'intégrité + +Cas d'usage : +- Télécharger des rapports depuis applications web +- Exporter des fichiers depuis Citrix/VDI +- Sauvegarder des pièces jointes +""" + +from typing import Dict, Any, List, Optional +from datetime import datetime +from pathlib import Path +import time +import shutil +import os +import re +import glob + +from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus +from ...contracts.error import VWBErrorType, create_vwb_error + + +class VWBTelechargerVersDossierAction(BaseVWBAction): + """ + Action de gestion des téléchargements. + + Surveille un dossier source pour détecter un nouveau fichier, + attend la fin du téléchargement, puis déplace/renomme le fichier + vers le dossier cible. + """ + + def __init__( + self, + action_id: str, + parameters: Dict[str, Any], + screen_capturer=None + ): + """ + Initialise l'action de téléchargement. + + Args: + action_id: Identifiant unique de l'action + parameters: Paramètres du téléchargement + screen_capturer: Instance du ScreenCapturer (optionnel) + """ + super().__init__( + action_id=action_id, + name="Télécharger vers Dossier", + description="Gère le téléchargement et déplacement de fichiers", + parameters=parameters, + screen_capturer=screen_capturer + ) + + # Dossiers + self.dossier_source = Path(parameters.get( + 'dossier_source', + parameters.get('source_folder', self._get_dossier_telechargements()) + )).expanduser() + + self.dossier_cible = parameters.get('dossier_cible', parameters.get('target_folder')) + if self.dossier_cible: + self.dossier_cible = Path(self.dossier_cible).expanduser() + + # Pattern de fichier à surveiller + self.pattern_fichier = parameters.get('pattern_fichier', parameters.get('file_pattern', '*')) + + # Renommage + self.nouveau_nom = parameters.get('nouveau_nom', parameters.get('new_name', '')) + self.prefixe_date = parameters.get('prefixe_date', parameters.get('date_prefix', False)) + self.format_date = parameters.get('format_date', parameters.get('date_format', '%Y%m%d_%H%M%S')) + + # Timeouts et validation + self.timeout_detection_ms = parameters.get('timeout_detection_ms', parameters.get('detection_timeout_ms', 30000)) + self.timeout_telechargement_ms = parameters.get('timeout_telechargement_ms', parameters.get('download_timeout_ms', 120000)) + self.intervalle_verification_ms = parameters.get('intervalle_verification_ms', parameters.get('check_interval_ms', 500)) + + # Validation + self.taille_min_octets = parameters.get('taille_min_octets', parameters.get('min_size_bytes', 0)) + self.extensions_autorisees = parameters.get('extensions_autorisees', parameters.get('allowed_extensions', [])) + + # Options + self.supprimer_source = parameters.get('supprimer_source', parameters.get('delete_source', True)) + self.ecraser_existant = parameters.get('ecraser_existant', parameters.get('overwrite_existing', False)) + + # Variable de sortie + self.variable_sortie = parameters.get('variable_sortie', parameters.get('output_variable', 'fichier_telecharge')) + + def _get_dossier_telechargements(self) -> str: + """Retourne le dossier de téléchargements par défaut.""" + # Linux + xdg_download = os.environ.get('XDG_DOWNLOAD_DIR') + if xdg_download: + return xdg_download + + home = Path.home() + + # Essayer les emplacements courants + for nom in ['Téléchargements', 'Downloads', 'downloads']: + chemin = home / nom + if chemin.exists(): + return str(chemin) + + return str(home / 'Downloads') + + def validate_parameters(self) -> List[str]: + """Valide les paramètres de l'action.""" + erreurs = [] + + if not self.dossier_source.exists(): + erreurs.append(f"Dossier source inexistant: {self.dossier_source}") + + if self.dossier_cible and not self.dossier_cible.parent.exists(): + erreurs.append(f"Dossier parent cible inexistant: {self.dossier_cible.parent}") + + if self.timeout_detection_ms < 1000: + erreurs.append("Timeout détection minimum: 1000ms") + + if self.timeout_telechargement_ms < 5000: + erreurs.append("Timeout téléchargement minimum: 5000ms") + + return erreurs + + def execute_core(self, step_id: str) -> VWBActionResult: + """ + Exécute la gestion du téléchargement. + + Args: + step_id: Identifiant de l'étape + + Returns: + Résultat avec les informations du fichier + """ + start_time = datetime.now() + + try: + # Étape 1: Lister les fichiers existants + fichiers_avant = self._lister_fichiers() + print(f"📂 Surveillance de {self.dossier_source} ({len(fichiers_avant)} fichiers existants)") + + # Étape 2: Attendre l'apparition d'un nouveau fichier + nouveau_fichier = self._attendre_nouveau_fichier(fichiers_avant) + + if not nouveau_fichier: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.TIMEOUT_ERROR, + message=f"Aucun nouveau fichier détecté après {self.timeout_detection_ms}ms" + ) + + print(f"📥 Nouveau fichier détecté: {nouveau_fichier.name}") + + # Étape 3: Attendre la fin du téléchargement + telechargement_ok = self._attendre_fin_telechargement(nouveau_fichier) + + if not telechargement_ok: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.TIMEOUT_ERROR, + message=f"Téléchargement non terminé après {self.timeout_telechargement_ms}ms" + ) + + # Étape 4: Valider le fichier + validation = self._valider_fichier(nouveau_fichier) + if not validation['valide']: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.VALIDATION_FAILED, + message=validation['erreur'] + ) + + # Étape 5: Déplacer/Renommer si demandé + fichier_final = nouveau_fichier + if self.dossier_cible or self.nouveau_nom: + fichier_final = self._deplacer_renommer(nouveau_fichier) + + if not fichier_final: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message="Échec du déplacement/renommage" + ) + + end_time = datetime.now() + execution_time = (end_time - start_time).total_seconds() * 1000 + + taille = fichier_final.stat().st_size + print(f"✅ Fichier prêt: {fichier_final.name} ({self._formater_taille(taille)})") + + return VWBActionResult( + action_id=self.action_id, + step_id=step_id, + status=VWBActionStatus.SUCCESS, + start_time=start_time, + end_time=end_time, + execution_time_ms=execution_time, + output_data={ + 'chemin_fichier': str(fichier_final), + 'nom_fichier': fichier_final.name, + 'taille_octets': taille, + 'taille_lisible': self._formater_taille(taille), + 'extension': fichier_final.suffix, + 'variable_sortie': self.variable_sortie + }, + evidence_list=self.evidence_list.copy() + ) + + except Exception as e: + return self._create_error_result( + step_id=step_id, + start_time=start_time, + error_type=VWBErrorType.SYSTEM_ERROR, + message=f"Erreur: {str(e)}", + technical_details={'exception': str(e)} + ) + + def _lister_fichiers(self) -> set: + """Liste les fichiers correspondant au pattern.""" + pattern_complet = str(self.dossier_source / self.pattern_fichier) + fichiers = set(glob.glob(pattern_complet)) + return fichiers + + def _attendre_nouveau_fichier(self, fichiers_avant: set) -> Optional[Path]: + """ + Attend l'apparition d'un nouveau fichier. + + Args: + fichiers_avant: Set des fichiers existants avant + + Returns: + Path du nouveau fichier ou None + """ + debut = time.time() + timeout_sec = self.timeout_detection_ms / 1000 + + while (time.time() - debut) < timeout_sec: + fichiers_maintenant = self._lister_fichiers() + nouveaux = fichiers_maintenant - fichiers_avant + + # Filtrer les fichiers temporaires (.part, .crdownload, .tmp) + for fichier in nouveaux: + path = Path(fichier) + if not self._est_fichier_temporaire(path): + return path + + time.sleep(self.intervalle_verification_ms / 1000) + + return None + + def _est_fichier_temporaire(self, path: Path) -> bool: + """Vérifie si c'est un fichier temporaire de téléchargement.""" + extensions_temp = {'.part', '.crdownload', '.tmp', '.download', '.partial'} + return path.suffix.lower() in extensions_temp or path.name.startswith('.') + + def _attendre_fin_telechargement(self, fichier: Path) -> bool: + """ + Attend que le fichier arrête de grossir. + + Args: + fichier: Path du fichier + + Returns: + True si téléchargement terminé + """ + debut = time.time() + timeout_sec = self.timeout_telechargement_ms / 1000 + taille_precedente = -1 + compteur_stable = 0 + seuil_stable = 3 # Nombre de vérifications consécutives sans changement + + while (time.time() - debut) < timeout_sec: + try: + if not fichier.exists(): + time.sleep(self.intervalle_verification_ms / 1000) + continue + + taille_actuelle = fichier.stat().st_size + + if taille_actuelle == taille_precedente: + compteur_stable += 1 + if compteur_stable >= seuil_stable: + # Vérifier qu'on peut ouvrir le fichier (pas verrouillé) + try: + with open(fichier, 'rb') as f: + f.read(1) + return True + except (IOError, PermissionError): + # Fichier encore verrouillé + compteur_stable = 0 + else: + compteur_stable = 0 + + taille_precedente = taille_actuelle + + except (FileNotFoundError, PermissionError): + pass + + time.sleep(self.intervalle_verification_ms / 1000) + + return False + + def _valider_fichier(self, fichier: Path) -> Dict[str, Any]: + """ + Valide le fichier téléchargé. + + Args: + fichier: Path du fichier + + Returns: + Dict avec 'valide' et 'erreur' si applicable + """ + # Vérifier l'existence + if not fichier.exists(): + return {'valide': False, 'erreur': 'Fichier introuvable'} + + # Vérifier la taille minimale + taille = fichier.stat().st_size + if taille < self.taille_min_octets: + return { + 'valide': False, + 'erreur': f"Fichier trop petit: {taille} < {self.taille_min_octets} octets" + } + + # Vérifier l'extension + if self.extensions_autorisees: + ext = fichier.suffix.lower().lstrip('.') + if ext not in [e.lower().lstrip('.') for e in self.extensions_autorisees]: + return { + 'valide': False, + 'erreur': f"Extension non autorisée: {fichier.suffix}" + } + + return {'valide': True} + + def _deplacer_renommer(self, fichier: Path) -> Optional[Path]: + """ + Déplace et/ou renomme le fichier. + + Args: + fichier: Path du fichier source + + Returns: + Path du fichier final ou None + """ + try: + # Déterminer le nouveau nom + if self.nouveau_nom: + nouveau_nom = self._appliquer_template_nom(self.nouveau_nom, fichier) + else: + nouveau_nom = fichier.name + + # Ajouter préfixe date si demandé + if self.prefixe_date: + date_str = datetime.now().strftime(self.format_date) + nouveau_nom = f"{date_str}_{nouveau_nom}" + + # Déterminer le dossier cible + dossier = self.dossier_cible if self.dossier_cible else fichier.parent + + # Créer le dossier cible si nécessaire + if not dossier.exists(): + dossier.mkdir(parents=True, exist_ok=True) + + # Chemin final + chemin_final = dossier / nouveau_nom + + # Gérer les conflits + if chemin_final.exists() and not self.ecraser_existant: + chemin_final = self._generer_nom_unique(chemin_final) + + # Déplacer ou copier + if self.supprimer_source: + shutil.move(str(fichier), str(chemin_final)) + else: + shutil.copy2(str(fichier), str(chemin_final)) + + print(f"📁 Fichier {'déplacé' if self.supprimer_source else 'copié'} vers: {chemin_final}") + return chemin_final + + except Exception as e: + print(f"❌ Erreur déplacement: {e}") + return None + + def _appliquer_template_nom(self, template: str, fichier: Path) -> str: + """ + Applique les variables au template de nom. + + Args: + template: Template avec {variables} + fichier: Fichier source + + Returns: + Nom final + """ + maintenant = datetime.now() + + remplacements = { + '{original}': fichier.stem, + '{extension}': fichier.suffix, + '{ext}': fichier.suffix.lstrip('.'), + '{date}': maintenant.strftime('%Y%m%d'), + '{heure}': maintenant.strftime('%H%M%S'), + '{datetime}': maintenant.strftime('%Y%m%d_%H%M%S'), + '{annee}': maintenant.strftime('%Y'), + '{mois}': maintenant.strftime('%m'), + '{jour}': maintenant.strftime('%d'), + } + + resultat = template + for var, valeur in remplacements.items(): + resultat = resultat.replace(var, valeur) + + # S'assurer qu'il y a une extension + if not Path(resultat).suffix and fichier.suffix: + resultat += fichier.suffix + + return resultat + + def _generer_nom_unique(self, chemin: Path) -> Path: + """Génère un nom unique si le fichier existe.""" + compteur = 1 + base = chemin.stem + ext = chemin.suffix + dossier = chemin.parent + + while chemin.exists(): + nouveau_nom = f"{base}_{compteur}{ext}" + chemin = dossier / nouveau_nom + compteur += 1 + + return chemin + + def _formater_taille(self, octets: int) -> str: + """Formate une taille en octets lisible.""" + for unite in ['o', 'Ko', 'Mo', 'Go']: + if octets < 1024: + return f"{octets:.1f} {unite}" + octets /= 1024 + return f"{octets:.1f} To" + + def get_action_info(self) -> Dict[str, Any]: + """Retourne les informations de l'action.""" + return { + 'action_id': self.action_id, + 'name': self.name, + 'description': self.description, + 'type': 'telecharger_vers_dossier', + 'parameters': { + 'dossier_source': str(self.dossier_source), + 'dossier_cible': str(self.dossier_cible) if self.dossier_cible else None, + 'pattern': self.pattern_fichier, + 'nouveau_nom': self.nouveau_nom, + 'variable_sortie': self.variable_sortie + }, + 'status': self.current_status.value + } + + +# Alias anglais +VWBDownloadToFolderAction = VWBTelechargerVersDossierAction