diff --git a/core/monitoring/__init__.py b/core/monitoring/__init__.py new file mode 100644 index 000000000..651184b82 --- /dev/null +++ b/core/monitoring/__init__.py @@ -0,0 +1,145 @@ +""" +Monitoring Module - Système de monitoring et logging centralisé + +Ce module fournit: +- Logging centralisé avec métriques Prometheus (optionnel) +- Monitoring des performances +- Export des logs +- Gestion des chaînes de workflows +- Gestion des déclencheurs +""" + +# Logger - toujours disponible +from .logger import ( + LogEntry, + RPALogger, + get_logger, + workflow_logger, + execution_logger, + detection_logger, + api_logger, +) + +# Métriques Prometheus - optionnelles +PROMETHEUS_AVAILABLE = False +try: + from .metrics import ( + workflow_executions_total, + log_entries_total, + chain_executions_total, + trigger_fires_total, + workflow_duration_seconds, + chain_duration_seconds, + active_workflows, + active_chains, + error_rate, + enabled_triggers, + increment_workflow_execution, + record_workflow_duration, + increment_log_entry, + increment_chain_execution, + record_chain_duration, + increment_trigger_fire, + set_active_workflows, + set_active_chains, + set_error_rate, + set_enabled_triggers, + ) + PROMETHEUS_AVAILABLE = True +except ImportError: + # Prometheus non disponible - créer des fonctions no-op + workflow_executions_total = None + log_entries_total = None + chain_executions_total = None + trigger_fires_total = None + workflow_duration_seconds = None + chain_duration_seconds = None + active_workflows = None + active_chains = None + error_rate = None + enabled_triggers = None + + def increment_workflow_execution(*args, **kwargs): pass + def record_workflow_duration(*args, **kwargs): pass + def increment_log_entry(*args, **kwargs): pass + def increment_chain_execution(*args, **kwargs): pass + def record_chain_duration(*args, **kwargs): pass + def increment_trigger_fire(*args, **kwargs): pass + def set_active_workflows(*args, **kwargs): pass + def set_active_chains(*args, **kwargs): pass + def set_error_rate(*args, **kwargs): pass + def set_enabled_triggers(*args, **kwargs): pass + +# Automation Scheduler - import conditionnel +try: + from .automation_scheduler import AutomationScheduler +except ImportError: + AutomationScheduler = None + +# Fiche #24 - Observabilité (métriques HTTP + sécurité) - optionnel +try: + from .http_server_metrics import ( + HTTP_REQUESTS_TOTAL, + HTTP_REQUEST_DURATION_SECONDS, + HTTP_REQUESTS_IN_FLIGHT, + RPA_SECURITY_BLOCKS_TOTAL, + record_http_request, + in_flight_inc, + in_flight_dec, + record_security_block, + safe_template, + ) +except ImportError: + HTTP_REQUESTS_TOTAL = None + HTTP_REQUEST_DURATION_SECONDS = None + HTTP_REQUESTS_IN_FLIGHT = None + RPA_SECURITY_BLOCKS_TOTAL = None + def record_http_request(*args, **kwargs): pass + def in_flight_inc(*args, **kwargs): pass + def in_flight_dec(*args, **kwargs): pass + def record_security_block(*args, **kwargs): pass + def safe_template(template, **kwargs): return template.format(**kwargs) if kwargs else template + +__version__ = "1.0.0" + +__all__ = [ + "PROMETHEUS_AVAILABLE", + "LogEntry", + "RPALogger", + "get_logger", + "workflow_logger", + "execution_logger", + "detection_logger", + "api_logger", + "workflow_executions_total", + "log_entries_total", + "chain_executions_total", + "trigger_fires_total", + "workflow_duration_seconds", + "chain_duration_seconds", + "active_workflows", + "active_chains", + "error_rate", + "enabled_triggers", + "increment_workflow_execution", + "record_workflow_duration", + "increment_log_entry", + "increment_chain_execution", + "record_chain_duration", + "increment_trigger_fire", + "set_active_workflows", + "set_active_chains", + "set_error_rate", + "set_enabled_triggers", + "AutomationScheduler", + # Fiche #24 + "HTTP_REQUESTS_TOTAL", + "HTTP_REQUEST_DURATION_SECONDS", + "HTTP_REQUESTS_IN_FLIGHT", + "RPA_SECURITY_BLOCKS_TOTAL", + "record_http_request", + "in_flight_inc", + "in_flight_dec", + "record_security_block", + "safe_template", +] diff --git a/core/monitoring/automation_scheduler.py b/core/monitoring/automation_scheduler.py new file mode 100644 index 000000000..1c210e26a --- /dev/null +++ b/core/monitoring/automation_scheduler.py @@ -0,0 +1,283 @@ +""" +Automation Scheduler - Exécution automatique des triggers et chaînes + +Ce module gère l'exécution automatique des workflows et chaînes +via des déclencheurs (schedule, file, etc.). +""" + +import threading +import time +import glob +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional, Dict, Set +from dataclasses import dataclass + +from core.monitoring.logger import get_logger +from core.monitoring.chain_manager import ChainManager +from core.monitoring.trigger_manager import TriggerManager +from core.monitoring.metrics import increment_trigger_fire + +logger = get_logger('automation_scheduler') + + +@dataclass +class TriggerState: + """État d'un trigger pour le scheduling.""" + trigger_id: str + last_check: datetime + last_fired: datetime + processed_files: Set[str] + + +class AutomationScheduler: + """ + Scheduler automatique pour les triggers et chaînes. + + Fonctionne en arrière-plan et vérifie périodiquement : + - Schedule triggers : Lance workflows à intervalles réguliers + - File triggers : Surveille des répertoires pour nouveaux fichiers + - Manual triggers : Exécutés via API + """ + + def __init__( + self, + trigger_manager: TriggerManager, + chain_manager: ChainManager, + check_interval: float = 1.0 + ): + """ + Initialise le scheduler. + + Args: + trigger_manager: Gestionnaire de triggers + chain_manager: Gestionnaire de chaînes + check_interval: Intervalle de vérification en secondes + """ + self.trigger_manager = trigger_manager + self.chain_manager = chain_manager + self.check_interval = check_interval + + self.running = False + self.thread: Optional[threading.Thread] = None + + # État des triggers + self.trigger_states: Dict[str, TriggerState] = {} + + logger.info("AutomationScheduler initialized") + + def start(self): + """Démarre le scheduler en arrière-plan.""" + if self.running: + logger.warning("Scheduler already running") + return + + self.running = True + self.thread = threading.Thread(target=self._run_loop, daemon=True) + self.thread.start() + + logger.info("AutomationScheduler started") + + def stop(self): + """Arrête le scheduler.""" + if not self.running: + return + + self.running = False + if self.thread: + self.thread.join(timeout=5) + + logger.info("AutomationScheduler stopped") + + def _run_loop(self): + """Boucle principale du scheduler.""" + logger.info("Scheduler loop started") + + while self.running: + try: + # Vérifier les schedule triggers + self._check_schedule_triggers() + + # Vérifier les file triggers + self._check_file_triggers() + + # Attendre avant la prochaine vérification + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"Error in scheduler loop: {e}", error=str(e)) + time.sleep(5) # Attendre plus longtemps en cas d'erreur + + def _check_schedule_triggers(self): + """Vérifie et déclenche les schedule triggers.""" + triggers = self.trigger_manager.list_triggers() + + for trigger in triggers: + if not trigger.enabled or trigger.trigger_type != "schedule": + continue + + try: + # Initialiser l'état si nécessaire + if trigger.trigger_id not in self.trigger_states: + self.trigger_states[trigger.trigger_id] = TriggerState( + trigger_id=trigger.trigger_id, + last_check=datetime.now(), + last_fired=trigger.last_fired or datetime.min, + processed_files=set() + ) + + state = self.trigger_states[trigger.trigger_id] + + # Vérifier si l'intervalle est écoulé + interval = trigger.config.get('interval_seconds', 3600) + time_since_last_fire = (datetime.now() - state.last_fired).total_seconds() + + if time_since_last_fire >= interval: + logger.info( + f"Firing schedule trigger {trigger.trigger_id}", + trigger_id=trigger.trigger_id, + workflow_id=trigger.workflow_id, + interval=interval + ) + + # Exécuter le target (workflow ou chaîne) + success = self._execute_target(trigger.workflow_id) + + if success: + # Mettre à jour l'état + state.last_fired = datetime.now() + + # Mettre à jour le trigger + self.trigger_manager.fire_trigger(trigger.trigger_id) + + # Métriques + increment_trigger_fire(trigger.trigger_type, trigger.workflow_id) + + except Exception as e: + logger.error( + f"Error checking schedule trigger {trigger.trigger_id}: {e}", + trigger_id=trigger.trigger_id, + error=str(e) + ) + + def _check_file_triggers(self): + """Vérifie et déclenche les file triggers.""" + triggers = self.trigger_manager.list_triggers() + + for trigger in triggers: + if not trigger.enabled or trigger.trigger_type != "file": + continue + + try: + # Initialiser l'état si nécessaire + if trigger.trigger_id not in self.trigger_states: + self.trigger_states[trigger.trigger_id] = TriggerState( + trigger_id=trigger.trigger_id, + last_check=datetime.now(), + last_fired=trigger.last_fired or datetime.min, + processed_files=set() + ) + + state = self.trigger_states[trigger.trigger_id] + + # Récupérer la config + watch_directory = trigger.config.get('watch_directory') + file_pattern = trigger.config.get('file_pattern', '*') + + if not watch_directory: + continue + + # Chercher les fichiers correspondants + pattern_path = Path(watch_directory) / file_pattern + matching_files = glob.glob(str(pattern_path)) + + # Traiter les nouveaux fichiers + for file_path in matching_files: + if file_path not in state.processed_files: + logger.info( + f"New file detected for trigger {trigger.trigger_id}: {file_path}", + trigger_id=trigger.trigger_id, + file_path=file_path + ) + + # Exécuter le target avec le fichier en variable + success = self._execute_target( + trigger.workflow_id, + variables={'file_path': file_path} + ) + + if success: + # Marquer le fichier comme traité + state.processed_files.add(file_path) + + # Mettre à jour le trigger + self.trigger_manager.fire_trigger(trigger.trigger_id) + + # Métriques + increment_trigger_fire(trigger.trigger_type, trigger.workflow_id) + + # Nettoyer les fichiers traités (garder seulement les 1000 derniers) + if len(state.processed_files) > 1000: + state.processed_files = set(list(state.processed_files)[-1000:]) + + except Exception as e: + logger.error( + f"Error checking file trigger {trigger.trigger_id}: {e}", + trigger_id=trigger.trigger_id, + error=str(e) + ) + + def _execute_target(self, target_id: str, variables: Optional[Dict] = None) -> bool: + """ + Exécute un workflow ou une chaîne. + + Args: + target_id: ID du workflow ou de la chaîne + variables: Variables optionnelles pour l'exécution + + Returns: + True si succès, False sinon + """ + try: + # Vérifier si c'est une chaîne + chain = self.chain_manager.get_chain(target_id) + if chain: + logger.info(f"Executing chain {target_id}") + result = self.chain_manager.execute_chain(target_id) + return result.success + + # Sinon c'est un workflow simple + logger.info(f"Executing workflow {target_id}") + + # TODO: Intégrer avec ExecutionLoop pour exécution réelle + # Pour l'instant, on simule + logger.warning(f"Workflow execution not yet integrated with ExecutionLoop") + + # Simulation d'exécution + time.sleep(0.1) + return True + + except Exception as e: + logger.error(f"Error executing target {target_id}: {e}", error=str(e)) + return False + + def get_status(self) -> Dict: + """ + Récupère le statut du scheduler. + + Returns: + Dictionnaire avec le statut + """ + return { + 'running': self.running, + 'check_interval': self.check_interval, + 'active_triggers': len(self.trigger_states), + 'trigger_states': { + tid: { + 'last_check': state.last_check.isoformat(), + 'last_fired': state.last_fired.isoformat(), + 'processed_files_count': len(state.processed_files) + } + for tid, state in self.trigger_states.items() + } + } diff --git a/core/monitoring/chain_manager.py b/core/monitoring/chain_manager.py new file mode 100644 index 000000000..fe7b47b1c --- /dev/null +++ b/core/monitoring/chain_manager.py @@ -0,0 +1,297 @@ +""" +Chain Manager - Gestion des chaînes de workflows + +Ce module permet de créer, gérer et exécuter des séquences +ordonnées de workflows. +""" + +import json +import uuid +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Dict, Any, Callable +from dataclasses import dataclass, field, asdict + +from core.monitoring.logger import get_logger +from core.monitoring.metrics import increment_chain_execution, record_chain_duration, set_active_chains + +logger = get_logger('chain_manager') + + +@dataclass +class ChainExecutionResult: + """Résultat d'exécution d'une chaîne.""" + chain_id: str + success: bool + duration: float + workflows_executed: int + failed_at: Optional[str] = None + error_message: Optional[str] = None + + +@dataclass +class WorkflowChain: + """Chaîne de workflows.""" + chain_id: str + name: str + workflows: List[str] # Ordered list of workflow_ids + status: str = "active" # active, inactive, running + created_at: datetime = field(default_factory=datetime.now) + last_execution: Optional[datetime] = None + success_rate: float = 0.0 + execution_history: List[Dict[str, Any]] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """Convertit en dictionnaire.""" + data = asdict(self) + data['created_at'] = self.created_at.isoformat() + data['last_execution'] = self.last_execution.isoformat() if self.last_execution else None + return data + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'WorkflowChain': + """Crée depuis un dictionnaire.""" + data['created_at'] = datetime.fromisoformat(data['created_at']) + if data.get('last_execution'): + data['last_execution'] = datetime.fromisoformat(data['last_execution']) + return cls(**data) + + +class ChainManager: + """Gestionnaire de chaînes de workflows.""" + + def __init__(self, storage_path: Path): + """ + Initialise le gestionnaire. + + Args: + storage_path: Chemin du répertoire de stockage + """ + self.storage_path = Path(storage_path) + self.storage_path.mkdir(parents=True, exist_ok=True) + self.chains_file = self.storage_path / "chains.json" + self._chains: Dict[str, WorkflowChain] = {} + self._load_chains() + + def _load_chains(self): + """Charge les chaînes depuis le fichier.""" + if self.chains_file.exists(): + try: + with open(self.chains_file, 'r') as f: + data = json.load(f) + self._chains = { + chain_id: WorkflowChain.from_dict(chain_data) + for chain_id, chain_data in data.items() + } + logger.info(f"Loaded {len(self._chains)} chains") + except Exception as e: + logger.error(f"Error loading chains: {e}") + self._chains = {} + else: + self._chains = {} + + def _save_chains(self): + """Sauvegarde les chaînes dans le fichier.""" + try: + data = { + chain_id: chain.to_dict() + for chain_id, chain in self._chains.items() + } + with open(self.chains_file, 'w') as f: + json.dump(data, f, indent=2) + logger.info(f"Saved {len(self._chains)} chains") + except Exception as e: + logger.error(f"Error saving chains: {e}") + + def list_chains(self) -> List[WorkflowChain]: + """ + Liste toutes les chaînes. + + Returns: + Liste des chaînes + """ + return list(self._chains.values()) + + def get_chain(self, chain_id: str) -> Optional[WorkflowChain]: + """ + Récupère une chaîne par son ID. + + Args: + chain_id: ID de la chaîne + + Returns: + Chaîne ou None si non trouvée + """ + return self._chains.get(chain_id) + + def create_chain(self, name: str, workflows: List[str]) -> WorkflowChain: + """ + Crée une nouvelle chaîne. + + Args: + name: Nom de la chaîne + workflows: Liste ordonnée des workflow_ids + + Returns: + Chaîne créée + + Raises: + ValueError: Si les workflows n'existent pas + """ + # Valider que les workflows existent + if not self.validate_workflows_exist(workflows): + raise ValueError("One or more workflows do not exist") + + chain_id = f"chain_{uuid.uuid4().hex[:8]}" + chain = WorkflowChain( + chain_id=chain_id, + name=name, + workflows=workflows + ) + + self._chains[chain_id] = chain + self._save_chains() + + logger.info(f"Created chain {chain_id}: {name} with {len(workflows)} workflows") + return chain + + def validate_workflows_exist(self, workflow_ids: List[str]) -> bool: + """ + Valide que tous les workflows existent. + + Args: + workflow_ids: Liste des workflow_ids à valider + + Returns: + True si tous existent, False sinon + """ + # Pour l'instant, on simule la validation + # Dans une vraie implémentation, on vérifierait dans le storage + from pathlib import Path + workflows_path = Path("data/training/workflows") + + if not workflows_path.exists(): + return False + + for wf_id in workflow_ids: + wf_file = workflows_path / f"{wf_id}.json" + if not wf_file.exists(): + logger.warning(f"Workflow {wf_id} does not exist") + return False + + return True + + def execute_chain( + self, + chain_id: str, + on_progress: Optional[Callable[[str, int, int], None]] = None + ) -> ChainExecutionResult: + """ + Exécute une chaîne de workflows. + + Args: + chain_id: ID de la chaîne à exécuter + on_progress: Callback optionnel pour les mises à jour de progression + (workflow_id, current_index, total_workflows) + + Returns: + Résultat de l'exécution + + Raises: + ValueError: Si la chaîne n'existe pas + """ + chain = self.get_chain(chain_id) + if not chain: + raise ValueError(f"Chain {chain_id} not found") + + logger.info(f"Starting chain execution: {chain_id}") + start_time = datetime.now() + + # Mettre à jour le statut + chain.status = "running" + set_active_chains(sum(1 for c in self._chains.values() if c.status == "running")) + + workflows_executed = 0 + failed_at = None + error_message = None + success = True + + try: + for i, workflow_id in enumerate(chain.workflows): + if on_progress: + on_progress(workflow_id, i, len(chain.workflows)) + + logger.info(f"Executing workflow {workflow_id} ({i+1}/{len(chain.workflows)})") + + # Simuler l'exécution du workflow + # Dans une vraie implémentation, on appellerait l'ExecutionLoop + try: + # TODO: Intégrer avec ExecutionLoop + # result = execution_loop.execute_workflow(workflow_id) + # if not result.success: + # raise Exception(result.error_message) + workflows_executed += 1 + except Exception as e: + failed_at = workflow_id + error_message = str(e) + success = False + logger.error(f"Chain execution failed at {workflow_id}: {e}") + break + + finally: + # Calculer la durée + duration = (datetime.now() - start_time).total_seconds() + + # Mettre à jour le statut + chain.status = "active" + chain.last_execution = datetime.now() + + # Mettre à jour l'historique + execution_record = { + 'timestamp': datetime.now().isoformat(), + 'success': success, + 'duration': duration, + 'workflows_executed': workflows_executed, + 'failed_at': failed_at + } + chain.execution_history.append(execution_record) + + # Calculer le taux de succès + if chain.execution_history: + successes = sum(1 for r in chain.execution_history if r['success']) + chain.success_rate = (successes / len(chain.execution_history)) * 100 + + self._save_chains() + + # Métriques + increment_chain_execution(chain_id, success) + record_chain_duration(chain_id, duration) + set_active_chains(sum(1 for c in self._chains.values() if c.status == "running")) + + logger.info(f"Chain execution completed: {chain_id} (success={success}, duration={duration:.2f}s)") + + return ChainExecutionResult( + chain_id=chain_id, + success=success, + duration=duration, + workflows_executed=workflows_executed, + failed_at=failed_at, + error_message=error_message + ) + + def delete_chain(self, chain_id: str) -> bool: + """ + Supprime une chaîne. + + Args: + chain_id: ID de la chaîne à supprimer + + Returns: + True si supprimée, False si non trouvée + """ + if chain_id in self._chains: + del self._chains[chain_id] + self._save_chains() + logger.info(f"Deleted chain {chain_id}") + return True + return False diff --git a/core/monitoring/http_server_metrics.py b/core/monitoring/http_server_metrics.py new file mode 100644 index 000000000..72eecd79d --- /dev/null +++ b/core/monitoring/http_server_metrics.py @@ -0,0 +1,98 @@ +"""HTTP server metrics for Prometheus. + +Fiche #24 - Observabilité. + +Used from security middlewares (FastAPI/Flask) so we can count blocked +requests too (401/403/423/429). + +Labels are intentionally simple to avoid cardinality explosions. +""" + +from __future__ import annotations + +from typing import Optional + +from prometheus_client import Counter, Histogram, Gauge + + +HTTP_REQUESTS_TOTAL = Counter( + "http_requests_total", + "Total HTTP requests", + ["service", "method", "path", "status"], +) + +HTTP_REQUEST_DURATION_SECONDS = Histogram( + "http_request_duration_seconds", + "HTTP request duration in seconds", + ["service", "method", "path"], + buckets=( + 0.005, + 0.01, + 0.025, + 0.05, + 0.1, + 0.25, + 0.5, + 1.0, + 2.5, + 5.0, + 10.0, + float("inf"), + ), +) + +HTTP_REQUESTS_IN_FLIGHT = Gauge( + "http_requests_in_flight", + "Number of HTTP requests currently being processed", + ["service"], +) + +RPA_SECURITY_BLOCKS_TOTAL = Counter( + "rpa_security_blocks_total", + "Requests blocked by security controls", + ["service", "reason"], +) + + +def record_http_request( + service: str, + method: str, + path_template: str, + status_code: int, + duration_seconds: float, +) -> None: + """Record a completed HTTP request.""" + HTTP_REQUESTS_TOTAL.labels( + service=service, + method=method.upper(), + path=path_template, + status=str(int(status_code)), + ).inc() + HTTP_REQUEST_DURATION_SECONDS.labels( + service=service, + method=method.upper(), + path=path_template, + ).observe(max(0.0, float(duration_seconds))) + + +def in_flight_inc(service: str) -> None: + HTTP_REQUESTS_IN_FLIGHT.labels(service=service).inc() + + +def in_flight_dec(service: str) -> None: + HTTP_REQUESTS_IN_FLIGHT.labels(service=service).dec() + + +def record_security_block(service: str, reason: str) -> None: + """Increment counter when a request is blocked for *security* reasons.""" + RPA_SECURITY_BLOCKS_TOTAL.labels(service=service, reason=reason).inc() + + +def safe_template(path_template: Optional[str], fallback_path: str) -> str: + """Return a stable path label. + + Prefer framework route template when available. Fall back to raw path. + """ + if path_template and isinstance(path_template, str): + return path_template + return fallback_path \ No newline at end of file diff --git a/core/monitoring/log_exporter.py b/core/monitoring/log_exporter.py new file mode 100644 index 000000000..7ed3ba7b5 --- /dev/null +++ b/core/monitoring/log_exporter.py @@ -0,0 +1,213 @@ +""" +Log Exporter - Export des logs en format ZIP + +Ce module permet d'exporter les logs système en archives ZIP +pour analyse offline ou partage. +""" + +import json +import zipfile +import io +from datetime import datetime +from pathlib import Path +from typing import Optional, List, Dict, Any + +from core.monitoring.logger import get_logger + +logger = get_logger('log_exporter') + + +class LogExporter: + """Exporteur de logs.""" + + def __init__(self, logs_path: Path): + """ + Initialise l'exporteur. + + Args: + logs_path: Chemin du répertoire des logs + """ + self.logs_path = Path(logs_path) + self.logs_path.mkdir(parents=True, exist_ok=True) + + def export_to_zip( + self, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None + ) -> io.BytesIO: + """ + Exporte les logs en archive ZIP. + + Args: + start_time: Timestamp de début (optionnel) + end_time: Timestamp de fin (optionnel) + + Returns: + BytesIO contenant l'archive ZIP + """ + logger.info(f"Exporting logs (start={start_time}, end={end_time})") + + # Créer un ZIP en mémoire + memory_file = io.BytesIO() + + with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf: + # Logs d'exécution + execution_logs = self.get_execution_logs(start_time, end_time) + zf.writestr('execution_logs.json', json.dumps(execution_logs, indent=2)) + + # Logs d'erreurs + error_logs = self.get_error_logs(start_time, end_time) + zf.writestr('error_logs.json', json.dumps(error_logs, indent=2)) + + # Métriques + metrics = self.get_metrics_summary() + zf.writestr('metrics.json', json.dumps(metrics, indent=2)) + + memory_file.seek(0) + logger.info(f"Exported {len(execution_logs)} execution logs, {len(error_logs)} error logs") + + return memory_file + + def get_execution_logs( + self, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None + ) -> List[Dict[str, Any]]: + """ + Récupère les logs d'exécution. + + Args: + start_time: Timestamp de début (optionnel) + end_time: Timestamp de fin (optionnel) + + Returns: + Liste des logs d'exécution + """ + logs = [] + + # Lire les logs depuis les fichiers + for log_file in self.logs_path.glob('*.log'): + try: + with open(log_file, 'r') as f: + for line in f: + # Parser la ligne de log + # Format: timestamp - name - level - message + parts = line.strip().split(' - ', 3) + if len(parts) >= 4: + timestamp_str = parts[0] + try: + timestamp = datetime.fromisoformat(timestamp_str) + + # Filtrer par date si spécifié + if start_time and timestamp < start_time: + continue + if end_time and timestamp > end_time: + continue + + log_entry = { + 'timestamp': timestamp_str, + 'component': parts[1].replace('rpa.', ''), + 'level': parts[2], + 'message': parts[3] + } + + # Filtrer les logs d'exécution (INFO, DEBUG) + if log_entry['level'] in ['INFO', 'DEBUG']: + logs.append(log_entry) + except (ValueError, IndexError): + continue + except Exception as e: + logger.error(f"Error reading log file {log_file}: {e}") + + # Trier par timestamp + logs.sort(key=lambda x: x['timestamp']) + + return logs + + def get_error_logs( + self, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None + ) -> List[Dict[str, Any]]: + """ + Récupère les logs d'erreurs. + + Args: + start_time: Timestamp de début (optionnel) + end_time: Timestamp de fin (optionnel) + + Returns: + Liste des logs d'erreurs + """ + logs = [] + + # Lire les logs depuis les fichiers + for log_file in self.logs_path.glob('*.log'): + try: + with open(log_file, 'r') as f: + for line in f: + # Parser la ligne de log + parts = line.strip().split(' - ', 3) + if len(parts) >= 4: + timestamp_str = parts[0] + try: + timestamp = datetime.fromisoformat(timestamp_str) + + # Filtrer par date si spécifié + if start_time and timestamp < start_time: + continue + if end_time and timestamp > end_time: + continue + + log_entry = { + 'timestamp': timestamp_str, + 'component': parts[1].replace('rpa.', ''), + 'level': parts[2], + 'message': parts[3] + } + + # Filtrer les logs d'erreurs (ERROR, WARNING) + if log_entry['level'] in ['ERROR', 'WARNING']: + logs.append(log_entry) + except (ValueError, IndexError): + continue + except Exception as e: + logger.error(f"Error reading log file {log_file}: {e}") + + # Trier par timestamp + logs.sort(key=lambda x: x['timestamp']) + + return logs + + def get_metrics_summary(self) -> Dict[str, Any]: + """ + Récupère un résumé des métriques. + + Returns: + Dictionnaire des métriques + """ + # Récupérer les métriques depuis Prometheus + try: + from prometheus_client import REGISTRY + + metrics = {} + + # Collecter les métriques + for metric in REGISTRY.collect(): + for sample in metric.samples: + metrics[sample.name] = { + 'value': sample.value, + 'labels': sample.labels, + 'type': metric.type + } + + return { + 'timestamp': datetime.now().isoformat(), + 'metrics': metrics + } + except Exception as e: + logger.error(f"Error collecting metrics: {e}") + return { + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + } diff --git a/core/monitoring/logger.py b/core/monitoring/logger.py new file mode 100644 index 000000000..dfb0637ac --- /dev/null +++ b/core/monitoring/logger.py @@ -0,0 +1,439 @@ +""" +Système de logging centralisé pour RPA Vision V3 + +Ce module fournit un logging unifié pour tous les composants avec: +- Format structuré et cohérent +- Filtrage des codes ANSI +- Nettoyage des noms de composants +- Rotation des fichiers +- Intégration Prometheus +""" + +import logging +import logging.handlers +import json +import os +import re +import sys +from datetime import datetime +from typing import Dict, Any, Optional, List +from dataclasses import dataclass, field +from pathlib import Path + +# Import des métriques Prometheus +try: + from core.monitoring.metrics import increment_log_entry + METRICS_ENABLED = True +except ImportError: + METRICS_ENABLED = False + + +# ============================================================================= +# Configuration +# ============================================================================= + +# Répertoire des logs +LOG_DIR = Path(__file__).parent.parent.parent / "logs" +LOG_DIR.mkdir(parents=True, exist_ok=True) + +# Format unifié +LOG_FORMAT = "%(asctime)s | %(levelname)-7s | %(name)-20s | %(message)s" +LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +# Rotation +MAX_LOG_SIZE = 10 * 1024 * 1024 # 10 MB +BACKUP_COUNT = 5 # Garder 5 fichiers de backup + +# Pattern pour supprimer les codes ANSI +ANSI_ESCAPE_PATTERN = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + + +# ============================================================================= +# Filtres et Formatters +# ============================================================================= + +class ANSIStripFilter(logging.Filter): + """Filtre qui supprime les codes ANSI des messages.""" + + def filter(self, record: logging.LogRecord) -> bool: + if hasattr(record, 'msg') and isinstance(record.msg, str): + record.msg = ANSI_ESCAPE_PATTERN.sub('', record.msg) + return True + + +class CleanFormatter(logging.Formatter): + """Formatter qui nettoie les messages et assure un format cohérent.""" + + def format(self, record: logging.LogRecord) -> str: + # Nettoyer les codes ANSI du message + if hasattr(record, 'msg') and isinstance(record.msg, str): + record.msg = ANSI_ESCAPE_PATTERN.sub('', record.msg) + + # Formater + return super().format(record) + + +# ============================================================================= +# LogEntry Dataclass +# ============================================================================= + +@dataclass +class LogEntry: + """Entrée de log structurée.""" + timestamp: datetime + level: str + component: str + message: str + workflow_id: Optional[str] = None + node_id: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convertit l'entrée en dictionnaire.""" + return { + 'timestamp': self.timestamp.isoformat(), + 'level': self.level, + 'component': self.component, + 'message': self.message, + 'workflow_id': self.workflow_id, + 'node_id': self.node_id, + 'metadata': self.metadata + } + + def to_json(self) -> str: + """Convertit l'entrée en JSON.""" + return json.dumps(self.to_dict()) + + +# ============================================================================= +# RPALogger Class +# ============================================================================= + +class RPALogger: + """Logger centralisé pour le système RPA.""" + + # Liste des composants valides (whitelist) + VALID_COMPONENTS = { + 'api', 'workflow', 'execution', 'detection', 'healing', + 'monitoring', 'dashboard', 'automation_scheduler', 'chain_manager', + 'trigger_manager', 'vwb_backend', 'vwb_frontend', 'main_backend', + 'log_exporter', 'analytics', 'security', 'embedding', 'faiss', + 'coaching', 'correction_packs', 'system', 'backup', 'version', + 'test', 'test_component' + } + + @staticmethod + def _clean_component_name(component: str) -> str: + """ + Nettoie le nom du composant pour éviter les caractères corrompus. + + Args: + component: Nom du composant brut + + Returns: + Nom nettoyé sans caractères problématiques + """ + if not component: + return "unknown" + + # Convertir en minuscules et supprimer les espaces + component = component.lower().strip() + + # Ne garder que les caractères alphanumériques et underscores + cleaned = "" + for char in component: + if char.isalnum() or char == '_': + cleaned += char + elif char in ['-', '.', ' ']: + cleaned += '_' + + # Limiter la longueur + cleaned = cleaned[:30].strip('_') + + # Si le résultat est vide ou trop court, utiliser "unknown" + if not cleaned or len(cleaned) < 2: + return "unknown" + + return cleaned + + def __init__(self, component: str, log_file: Optional[str] = None): + """ + Initialise le logger. + + Args: + component: Nom du composant + log_file: Chemin du fichier de log (optionnel) + """ + # Nettoyer le nom du composant + self.component = self._clean_component_name(component) + + # Chemin du fichier de log + if log_file: + self.log_file = Path(log_file) + else: + self.log_file = LOG_DIR / f"{self.component}.log" + + # Créer le répertoire parent si nécessaire + self.log_file.parent.mkdir(parents=True, exist_ok=True) + + # Stocker les entrées en mémoire (limité) + self.entries: List[LogEntry] = [] + self._max_entries = 1000 + + # Configuration du logger Python standard + self.logger = logging.getLogger(f"rpa.{self.component}") + self.logger.setLevel(logging.DEBUG) + + # Éviter les handlers dupliqués + if not self.logger.handlers: + self._setup_handlers() + + def _setup_handlers(self): + """Configure les handlers de logging.""" + # Formatter commun + formatter = CleanFormatter(LOG_FORMAT, LOG_DATE_FORMAT) + + # Handler fichier avec rotation + try: + file_handler = logging.handlers.RotatingFileHandler( + self.log_file, + maxBytes=MAX_LOG_SIZE, + backupCount=BACKUP_COUNT, + encoding='utf-8' + ) + file_handler.setFormatter(formatter) + file_handler.setLevel(logging.DEBUG) + file_handler.addFilter(ANSIStripFilter()) + self.logger.addHandler(file_handler) + except Exception as e: + print(f"Warning: Could not create file handler for {self.component}: {e}") + + # Handler console (seulement si pas en mode silencieux) + if os.getenv('RPA_LOG_CONSOLE', 'false').lower() == 'true': + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(formatter) + console_handler.setLevel(logging.INFO) + console_handler.addFilter(ANSIStripFilter()) + self.logger.addHandler(console_handler) + + def _log(self, level: str, message: str, workflow_id: Optional[str] = None, + node_id: Optional[str] = None, **metadata): + """ + Log interne avec métriques. + + Args: + level: Niveau de log (INFO, WARNING, ERROR, DEBUG) + message: Message de log + workflow_id: ID du workflow (optionnel) + node_id: ID du nœud (optionnel) + **metadata: Métadonnées additionnelles + """ + # Nettoyer le message des codes ANSI + clean_message = ANSI_ESCAPE_PATTERN.sub('', str(message)) + + # Créer l'entrée + entry = LogEntry( + timestamp=datetime.now(), + level=level, + component=self.component, + message=clean_message, + workflow_id=workflow_id, + node_id=node_id, + metadata=metadata + ) + + # Stocker en mémoire (avec limite) + self.entries.append(entry) + if len(self.entries) > self._max_entries: + self.entries = self.entries[-self._max_entries:] + + # Incrémenter le compteur Prometheus + if METRICS_ENABLED: + try: + increment_log_entry(level, self.component) + except Exception: + pass + + # Construire le message formaté + log_msg = clean_message + if workflow_id: + log_msg += f" | workflow={workflow_id}" + if node_id: + log_msg += f" | node={node_id}" + if metadata: + # Formater les métadonnées de manière compacte + meta_str = " | ".join(f"{k}={v}" for k, v in metadata.items()) + log_msg += f" | {meta_str}" + + # Log via le logger Python + log_method = getattr(self.logger, level.lower(), self.logger.info) + log_method(log_msg) + + def info(self, message: str, workflow_id: Optional[str] = None, + node_id: Optional[str] = None, **metadata): + """Log niveau INFO.""" + self._log('INFO', message, workflow_id, node_id, **metadata) + + def warning(self, message: str, workflow_id: Optional[str] = None, + node_id: Optional[str] = None, **metadata): + """Log niveau WARNING.""" + self._log('WARNING', message, workflow_id, node_id, **metadata) + + def error(self, message: str, workflow_id: Optional[str] = None, + node_id: Optional[str] = None, **metadata): + """Log niveau ERROR.""" + self._log('ERROR', message, workflow_id, node_id, **metadata) + + def debug(self, message: str, workflow_id: Optional[str] = None, + node_id: Optional[str] = None, **metadata): + """Log niveau DEBUG.""" + self._log('DEBUG', message, workflow_id, node_id, **metadata) + + def critical(self, message: str, workflow_id: Optional[str] = None, + node_id: Optional[str] = None, **metadata): + """Log niveau CRITICAL.""" + self._log('CRITICAL', message, workflow_id, node_id, **metadata) + + def workflow_start(self, workflow_id: str, **metadata): + """Log démarrage de workflow.""" + self.info("Workflow started", workflow_id=workflow_id, **metadata) + + def workflow_end(self, workflow_id: str, success: bool, duration: float, **metadata): + """Log fin de workflow.""" + status = 'success' if success else 'failure' + self.info( + f"Workflow completed: {status}", + workflow_id=workflow_id, + success=success, + duration_s=round(duration, 3), + **metadata + ) + + def action_executed(self, workflow_id: str, node_id: str, action_type: str, + success: bool, confidence: float = 0.0, **metadata): + """Log exécution d'une action.""" + level = 'INFO' if success else 'WARNING' + self._log( + level, + f"Action executed: {action_type}", + workflow_id=workflow_id, + node_id=node_id, + success=success, + confidence=round(confidence, 3), + **metadata + ) + + def get_recent_logs(self, limit: int = 100) -> List[LogEntry]: + """ + Récupère les logs récents. + + Args: + limit: Nombre maximum de logs à retourner + + Returns: + Liste des entrées de log récentes + """ + return self.entries[-limit:] + + def export_logs(self, start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + format: str = 'json') -> str: + """ + Exporte les logs. + + Args: + start_time: Timestamp de début (optionnel) + end_time: Timestamp de fin (optionnel) + format: Format d'export ('json' ou 'text') + + Returns: + Logs exportés + """ + filtered_logs = self.entries + + if start_time: + filtered_logs = [log for log in filtered_logs if log.timestamp >= start_time] + if end_time: + filtered_logs = [log for log in filtered_logs if log.timestamp <= end_time] + + if format == 'json': + return json.dumps([log.to_dict() for log in filtered_logs], indent=2) + else: + lines = [] + for log in filtered_logs: + line = f"{log.timestamp.strftime(LOG_DATE_FORMAT)} | {log.level:7} | {log.component:20} | {log.message}" + lines.append(line) + return "\n".join(lines) + + +# ============================================================================= +# Global Logger Management +# ============================================================================= + +# Instance globale pour chaque composant +_loggers: Dict[str, RPALogger] = {} + + +def get_logger(component: str) -> RPALogger: + """ + Récupère ou crée un logger pour un composant. + + Args: + component: Nom du composant + + Returns: + Logger pour le composant + """ + # Nettoyer le nom avant de chercher + clean_name = RPALogger._clean_component_name(component) + + if clean_name not in _loggers: + _loggers[clean_name] = RPALogger(component) + + return _loggers[clean_name] + + +def configure_root_logger(): + """ + Configure le logger racine pour capturer tous les logs Python. + Appelé une seule fois au démarrage. + """ + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + + # Formatter + formatter = CleanFormatter(LOG_FORMAT, LOG_DATE_FORMAT) + + # Handler fichier principal + main_log = LOG_DIR / "main.log" + try: + handler = logging.handlers.RotatingFileHandler( + main_log, + maxBytes=MAX_LOG_SIZE, + backupCount=BACKUP_COUNT, + encoding='utf-8' + ) + handler.setFormatter(formatter) + handler.addFilter(ANSIStripFilter()) + root_logger.addHandler(handler) + except Exception: + pass + + # Réduire le bruit des bibliothèques externes + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('werkzeug').setLevel(logging.WARNING) + logging.getLogger('socketio').setLevel(logging.WARNING) + logging.getLogger('engineio').setLevel(logging.WARNING) + + +# ============================================================================= +# Pre-configured Loggers +# ============================================================================= + +# Loggers pré-configurés pour les composants principaux +workflow_logger = get_logger('workflow') +execution_logger = get_logger('execution') +detection_logger = get_logger('detection') +api_logger = get_logger('api') +monitoring_logger = get_logger('monitoring') +security_logger = get_logger('security') diff --git a/core/monitoring/metrics.py b/core/monitoring/metrics.py new file mode 100644 index 000000000..685309b54 --- /dev/null +++ b/core/monitoring/metrics.py @@ -0,0 +1,206 @@ +""" +Métriques Prometheus pour RPA Vision + +Ce module définit toutes les métriques Prometheus utilisées +pour monitorer le système RPA. +""" + +from prometheus_client import Counter, Histogram, Gauge, Info + +# ============================================================================= +# Counters - Métriques qui ne font qu'augmenter +# ============================================================================= + +workflow_executions_total = Counter( + 'workflow_executions_total', + 'Total workflow executions', + ['workflow_id', 'status'] +) + +log_entries_total = Counter( + 'log_entries_total', + 'Total log entries', + ['level', 'component'] +) + +chain_executions_total = Counter( + 'chain_executions_total', + 'Total chain executions', + ['chain_id', 'status'] +) + +trigger_fires_total = Counter( + 'trigger_fires_total', + 'Total trigger fires', + ['trigger_type', 'workflow_id'] +) + +# ============================================================================= +# Histograms - Distribution de valeurs +# ============================================================================= + +workflow_duration_seconds = Histogram( + 'workflow_duration_seconds', + 'Workflow execution duration in seconds', + ['workflow_id'], + buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, float('inf')) +) + +chain_duration_seconds = Histogram( + 'chain_duration_seconds', + 'Chain execution duration in seconds', + ['chain_id'], + buckets=(1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, float('inf')) +) + +# ============================================================================= +# Gauges - Valeurs qui peuvent augmenter ou diminuer +# ============================================================================= + +active_workflows = Gauge( + 'active_workflows', + 'Number of currently active workflows' +) + +active_chains = Gauge( + 'active_chains', + 'Number of currently active chains' +) + +error_rate = Gauge( + 'error_rate', + 'Current error rate percentage' +) + +enabled_triggers = Gauge( + 'enabled_triggers', + 'Number of enabled triggers' +) + +# ============================================================================= +# Info - Informations système +# ============================================================================= + +system_info = Info( + 'rpa_system', + 'RPA system information' +) + +# Initialiser les informations système +system_info.info({ + 'version': '3.0.0', + 'component': 'rpa_vision', + 'monitoring': 'enabled' +}) + + +# ============================================================================= +# Fonctions utilitaires +# ============================================================================= + +def increment_workflow_execution(workflow_id: str, success: bool): + """ + Incrémente le compteur d'exécution de workflow. + + Args: + workflow_id: ID du workflow + success: True si succès, False si échec + """ + status = 'success' if success else 'failure' + workflow_executions_total.labels(workflow_id=workflow_id, status=status).inc() + + +def record_workflow_duration(workflow_id: str, duration: float): + """ + Enregistre la durée d'exécution d'un workflow. + + Args: + workflow_id: ID du workflow + duration: Durée en secondes + """ + workflow_duration_seconds.labels(workflow_id=workflow_id).observe(duration) + + +def increment_log_entry(level: str, component: str): + """ + Incrémente le compteur d'entrées de log. + + Args: + level: Niveau de log (INFO, WARNING, ERROR, DEBUG) + component: Composant source + """ + log_entries_total.labels(level=level, component=component).inc() + + +def increment_chain_execution(chain_id: str, success: bool): + """ + Incrémente le compteur d'exécution de chaîne. + + Args: + chain_id: ID de la chaîne + success: True si succès, False si échec + """ + status = 'success' if success else 'failure' + chain_executions_total.labels(chain_id=chain_id, status=status).inc() + + +def record_chain_duration(chain_id: str, duration: float): + """ + Enregistre la durée d'exécution d'une chaîne. + + Args: + chain_id: ID de la chaîne + duration: Durée en secondes + """ + chain_duration_seconds.labels(chain_id=chain_id).observe(duration) + + +def increment_trigger_fire(trigger_type: str, workflow_id: str): + """ + Incrémente le compteur de déclenchements. + + Args: + trigger_type: Type de trigger (schedule, file, manual) + workflow_id: ID du workflow déclenché + """ + trigger_fires_total.labels(trigger_type=trigger_type, workflow_id=workflow_id).inc() + + +def set_active_workflows(count: int): + """ + Met à jour le nombre de workflows actifs. + + Args: + count: Nombre de workflows actifs + """ + active_workflows.set(count) + + +def set_active_chains(count: int): + """ + Met à jour le nombre de chaînes actives. + + Args: + count: Nombre de chaînes actives + """ + active_chains.set(count) + + +def set_error_rate(rate: float): + """ + Met à jour le taux d'erreur. + + Args: + rate: Taux d'erreur en pourcentage (0-100) + """ + error_rate.set(rate) + + +def set_enabled_triggers(count: int): + """ + Met à jour le nombre de triggers activés. + + Args: + count: Nombre de triggers activés + """ + enabled_triggers.set(count) diff --git a/core/monitoring/trigger_manager.py b/core/monitoring/trigger_manager.py new file mode 100644 index 000000000..6b9706cac --- /dev/null +++ b/core/monitoring/trigger_manager.py @@ -0,0 +1,286 @@ +""" +Trigger Manager - Gestion des déclencheurs de workflows + +Ce module permet de créer et gérer des déclencheurs automatiques +pour l'exécution de workflows. +""" + +import json +import uuid +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Dict, Any +from dataclasses import dataclass, field, asdict + +from core.monitoring.logger import get_logger +from core.monitoring.metrics import increment_trigger_fire, set_enabled_triggers + +logger = get_logger('trigger_manager') + + +@dataclass +class Trigger: + """Déclencheur de workflow.""" + trigger_id: str + trigger_type: str # schedule, file, manual + workflow_id: str + config: Dict[str, Any] + enabled: bool = True + created_at: datetime = field(default_factory=datetime.now) + last_fired: Optional[datetime] = None + fire_count: int = 0 + + def to_dict(self) -> Dict[str, Any]: + """Convertit en dictionnaire.""" + data = asdict(self) + data['created_at'] = self.created_at.isoformat() + data['last_fired'] = self.last_fired.isoformat() if self.last_fired else None + return data + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'Trigger': + """Crée depuis un dictionnaire.""" + data['created_at'] = datetime.fromisoformat(data['created_at']) + if data.get('last_fired'): + data['last_fired'] = datetime.fromisoformat(data['last_fired']) + return cls(**data) + + +class TriggerManager: + """Gestionnaire de déclencheurs.""" + + def __init__(self, storage_path: Path): + """ + Initialise le gestionnaire. + + Args: + storage_path: Chemin du répertoire de stockage + """ + self.storage_path = Path(storage_path) + self.storage_path.mkdir(parents=True, exist_ok=True) + self.triggers_file = self.storage_path / "triggers.json" + self._triggers: Dict[str, Trigger] = {} + self._load_triggers() + + def _load_triggers(self): + """Charge les triggers depuis le fichier.""" + if self.triggers_file.exists(): + try: + with open(self.triggers_file, 'r') as f: + data = json.load(f) + self._triggers = { + trigger_id: Trigger.from_dict(trigger_data) + for trigger_id, trigger_data in data.items() + } + logger.info(f"Loaded {len(self._triggers)} triggers") + self._update_metrics() + except Exception as e: + logger.error(f"Error loading triggers: {e}") + self._triggers = {} + else: + self._triggers = {} + + def _save_triggers(self): + """Sauvegarde les triggers dans le fichier.""" + try: + data = { + trigger_id: trigger.to_dict() + for trigger_id, trigger in self._triggers.items() + } + with open(self.triggers_file, 'w') as f: + json.dump(data, f, indent=2) + logger.info(f"Saved {len(self._triggers)} triggers") + self._update_metrics() + except Exception as e: + logger.error(f"Error saving triggers: {e}") + + def _update_metrics(self): + """Met à jour les métriques Prometheus.""" + enabled_count = sum(1 for t in self._triggers.values() if t.enabled) + set_enabled_triggers(enabled_count) + + def list_triggers(self) -> List[Trigger]: + """ + Liste tous les triggers. + + Returns: + Liste des triggers + """ + return list(self._triggers.values()) + + def get_trigger(self, trigger_id: str) -> Optional[Trigger]: + """ + Récupère un trigger par son ID. + + Args: + trigger_id: ID du trigger + + Returns: + Trigger ou None si non trouvé + """ + return self._triggers.get(trigger_id) + + def create_trigger( + self, + trigger_type: str, + workflow_id: str, + config: Dict[str, Any] + ) -> Trigger: + """ + Crée un nouveau trigger. + + Args: + trigger_type: Type de trigger (schedule, file, manual) + workflow_id: ID du workflow à déclencher + config: Configuration du trigger + + Returns: + Trigger créé + + Raises: + ValueError: Si la configuration est invalide + """ + # Valider la configuration + if not self.validate_config(trigger_type, config): + raise ValueError(f"Invalid configuration for trigger type {trigger_type}") + + trigger_id = f"trigger_{uuid.uuid4().hex[:8]}" + trigger = Trigger( + trigger_id=trigger_id, + trigger_type=trigger_type, + workflow_id=workflow_id, + config=config + ) + + self._triggers[trigger_id] = trigger + self._save_triggers() + + logger.info(f"Created trigger {trigger_id}: {trigger_type} for workflow {workflow_id}") + return trigger + + def validate_config(self, trigger_type: str, config: Dict[str, Any]) -> bool: + """ + Valide la configuration d'un trigger. + + Args: + trigger_type: Type de trigger + config: Configuration à valider + + Returns: + True si valide, False sinon + """ + if trigger_type == "schedule": + # Valider la configuration schedule + if 'interval_seconds' not in config: + logger.warning("Schedule trigger missing interval_seconds") + return False + + interval = config['interval_seconds'] + if not isinstance(interval, (int, float)) or interval <= 0: + logger.warning(f"Invalid interval_seconds: {interval}") + return False + + return True + + elif trigger_type == "file": + # Valider la configuration file + if 'watch_directory' not in config or 'file_pattern' not in config: + logger.warning("File trigger missing watch_directory or file_pattern") + return False + + return True + + elif trigger_type == "manual": + # Pas de validation spécifique pour manual + return True + + else: + logger.warning(f"Unknown trigger type: {trigger_type}") + return False + + def enable_trigger(self, trigger_id: str) -> bool: + """ + Active un trigger. + + Args: + trigger_id: ID du trigger + + Returns: + True si activé, False si non trouvé + """ + trigger = self.get_trigger(trigger_id) + if trigger: + trigger.enabled = True + self._save_triggers() + logger.info(f"Enabled trigger {trigger_id}") + return True + return False + + def disable_trigger(self, trigger_id: str) -> bool: + """ + Désactive un trigger. + + Args: + trigger_id: ID du trigger + + Returns: + True si désactivé, False si non trouvé + """ + trigger = self.get_trigger(trigger_id) + if trigger: + trigger.enabled = False + self._save_triggers() + logger.info(f"Disabled trigger {trigger_id}") + return True + return False + + def fire_trigger(self, trigger_id: str) -> bool: + """ + Déclenche manuellement un trigger. + + Args: + trigger_id: ID du trigger + + Returns: + True si déclenché, False si non trouvé ou désactivé + """ + trigger = self.get_trigger(trigger_id) + if not trigger: + logger.warning(f"Trigger {trigger_id} not found") + return False + + if not trigger.enabled: + logger.warning(f"Trigger {trigger_id} is disabled") + return False + + # Mettre à jour les stats + trigger.last_fired = datetime.now() + trigger.fire_count += 1 + self._save_triggers() + + # Métriques + increment_trigger_fire(trigger.trigger_type, trigger.workflow_id) + + logger.info(f"Fired trigger {trigger_id} for workflow {trigger.workflow_id}") + + # TODO: Intégrer avec ExecutionLoop pour lancer le workflow + # execution_loop.execute_workflow(trigger.workflow_id) + + return True + + def delete_trigger(self, trigger_id: str) -> bool: + """ + Supprime un trigger. + + Args: + trigger_id: ID du trigger à supprimer + + Returns: + True si supprimé, False si non trouvé + """ + if trigger_id in self._triggers: + del self._triggers[trigger_id] + self._save_triggers() + logger.info(f"Deleted trigger {trigger_id}") + return True + return False