feat: outils gestion fichiers dans le VWB (📁 Fichiers)

- 5 actions : lister, créer dossier, déplacer, copier, classer par extension
- Exécution sur Windows via agent port 5006
- Sécurité chemins (bloque C:\Windows, /etc, etc.)
- Propriétés panel + preview canvas pour chaque action

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-18 16:05:36 +01:00
parent 97d708c6f5
commit 40e5fba86c
10 changed files with 898 additions and 9 deletions

View File

@@ -0,0 +1,16 @@
"""
Actions de gestion de fichiers pour le VWB.
Auteur : Dom, Claude — 18 mars 2026
Actions :
- file_list_dir : Lister les fichiers d'un dossier
- file_create_dir : Créer un dossier
- file_move : Déplacer un fichier
- file_copy : Copier un fichier
- file_sort_by_ext : Classer les fichiers par extension
"""
from .file_actions import FileActionHandler
__all__ = ['FileActionHandler']

View File

@@ -0,0 +1,353 @@
"""
Gestionnaire d'actions fichiers pour le VWB.
Auteur : Dom, Claude — 18 mars 2026
Opérations de gestion de fichiers : lister, créer dossier, déplacer,
copier, classer par extension. Compatible chemins Windows (backslash)
et Linux (slash).
Sécurité : validation des chemins pour empêcher le path traversal
en dehors des répertoires autorisés.
"""
import fnmatch
import logging
import os
import shutil
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Répertoires autorisés par défaut sur Windows (racines légitimes)
# On autorise les chemins sous les profils utilisateur et les lecteurs standards
_WINDOWS_ALLOWED_ROOTS = [
"C:\\Users",
"D:\\",
"E:\\",
]
# Répertoires autorisés sur Linux
_LINUX_ALLOWED_ROOTS = [
"/home",
"/tmp",
]
def _normalize_path(path_str: str) -> str:
"""Normalise un chemin en résolvant .. et en nettoyant les séparateurs."""
# Déterminer si c'est un chemin Windows (contient : comme C:\ ou \)
if ':' in path_str[:3] or path_str.startswith('\\\\'):
# Chemin Windows : normaliser avec ntpath
import ntpath
return ntpath.normpath(path_str)
else:
return os.path.normpath(path_str)
def _is_safe_path(path_str: str) -> bool:
"""Vérifie qu'un chemin est sûr (pas de traversal hors zones autorisées).
Bloque les chemins contenant des séquences de traversal dangereuses
et vérifie que le chemin résolu reste dans une zone autorisée.
"""
normalized = _normalize_path(path_str)
# Bloquer les chemins vides
if not normalized or normalized.strip() in ('', '.'):
return False
# Déterminer le système de fichiers cible
is_windows = ':' in normalized[:3] or normalized.startswith('\\\\')
if is_windows:
allowed_roots = _WINDOWS_ALLOWED_ROOTS
# Vérifier que le chemin est sous une racine autorisée
norm_upper = normalized.upper()
return any(norm_upper.startswith(root.upper()) for root in allowed_roots)
else:
allowed_roots = _LINUX_ALLOWED_ROOTS
return any(normalized.startswith(root) for root in allowed_roots)
class FileActionHandler:
"""Gère les opérations sur les fichiers.
Peut s'exécuter en local (serveur Linux) ou les paramètres
peuvent être transmis à l'agent Windows via le proxy.
"""
def __init__(self, allowed_roots: Optional[List[str]] = None):
"""Initialise le gestionnaire.
Args:
allowed_roots: Racines autorisées supplémentaires (optionnel).
"""
if allowed_roots:
_WINDOWS_ALLOWED_ROOTS.extend(
r for r in allowed_roots if r not in _WINDOWS_ALLOWED_ROOTS
)
def execute(self, action_type: str, params: dict, target: str = 'local') -> dict:
"""Exécute une action fichier.
Args:
action_type: Type d'action (file_list_dir, file_create_dir, etc.)
params: Paramètres de l'action
target: 'local' pour exécution serveur, 'windows' pour l'agent
Returns:
Dict avec le résultat ou une erreur
"""
handlers = {
'file_list_dir': self._list_dir,
'file_create_dir': self._create_dir,
'file_move': self._move_file,
'file_copy': self._copy_file,
'file_sort_by_ext': self._sort_by_extension,
}
handler = handlers.get(action_type)
if not handler:
return {'error': f"Type d'action fichier inconnu : {action_type}"}
try:
return handler(params)
except Exception as e:
logger.error("Erreur action fichier '%s' : %s", action_type, e)
return {'error': str(e)}
def _list_dir(self, params: dict) -> dict:
"""Liste les fichiers d'un dossier.
Args:
params: {'path': str, 'pattern': str (optionnel, défaut '*')}
Returns:
{'files': [...], 'count': int, 'extensions': {...}}
"""
path_str = params.get('path', '')
pattern = params.get('pattern', '*')
if not path_str:
return {'error': "Paramètre 'path' requis"}
if not _is_safe_path(path_str):
return {'error': f"Chemin non autorisé : {path_str}"}
source = Path(path_str)
if not source.exists():
return {'error': f"Dossier introuvable : {path_str}"}
if not source.is_dir():
return {'error': f"Le chemin n'est pas un dossier : {path_str}"}
files = []
extensions: Dict[str, int] = {}
for item in source.iterdir():
if item.is_file():
# Appliquer le filtre glob
if not fnmatch.fnmatch(item.name, pattern):
continue
ext = item.suffix.lstrip('.').lower() or 'sans_extension'
size = item.stat().st_size
files.append({
'name': item.name,
'extension': ext,
'size': size,
'path': str(item),
})
extensions[ext] = extensions.get(ext, 0) + 1
logger.info(
"Listage dossier '%s' : %d fichiers, extensions : %s",
path_str, len(files), extensions,
)
return {
'files': files,
'count': len(files),
'extensions': extensions,
'path': path_str,
}
def _create_dir(self, params: dict) -> dict:
"""Crée un dossier (et les sous-dossiers si nécessaire).
Args:
params: {'path': str}
Returns:
{'created': bool, 'path': str}
"""
path_str = params.get('path', '')
if not path_str:
return {'error': "Paramètre 'path' requis"}
if not _is_safe_path(path_str):
return {'error': f"Chemin non autorisé : {path_str}"}
target = Path(path_str)
already_exists = target.exists()
target.mkdir(parents=True, exist_ok=True)
logger.info(
"Dossier '%s' %s",
path_str,
"existait déjà" if already_exists else "créé",
)
return {
'created': not already_exists,
'path': path_str,
'already_existed': already_exists,
}
def _move_file(self, params: dict) -> dict:
"""Déplace ou renomme un fichier.
Args:
params: {'source': str, 'destination': str}
Returns:
{'moved': bool, 'source': str, 'destination': str}
"""
source_str = params.get('source', '')
dest_str = params.get('destination', '')
if not source_str or not dest_str:
return {'error': "Paramètres 'source' et 'destination' requis"}
if not _is_safe_path(source_str):
return {'error': f"Chemin source non autorisé : {source_str}"}
if not _is_safe_path(dest_str):
return {'error': f"Chemin destination non autorisé : {dest_str}"}
source = Path(source_str)
if not source.exists():
return {'error': f"Fichier source introuvable : {source_str}"}
dest = Path(dest_str)
# Créer le dossier parent si nécessaire
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source), str(dest))
logger.info("Fichier déplacé : '%s''%s'", source_str, dest_str)
return {
'moved': True,
'source': source_str,
'destination': dest_str,
}
def _copy_file(self, params: dict) -> dict:
"""Copie un fichier vers un autre emplacement.
Args:
params: {'source': str, 'destination': str}
Returns:
{'copied': bool, 'source': str, 'destination': str}
"""
source_str = params.get('source', '')
dest_str = params.get('destination', '')
if not source_str or not dest_str:
return {'error': "Paramètres 'source' et 'destination' requis"}
if not _is_safe_path(source_str):
return {'error': f"Chemin source non autorisé : {source_str}"}
if not _is_safe_path(dest_str):
return {'error': f"Chemin destination non autorisé : {dest_str}"}
source = Path(source_str)
if not source.exists():
return {'error': f"Fichier source introuvable : {source_str}"}
dest = Path(dest_str)
# Créer le dossier parent si nécessaire
dest.parent.mkdir(parents=True, exist_ok=True)
if source.is_dir():
shutil.copytree(str(source), str(dest))
else:
shutil.copy2(str(source), str(dest))
logger.info("Fichier copié : '%s''%s'", source_str, dest_str)
return {
'copied': True,
'source': source_str,
'destination': dest_str,
}
def _sort_by_extension(self, params: dict) -> dict:
"""Classe les fichiers par extension dans des sous-dossiers.
Crée un sous-dossier par extension (pdf/, docx/, jpg/, etc.)
et déplace chaque fichier dans le sous-dossier correspondant.
Args:
params: {'source_dir': str, 'create_subdirs': bool (défaut True)}
Returns:
{'moved': [...], 'count': int, 'extensions': {...}}
"""
source_dir_str = params.get('source_dir', '')
create_subdirs = params.get('create_subdirs', True)
if not source_dir_str:
return {'error': "Paramètre 'source_dir' requis"}
if not _is_safe_path(source_dir_str):
return {'error': f"Chemin non autorisé : {source_dir_str}"}
source = Path(source_dir_str)
if not source.exists():
return {'error': f"Dossier introuvable : {source_dir_str}"}
if not source.is_dir():
return {'error': f"Le chemin n'est pas un dossier : {source_dir_str}"}
moved = []
extensions: Dict[str, int] = {}
for file in source.iterdir():
if file.is_file():
ext = file.suffix.lstrip('.').lower() or 'sans_extension'
target_dir = source / ext
if create_subdirs:
target_dir.mkdir(exist_ok=True)
elif not target_dir.exists():
logger.warning(
"Sous-dossier '%s' inexistant et create_subdirs=False, fichier ignoré : %s",
ext, file.name,
)
continue
dest = target_dir / file.name
# Éviter d'écraser un fichier existant
if dest.exists():
base = file.stem
counter = 1
while dest.exists():
dest = target_dir / f"{base}_{counter}{file.suffix}"
counter += 1
shutil.move(str(file), str(dest))
moved.append({
'file': file.name,
'to': ext,
'destination': str(dest),
})
extensions[ext] = extensions.get(ext, 0) + 1
logger.info(
"Classement par extension dans '%s' : %d fichiers déplacés, extensions : %s",
source_dir_str, len(moved), extensions,
)
return {
'moved': moved,
'count': len(moved),
'extensions': extensions,
'source_dir': source_dir_str,
}