Feat: Action telecharger_vers_dossier

Gestion complète des téléchargements de fichiers:
- Surveillance du dossier source pour nouveaux fichiers
- Attente fin de téléchargement (fichier stable)
- Validation: taille min, extensions autorisées
- Déplacement/renommage avec templates:
  {original}, {date}, {datetime}, {annee}, etc.
- Gestion conflits de noms
- Détection fichiers temporaires (.part, .crdownload)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-14 23:25:32 +01:00
parent ce1a51c314
commit f62e8e7852
2 changed files with 476 additions and 2 deletions

View File

@@ -7,15 +7,19 @@ pour le Visual Workflow Builder.
Actions disponibles :
- VWBExtraireTableauAction : Extraction de données tabulaires
- VWBTelechargerVersDossierAction : Gestion des téléchargements
"""
from .extraire_tableau import VWBExtraireTableauAction, VWBExtractTableAction
from .telecharger_vers_dossier import VWBTelechargerVersDossierAction, VWBDownloadToFolderAction
__all__ = [
'VWBExtraireTableauAction',
'VWBExtractTableAction', # Alias anglais
'VWBExtractTableAction',
'VWBTelechargerVersDossierAction',
'VWBDownloadToFolderAction',
]
__version__ = '1.0.0'
__version__ = '1.1.0'
__author__ = 'Dom, Claude'
__date__ = '14 janvier 2026'

View File

@@ -0,0 +1,470 @@
"""
Action Télécharger vers Dossier - Gestion des téléchargements de fichiers
Auteur : Dom, Claude - 14 janvier 2026
Cette action permet de gérer les téléchargements de fichiers :
- Surveiller l'apparition d'un nouveau fichier
- Attendre la fin du téléchargement
- Déplacer/renommer le fichier
- Valider l'intégrité
Cas d'usage :
- Télécharger des rapports depuis applications web
- Exporter des fichiers depuis Citrix/VDI
- Sauvegarder des pièces jointes
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from pathlib import Path
import time
import shutil
import os
import re
import glob
from ..base_action import BaseVWBAction, VWBActionResult, VWBActionStatus
from ...contracts.error import VWBErrorType, create_vwb_error
class VWBTelechargerVersDossierAction(BaseVWBAction):
"""
Action de gestion des téléchargements.
Surveille un dossier source pour détecter un nouveau fichier,
attend la fin du téléchargement, puis déplace/renomme le fichier
vers le dossier cible.
"""
def __init__(
self,
action_id: str,
parameters: Dict[str, Any],
screen_capturer=None
):
"""
Initialise l'action de téléchargement.
Args:
action_id: Identifiant unique de l'action
parameters: Paramètres du téléchargement
screen_capturer: Instance du ScreenCapturer (optionnel)
"""
super().__init__(
action_id=action_id,
name="Télécharger vers Dossier",
description="Gère le téléchargement et déplacement de fichiers",
parameters=parameters,
screen_capturer=screen_capturer
)
# Dossiers
self.dossier_source = Path(parameters.get(
'dossier_source',
parameters.get('source_folder', self._get_dossier_telechargements())
)).expanduser()
self.dossier_cible = parameters.get('dossier_cible', parameters.get('target_folder'))
if self.dossier_cible:
self.dossier_cible = Path(self.dossier_cible).expanduser()
# Pattern de fichier à surveiller
self.pattern_fichier = parameters.get('pattern_fichier', parameters.get('file_pattern', '*'))
# Renommage
self.nouveau_nom = parameters.get('nouveau_nom', parameters.get('new_name', ''))
self.prefixe_date = parameters.get('prefixe_date', parameters.get('date_prefix', False))
self.format_date = parameters.get('format_date', parameters.get('date_format', '%Y%m%d_%H%M%S'))
# Timeouts et validation
self.timeout_detection_ms = parameters.get('timeout_detection_ms', parameters.get('detection_timeout_ms', 30000))
self.timeout_telechargement_ms = parameters.get('timeout_telechargement_ms', parameters.get('download_timeout_ms', 120000))
self.intervalle_verification_ms = parameters.get('intervalle_verification_ms', parameters.get('check_interval_ms', 500))
# Validation
self.taille_min_octets = parameters.get('taille_min_octets', parameters.get('min_size_bytes', 0))
self.extensions_autorisees = parameters.get('extensions_autorisees', parameters.get('allowed_extensions', []))
# Options
self.supprimer_source = parameters.get('supprimer_source', parameters.get('delete_source', True))
self.ecraser_existant = parameters.get('ecraser_existant', parameters.get('overwrite_existing', False))
# Variable de sortie
self.variable_sortie = parameters.get('variable_sortie', parameters.get('output_variable', 'fichier_telecharge'))
def _get_dossier_telechargements(self) -> str:
"""Retourne le dossier de téléchargements par défaut."""
# Linux
xdg_download = os.environ.get('XDG_DOWNLOAD_DIR')
if xdg_download:
return xdg_download
home = Path.home()
# Essayer les emplacements courants
for nom in ['Téléchargements', 'Downloads', 'downloads']:
chemin = home / nom
if chemin.exists():
return str(chemin)
return str(home / 'Downloads')
def validate_parameters(self) -> List[str]:
"""Valide les paramètres de l'action."""
erreurs = []
if not self.dossier_source.exists():
erreurs.append(f"Dossier source inexistant: {self.dossier_source}")
if self.dossier_cible and not self.dossier_cible.parent.exists():
erreurs.append(f"Dossier parent cible inexistant: {self.dossier_cible.parent}")
if self.timeout_detection_ms < 1000:
erreurs.append("Timeout détection minimum: 1000ms")
if self.timeout_telechargement_ms < 5000:
erreurs.append("Timeout téléchargement minimum: 5000ms")
return erreurs
def execute_core(self, step_id: str) -> VWBActionResult:
"""
Exécute la gestion du téléchargement.
Args:
step_id: Identifiant de l'étape
Returns:
Résultat avec les informations du fichier
"""
start_time = datetime.now()
try:
# Étape 1: Lister les fichiers existants
fichiers_avant = self._lister_fichiers()
print(f"📂 Surveillance de {self.dossier_source} ({len(fichiers_avant)} fichiers existants)")
# Étape 2: Attendre l'apparition d'un nouveau fichier
nouveau_fichier = self._attendre_nouveau_fichier(fichiers_avant)
if not nouveau_fichier:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.TIMEOUT_ERROR,
message=f"Aucun nouveau fichier détecté après {self.timeout_detection_ms}ms"
)
print(f"📥 Nouveau fichier détecté: {nouveau_fichier.name}")
# Étape 3: Attendre la fin du téléchargement
telechargement_ok = self._attendre_fin_telechargement(nouveau_fichier)
if not telechargement_ok:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.TIMEOUT_ERROR,
message=f"Téléchargement non terminé après {self.timeout_telechargement_ms}ms"
)
# Étape 4: Valider le fichier
validation = self._valider_fichier(nouveau_fichier)
if not validation['valide']:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.VALIDATION_FAILED,
message=validation['erreur']
)
# Étape 5: Déplacer/Renommer si demandé
fichier_final = nouveau_fichier
if self.dossier_cible or self.nouveau_nom:
fichier_final = self._deplacer_renommer(nouveau_fichier)
if not fichier_final:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message="Échec du déplacement/renommage"
)
end_time = datetime.now()
execution_time = (end_time - start_time).total_seconds() * 1000
taille = fichier_final.stat().st_size
print(f"✅ Fichier prêt: {fichier_final.name} ({self._formater_taille(taille)})")
return VWBActionResult(
action_id=self.action_id,
step_id=step_id,
status=VWBActionStatus.SUCCESS,
start_time=start_time,
end_time=end_time,
execution_time_ms=execution_time,
output_data={
'chemin_fichier': str(fichier_final),
'nom_fichier': fichier_final.name,
'taille_octets': taille,
'taille_lisible': self._formater_taille(taille),
'extension': fichier_final.suffix,
'variable_sortie': self.variable_sortie
},
evidence_list=self.evidence_list.copy()
)
except Exception as e:
return self._create_error_result(
step_id=step_id,
start_time=start_time,
error_type=VWBErrorType.SYSTEM_ERROR,
message=f"Erreur: {str(e)}",
technical_details={'exception': str(e)}
)
def _lister_fichiers(self) -> set:
"""Liste les fichiers correspondant au pattern."""
pattern_complet = str(self.dossier_source / self.pattern_fichier)
fichiers = set(glob.glob(pattern_complet))
return fichiers
def _attendre_nouveau_fichier(self, fichiers_avant: set) -> Optional[Path]:
"""
Attend l'apparition d'un nouveau fichier.
Args:
fichiers_avant: Set des fichiers existants avant
Returns:
Path du nouveau fichier ou None
"""
debut = time.time()
timeout_sec = self.timeout_detection_ms / 1000
while (time.time() - debut) < timeout_sec:
fichiers_maintenant = self._lister_fichiers()
nouveaux = fichiers_maintenant - fichiers_avant
# Filtrer les fichiers temporaires (.part, .crdownload, .tmp)
for fichier in nouveaux:
path = Path(fichier)
if not self._est_fichier_temporaire(path):
return path
time.sleep(self.intervalle_verification_ms / 1000)
return None
def _est_fichier_temporaire(self, path: Path) -> bool:
"""Vérifie si c'est un fichier temporaire de téléchargement."""
extensions_temp = {'.part', '.crdownload', '.tmp', '.download', '.partial'}
return path.suffix.lower() in extensions_temp or path.name.startswith('.')
def _attendre_fin_telechargement(self, fichier: Path) -> bool:
"""
Attend que le fichier arrête de grossir.
Args:
fichier: Path du fichier
Returns:
True si téléchargement terminé
"""
debut = time.time()
timeout_sec = self.timeout_telechargement_ms / 1000
taille_precedente = -1
compteur_stable = 0
seuil_stable = 3 # Nombre de vérifications consécutives sans changement
while (time.time() - debut) < timeout_sec:
try:
if not fichier.exists():
time.sleep(self.intervalle_verification_ms / 1000)
continue
taille_actuelle = fichier.stat().st_size
if taille_actuelle == taille_precedente:
compteur_stable += 1
if compteur_stable >= seuil_stable:
# Vérifier qu'on peut ouvrir le fichier (pas verrouillé)
try:
with open(fichier, 'rb') as f:
f.read(1)
return True
except (IOError, PermissionError):
# Fichier encore verrouillé
compteur_stable = 0
else:
compteur_stable = 0
taille_precedente = taille_actuelle
except (FileNotFoundError, PermissionError):
pass
time.sleep(self.intervalle_verification_ms / 1000)
return False
def _valider_fichier(self, fichier: Path) -> Dict[str, Any]:
"""
Valide le fichier téléchargé.
Args:
fichier: Path du fichier
Returns:
Dict avec 'valide' et 'erreur' si applicable
"""
# Vérifier l'existence
if not fichier.exists():
return {'valide': False, 'erreur': 'Fichier introuvable'}
# Vérifier la taille minimale
taille = fichier.stat().st_size
if taille < self.taille_min_octets:
return {
'valide': False,
'erreur': f"Fichier trop petit: {taille} < {self.taille_min_octets} octets"
}
# Vérifier l'extension
if self.extensions_autorisees:
ext = fichier.suffix.lower().lstrip('.')
if ext not in [e.lower().lstrip('.') for e in self.extensions_autorisees]:
return {
'valide': False,
'erreur': f"Extension non autorisée: {fichier.suffix}"
}
return {'valide': True}
def _deplacer_renommer(self, fichier: Path) -> Optional[Path]:
"""
Déplace et/ou renomme le fichier.
Args:
fichier: Path du fichier source
Returns:
Path du fichier final ou None
"""
try:
# Déterminer le nouveau nom
if self.nouveau_nom:
nouveau_nom = self._appliquer_template_nom(self.nouveau_nom, fichier)
else:
nouveau_nom = fichier.name
# Ajouter préfixe date si demandé
if self.prefixe_date:
date_str = datetime.now().strftime(self.format_date)
nouveau_nom = f"{date_str}_{nouveau_nom}"
# Déterminer le dossier cible
dossier = self.dossier_cible if self.dossier_cible else fichier.parent
# Créer le dossier cible si nécessaire
if not dossier.exists():
dossier.mkdir(parents=True, exist_ok=True)
# Chemin final
chemin_final = dossier / nouveau_nom
# Gérer les conflits
if chemin_final.exists() and not self.ecraser_existant:
chemin_final = self._generer_nom_unique(chemin_final)
# Déplacer ou copier
if self.supprimer_source:
shutil.move(str(fichier), str(chemin_final))
else:
shutil.copy2(str(fichier), str(chemin_final))
print(f"📁 Fichier {'déplacé' if self.supprimer_source else 'copié'} vers: {chemin_final}")
return chemin_final
except Exception as e:
print(f"❌ Erreur déplacement: {e}")
return None
def _appliquer_template_nom(self, template: str, fichier: Path) -> str:
"""
Applique les variables au template de nom.
Args:
template: Template avec {variables}
fichier: Fichier source
Returns:
Nom final
"""
maintenant = datetime.now()
remplacements = {
'{original}': fichier.stem,
'{extension}': fichier.suffix,
'{ext}': fichier.suffix.lstrip('.'),
'{date}': maintenant.strftime('%Y%m%d'),
'{heure}': maintenant.strftime('%H%M%S'),
'{datetime}': maintenant.strftime('%Y%m%d_%H%M%S'),
'{annee}': maintenant.strftime('%Y'),
'{mois}': maintenant.strftime('%m'),
'{jour}': maintenant.strftime('%d'),
}
resultat = template
for var, valeur in remplacements.items():
resultat = resultat.replace(var, valeur)
# S'assurer qu'il y a une extension
if not Path(resultat).suffix and fichier.suffix:
resultat += fichier.suffix
return resultat
def _generer_nom_unique(self, chemin: Path) -> Path:
"""Génère un nom unique si le fichier existe."""
compteur = 1
base = chemin.stem
ext = chemin.suffix
dossier = chemin.parent
while chemin.exists():
nouveau_nom = f"{base}_{compteur}{ext}"
chemin = dossier / nouveau_nom
compteur += 1
return chemin
def _formater_taille(self, octets: int) -> str:
"""Formate une taille en octets lisible."""
for unite in ['o', 'Ko', 'Mo', 'Go']:
if octets < 1024:
return f"{octets:.1f} {unite}"
octets /= 1024
return f"{octets:.1f} To"
def get_action_info(self) -> Dict[str, Any]:
"""Retourne les informations de l'action."""
return {
'action_id': self.action_id,
'name': self.name,
'description': self.description,
'type': 'telecharger_vers_dossier',
'parameters': {
'dossier_source': str(self.dossier_source),
'dossier_cible': str(self.dossier_cible) if self.dossier_cible else None,
'pattern': self.pattern_fichier,
'nouveau_nom': self.nouveau_nom,
'variable_sortie': self.variable_sortie
},
'status': self.current_status.value
}
# Alias anglais
VWBDownloadToFolderAction = VWBTelechargerVersDossierAction