fix(logging): Fix the logging system with a unified format and ANSI filtering

- Rewrite logger.py with a consistent format: "timestamp | level | component | message"
- Add ANSIStripFilter to strip color codes from log messages
- Implement _clean_component_name() to prevent corrupted log file names
- Configure file rotation (10 MB, 5 backups)
- Make Prometheus imports optional in __init__.py
- Reduce noise from external libraries (werkzeug, urllib3, etc.)

Fixes the corrupted log file names (!0.log, #0.log, %0.log, etc.)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Dom
2026-01-19 15:34:34 +01:00
parent 4460b63912
commit 758e671732
8 changed files with 1967 additions and 0 deletions

core/monitoring/__init__.py

@@ -0,0 +1,145 @@
"""
Monitoring Module - Système de monitoring et logging centralisé
Ce module fournit:
- Logging centralisé avec métriques Prometheus (optionnel)
- Monitoring des performances
- Export des logs
- Gestion des chaînes de workflows
- Gestion des déclencheurs
"""
# Logger - always available
from .logger import (
LogEntry,
RPALogger,
get_logger,
workflow_logger,
execution_logger,
detection_logger,
api_logger,
)
# Prometheus metrics - optional
PROMETHEUS_AVAILABLE = False
try:
from .metrics import (
workflow_executions_total,
log_entries_total,
chain_executions_total,
trigger_fires_total,
workflow_duration_seconds,
chain_duration_seconds,
active_workflows,
active_chains,
error_rate,
enabled_triggers,
increment_workflow_execution,
record_workflow_duration,
increment_log_entry,
increment_chain_execution,
record_chain_duration,
increment_trigger_fire,
set_active_workflows,
set_active_chains,
set_error_rate,
set_enabled_triggers,
)
PROMETHEUS_AVAILABLE = True
except ImportError:
# Prometheus not available - fall back to no-op functions
workflow_executions_total = None
log_entries_total = None
chain_executions_total = None
trigger_fires_total = None
workflow_duration_seconds = None
chain_duration_seconds = None
active_workflows = None
active_chains = None
error_rate = None
enabled_triggers = None
def increment_workflow_execution(*args, **kwargs): pass
def record_workflow_duration(*args, **kwargs): pass
def increment_log_entry(*args, **kwargs): pass
def increment_chain_execution(*args, **kwargs): pass
def record_chain_duration(*args, **kwargs): pass
def increment_trigger_fire(*args, **kwargs): pass
def set_active_workflows(*args, **kwargs): pass
def set_active_chains(*args, **kwargs): pass
def set_error_rate(*args, **kwargs): pass
def set_enabled_triggers(*args, **kwargs): pass
# Automation Scheduler - conditional import
try:
from .automation_scheduler import AutomationScheduler
except ImportError:
AutomationScheduler = None
# Fiche #24 - Observability (HTTP metrics + security) - optional
try:
from .http_server_metrics import (
HTTP_REQUESTS_TOTAL,
HTTP_REQUEST_DURATION_SECONDS,
HTTP_REQUESTS_IN_FLIGHT,
RPA_SECURITY_BLOCKS_TOTAL,
record_http_request,
in_flight_inc,
in_flight_dec,
record_security_block,
safe_template,
)
except ImportError:
HTTP_REQUESTS_TOTAL = None
HTTP_REQUEST_DURATION_SECONDS = None
HTTP_REQUESTS_IN_FLIGHT = None
RPA_SECURITY_BLOCKS_TOTAL = None
def record_http_request(*args, **kwargs): pass
def in_flight_inc(*args, **kwargs): pass
def in_flight_dec(*args, **kwargs): pass
def record_security_block(*args, **kwargs): pass
def safe_template(path_template, fallback_path): return path_template if isinstance(path_template, str) and path_template else fallback_path
__version__ = "1.0.0"
__all__ = [
"PROMETHEUS_AVAILABLE",
"LogEntry",
"RPALogger",
"get_logger",
"workflow_logger",
"execution_logger",
"detection_logger",
"api_logger",
"workflow_executions_total",
"log_entries_total",
"chain_executions_total",
"trigger_fires_total",
"workflow_duration_seconds",
"chain_duration_seconds",
"active_workflows",
"active_chains",
"error_rate",
"enabled_triggers",
"increment_workflow_execution",
"record_workflow_duration",
"increment_log_entry",
"increment_chain_execution",
"record_chain_duration",
"increment_trigger_fire",
"set_active_workflows",
"set_active_chains",
"set_error_rate",
"set_enabled_triggers",
"AutomationScheduler",
# Fiche #24
"HTTP_REQUESTS_TOTAL",
"HTTP_REQUEST_DURATION_SECONDS",
"HTTP_REQUESTS_IN_FLIGHT",
"RPA_SECURITY_BLOCKS_TOTAL",
"record_http_request",
"in_flight_inc",
"in_flight_dec",
"record_security_block",
"safe_template",
]
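Downstream code can rely on the PROMETHEUS_AVAILABLE flag to degrade gracefully. A minimal usage sketch (illustrative, not committed code; the component name and workflow ID are assumptions):

from core.monitoring import PROMETHEUS_AVAILABLE, get_logger, increment_workflow_execution

log = get_logger("example")  # illustrative component name

if PROMETHEUS_AVAILABLE:
    increment_workflow_execution("wf_demo", success=True)  # real Prometheus counter
else:
    log.warning("prometheus_client not installed; metrics calls are no-ops")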

core/monitoring/automation_scheduler.py

@@ -0,0 +1,283 @@
"""
Automation Scheduler - Exécution automatique des triggers et chaînes
Ce module gère l'exécution automatique des workflows et chaînes
via des déclencheurs (schedule, file, etc.).
"""
import threading
import time
import glob
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Set
from dataclasses import dataclass
from core.monitoring.logger import get_logger
from core.monitoring.chain_manager import ChainManager
from core.monitoring.trigger_manager import TriggerManager
from core.monitoring.metrics import increment_trigger_fire
logger = get_logger('automation_scheduler')
@dataclass
class TriggerState:
"""État d'un trigger pour le scheduling."""
trigger_id: str
last_check: datetime
last_fired: datetime
processed_files: Set[str]
class AutomationScheduler:
"""
Scheduler automatique pour les triggers et chaînes.
Fonctionne en arrière-plan et vérifie périodiquement :
- Schedule triggers : Lance workflows à intervalles réguliers
- File triggers : Surveille des répertoires pour nouveaux fichiers
- Manual triggers : Exécutés via API
"""
def __init__(
self,
trigger_manager: TriggerManager,
chain_manager: ChainManager,
check_interval: float = 1.0
):
"""
Initialise le scheduler.
Args:
trigger_manager: Gestionnaire de triggers
chain_manager: Gestionnaire de chaînes
check_interval: Intervalle de vérification en secondes
"""
self.trigger_manager = trigger_manager
self.chain_manager = chain_manager
self.check_interval = check_interval
self.running = False
self.thread: Optional[threading.Thread] = None
# État des triggers
self.trigger_states: Dict[str, TriggerState] = {}
logger.info("AutomationScheduler initialized")
def start(self):
"""Démarre le scheduler en arrière-plan."""
if self.running:
logger.warning("Scheduler already running")
return
self.running = True
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
logger.info("AutomationScheduler started")
def stop(self):
"""Arrête le scheduler."""
if not self.running:
return
self.running = False
if self.thread:
self.thread.join(timeout=5)
logger.info("AutomationScheduler stopped")
def _run_loop(self):
"""Boucle principale du scheduler."""
logger.info("Scheduler loop started")
while self.running:
try:
# Vérifier les schedule triggers
self._check_schedule_triggers()
# Vérifier les file triggers
self._check_file_triggers()
# Attendre avant la prochaine vérification
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"Error in scheduler loop: {e}", error=str(e))
time.sleep(5) # Attendre plus longtemps en cas d'erreur
def _check_schedule_triggers(self):
"""Vérifie et déclenche les schedule triggers."""
triggers = self.trigger_manager.list_triggers()
for trigger in triggers:
if not trigger.enabled or trigger.trigger_type != "schedule":
continue
try:
# Initialiser l'état si nécessaire
if trigger.trigger_id not in self.trigger_states:
self.trigger_states[trigger.trigger_id] = TriggerState(
trigger_id=trigger.trigger_id,
last_check=datetime.now(),
last_fired=trigger.last_fired or datetime.min,
processed_files=set()
)
state = self.trigger_states[trigger.trigger_id]
# Vérifier si l'intervalle est écoulé
interval = trigger.config.get('interval_seconds', 3600)
time_since_last_fire = (datetime.now() - state.last_fired).total_seconds()
if time_since_last_fire >= interval:
logger.info(
f"Firing schedule trigger {trigger.trigger_id}",
trigger_id=trigger.trigger_id,
workflow_id=trigger.workflow_id,
interval=interval
)
# Exécuter le target (workflow ou chaîne)
success = self._execute_target(trigger.workflow_id)
if success:
# Mettre à jour l'état
state.last_fired = datetime.now()
# Mettre à jour le trigger
self.trigger_manager.fire_trigger(trigger.trigger_id)
# Métriques
increment_trigger_fire(trigger.trigger_type, trigger.workflow_id)
except Exception as e:
logger.error(
f"Error checking schedule trigger {trigger.trigger_id}: {e}",
trigger_id=trigger.trigger_id,
error=str(e)
)
def _check_file_triggers(self):
"""Vérifie et déclenche les file triggers."""
triggers = self.trigger_manager.list_triggers()
for trigger in triggers:
if not trigger.enabled or trigger.trigger_type != "file":
continue
try:
# Initialiser l'état si nécessaire
if trigger.trigger_id not in self.trigger_states:
self.trigger_states[trigger.trigger_id] = TriggerState(
trigger_id=trigger.trigger_id,
last_check=datetime.now(),
last_fired=trigger.last_fired or datetime.min,
processed_files=set()
)
state = self.trigger_states[trigger.trigger_id]
# Récupérer la config
watch_directory = trigger.config.get('watch_directory')
file_pattern = trigger.config.get('file_pattern', '*')
if not watch_directory:
continue
# Chercher les fichiers correspondants
pattern_path = Path(watch_directory) / file_pattern
matching_files = glob.glob(str(pattern_path))
# Traiter les nouveaux fichiers
for file_path in matching_files:
if file_path not in state.processed_files:
logger.info(
f"New file detected for trigger {trigger.trigger_id}: {file_path}",
trigger_id=trigger.trigger_id,
file_path=file_path
)
# Exécuter le target avec le fichier en variable
success = self._execute_target(
trigger.workflow_id,
variables={'file_path': file_path}
)
if success:
# Marquer le fichier comme traité
state.processed_files.add(file_path)
# Mettre à jour le trigger
self.trigger_manager.fire_trigger(trigger.trigger_id)
# Métriques
increment_trigger_fire(trigger.trigger_type, trigger.workflow_id)
# Nettoyer les fichiers traités (garder seulement les 1000 derniers)
if len(state.processed_files) > 1000:
state.processed_files = set(list(state.processed_files)[-1000:])
except Exception as e:
logger.error(
f"Error checking file trigger {trigger.trigger_id}: {e}",
trigger_id=trigger.trigger_id,
error=str(e)
)
def _execute_target(self, target_id: str, variables: Optional[Dict] = None) -> bool:
"""
Exécute un workflow ou une chaîne.
Args:
target_id: ID du workflow ou de la chaîne
variables: Variables optionnelles pour l'exécution
Returns:
True si succès, False sinon
"""
try:
# Vérifier si c'est une chaîne
chain = self.chain_manager.get_chain(target_id)
if chain:
logger.info(f"Executing chain {target_id}")
result = self.chain_manager.execute_chain(target_id)
return result.success
# Sinon c'est un workflow simple
logger.info(f"Executing workflow {target_id}")
# TODO: Intégrer avec ExecutionLoop pour exécution réelle
# Pour l'instant, on simule
logger.warning(f"Workflow execution not yet integrated with ExecutionLoop")
# Simulation d'exécution
time.sleep(0.1)
return True
except Exception as e:
logger.error(f"Error executing target {target_id}: {e}", error=str(e))
return False
def get_status(self) -> Dict:
"""
Récupère le statut du scheduler.
Returns:
Dictionnaire avec le statut
"""
return {
'running': self.running,
'check_interval': self.check_interval,
'active_triggers': len(self.trigger_states),
'trigger_states': {
tid: {
'last_check': state.last_check.isoformat(),
'last_fired': state.last_fired.isoformat(),
'processed_files_count': len(state.processed_files)
}
for tid, state in self.trigger_states.items()
}
}
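A minimal wiring sketch (illustrative, not committed code; the storage paths and check interval are assumptions):

from pathlib import Path
from core.monitoring.trigger_manager import TriggerManager
from core.monitoring.chain_manager import ChainManager
from core.monitoring.automation_scheduler import AutomationScheduler

triggers = TriggerManager(Path("data/monitoring/triggers"))
chains = ChainManager(Path("data/monitoring/chains"))

scheduler = AutomationScheduler(triggers, chains, check_interval=2.0)
scheduler.start()              # background daemon thread polls every 2 s
print(scheduler.get_status())  # running flag, interval, per-trigger state
scheduler.stop()               # joins the thread (5 s timeout)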

core/monitoring/chain_manager.py

@@ -0,0 +1,297 @@
"""
Chain Manager - Gestion des chaînes de workflows
Ce module permet de créer, gérer et exécuter des séquences
ordonnées de workflows.
"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any, Callable
from dataclasses import dataclass, field, asdict
from core.monitoring.logger import get_logger
from core.monitoring.metrics import increment_chain_execution, record_chain_duration, set_active_chains
logger = get_logger('chain_manager')
@dataclass
class ChainExecutionResult:
"""Résultat d'exécution d'une chaîne."""
chain_id: str
success: bool
duration: float
workflows_executed: int
failed_at: Optional[str] = None
error_message: Optional[str] = None
@dataclass
class WorkflowChain:
"""Chaîne de workflows."""
chain_id: str
name: str
workflows: List[str] # Ordered list of workflow_ids
status: str = "active" # active, inactive, running
created_at: datetime = field(default_factory=datetime.now)
last_execution: Optional[datetime] = None
success_rate: float = 0.0
execution_history: List[Dict[str, Any]] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convertit en dictionnaire."""
data = asdict(self)
data['created_at'] = self.created_at.isoformat()
data['last_execution'] = self.last_execution.isoformat() if self.last_execution else None
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'WorkflowChain':
"""Crée depuis un dictionnaire."""
data['created_at'] = datetime.fromisoformat(data['created_at'])
if data.get('last_execution'):
data['last_execution'] = datetime.fromisoformat(data['last_execution'])
return cls(**data)
class ChainManager:
"""Gestionnaire de chaînes de workflows."""
def __init__(self, storage_path: Path):
"""
Initialise le gestionnaire.
Args:
storage_path: Chemin du répertoire de stockage
"""
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.chains_file = self.storage_path / "chains.json"
self._chains: Dict[str, WorkflowChain] = {}
self._load_chains()
def _load_chains(self):
"""Charge les chaînes depuis le fichier."""
if self.chains_file.exists():
try:
with open(self.chains_file, 'r') as f:
data = json.load(f)
self._chains = {
chain_id: WorkflowChain.from_dict(chain_data)
for chain_id, chain_data in data.items()
}
logger.info(f"Loaded {len(self._chains)} chains")
except Exception as e:
logger.error(f"Error loading chains: {e}")
self._chains = {}
else:
self._chains = {}
def _save_chains(self):
"""Sauvegarde les chaînes dans le fichier."""
try:
data = {
chain_id: chain.to_dict()
for chain_id, chain in self._chains.items()
}
with open(self.chains_file, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved {len(self._chains)} chains")
except Exception as e:
logger.error(f"Error saving chains: {e}")
def list_chains(self) -> List[WorkflowChain]:
"""
Liste toutes les chaînes.
Returns:
Liste des chaînes
"""
return list(self._chains.values())
def get_chain(self, chain_id: str) -> Optional[WorkflowChain]:
"""
Récupère une chaîne par son ID.
Args:
chain_id: ID de la chaîne
Returns:
Chaîne ou None si non trouvée
"""
return self._chains.get(chain_id)
def create_chain(self, name: str, workflows: List[str]) -> WorkflowChain:
"""
Crée une nouvelle chaîne.
Args:
name: Nom de la chaîne
workflows: Liste ordonnée des workflow_ids
Returns:
Chaîne créée
Raises:
ValueError: Si les workflows n'existent pas
"""
# Valider que les workflows existent
if not self.validate_workflows_exist(workflows):
raise ValueError("One or more workflows do not exist")
chain_id = f"chain_{uuid.uuid4().hex[:8]}"
chain = WorkflowChain(
chain_id=chain_id,
name=name,
workflows=workflows
)
self._chains[chain_id] = chain
self._save_chains()
logger.info(f"Created chain {chain_id}: {name} with {len(workflows)} workflows")
return chain
def validate_workflows_exist(self, workflow_ids: List[str]) -> bool:
"""
Valide que tous les workflows existent.
Args:
workflow_ids: Liste des workflow_ids à valider
Returns:
True si tous existent, False sinon
"""
# Pour l'instant, on simule la validation
# Dans une vraie implémentation, on vérifierait dans le storage
from pathlib import Path
workflows_path = Path("data/training/workflows")
if not workflows_path.exists():
return False
for wf_id in workflow_ids:
wf_file = workflows_path / f"{wf_id}.json"
if not wf_file.exists():
logger.warning(f"Workflow {wf_id} does not exist")
return False
return True
def execute_chain(
self,
chain_id: str,
on_progress: Optional[Callable[[str, int, int], None]] = None
) -> ChainExecutionResult:
"""
Exécute une chaîne de workflows.
Args:
chain_id: ID de la chaîne à exécuter
on_progress: Callback optionnel pour les mises à jour de progression
(workflow_id, current_index, total_workflows)
Returns:
Résultat de l'exécution
Raises:
ValueError: Si la chaîne n'existe pas
"""
chain = self.get_chain(chain_id)
if not chain:
raise ValueError(f"Chain {chain_id} not found")
logger.info(f"Starting chain execution: {chain_id}")
start_time = datetime.now()
# Mettre à jour le statut
chain.status = "running"
set_active_chains(sum(1 for c in self._chains.values() if c.status == "running"))
workflows_executed = 0
failed_at = None
error_message = None
success = True
try:
for i, workflow_id in enumerate(chain.workflows):
if on_progress:
on_progress(workflow_id, i, len(chain.workflows))
logger.info(f"Executing workflow {workflow_id} ({i+1}/{len(chain.workflows)})")
# Simuler l'exécution du workflow
# Dans une vraie implémentation, on appellerait l'ExecutionLoop
try:
# TODO: Intégrer avec ExecutionLoop
# result = execution_loop.execute_workflow(workflow_id)
# if not result.success:
# raise Exception(result.error_message)
workflows_executed += 1
except Exception as e:
failed_at = workflow_id
error_message = str(e)
success = False
logger.error(f"Chain execution failed at {workflow_id}: {e}")
break
finally:
# Calculer la durée
duration = (datetime.now() - start_time).total_seconds()
# Mettre à jour le statut
chain.status = "active"
chain.last_execution = datetime.now()
# Mettre à jour l'historique
execution_record = {
'timestamp': datetime.now().isoformat(),
'success': success,
'duration': duration,
'workflows_executed': workflows_executed,
'failed_at': failed_at
}
chain.execution_history.append(execution_record)
# Calculer le taux de succès
if chain.execution_history:
successes = sum(1 for r in chain.execution_history if r['success'])
chain.success_rate = (successes / len(chain.execution_history)) * 100
self._save_chains()
# Métriques
increment_chain_execution(chain_id, success)
record_chain_duration(chain_id, duration)
set_active_chains(sum(1 for c in self._chains.values() if c.status == "running"))
logger.info(f"Chain execution completed: {chain_id} (success={success}, duration={duration:.2f}s)")
return ChainExecutionResult(
chain_id=chain_id,
success=success,
duration=duration,
workflows_executed=workflows_executed,
failed_at=failed_at,
error_message=error_message
)
def delete_chain(self, chain_id: str) -> bool:
"""
Supprime une chaîne.
Args:
chain_id: ID de la chaîne à supprimer
Returns:
True si supprimée, False si non trouvée
"""
if chain_id in self._chains:
del self._chains[chain_id]
self._save_chains()
logger.info(f"Deleted chain {chain_id}")
return True
return False
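A minimal usage sketch (illustrative, not committed code; the chain name and workflow IDs are assumptions and must exist as data/training/workflows/<id>.json for validation to pass):

from pathlib import Path
from core.monitoring.chain_manager import ChainManager

manager = ChainManager(Path("data/monitoring/chains"))
chain = manager.create_chain("nightly-batch", ["wf_extract", "wf_transform"])

def on_progress(workflow_id: str, index: int, total: int) -> None:
    print(f"[{index + 1}/{total}] running {workflow_id}")

result = manager.execute_chain(chain.chain_id, on_progress=on_progress)
print(result.success, f"{result.duration:.2f}s", result.workflows_executed)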

core/monitoring/http_server_metrics.py

@@ -0,0 +1,98 @@
"""HTTP server metrics for Prometheus.
Fiche #24 - Observabilité.
Used from security middlewares (FastAPI/Flask) so we can count blocked
requests too (401/403/423/429).
Labels are intentionally simple to avoid cardinality explosions.
"""
from __future__ import annotations
from typing import Optional
from prometheus_client import Counter, Histogram, Gauge
HTTP_REQUESTS_TOTAL = Counter(
"http_requests_total",
"Total HTTP requests",
["service", "method", "path", "status"],
)
HTTP_REQUEST_DURATION_SECONDS = Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
["service", "method", "path"],
buckets=(
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
10.0,
float("inf"),
),
)
HTTP_REQUESTS_IN_FLIGHT = Gauge(
"http_requests_in_flight",
"Number of HTTP requests currently being processed",
["service"],
)
RPA_SECURITY_BLOCKS_TOTAL = Counter(
"rpa_security_blocks_total",
"Requests blocked by security controls",
["service", "reason"],
)
def record_http_request(
service: str,
method: str,
path_template: str,
status_code: int,
duration_seconds: float,
) -> None:
"""Record a completed HTTP request."""
HTTP_REQUESTS_TOTAL.labels(
service=service,
method=method.upper(),
path=path_template,
status=str(int(status_code)),
).inc()
HTTP_REQUEST_DURATION_SECONDS.labels(
service=service,
method=method.upper(),
path=path_template,
).observe(max(0.0, float(duration_seconds)))
def in_flight_inc(service: str) -> None:
HTTP_REQUESTS_IN_FLIGHT.labels(service=service).inc()
def in_flight_dec(service: str) -> None:
HTTP_REQUESTS_IN_FLIGHT.labels(service=service).dec()
def record_security_block(service: str, reason: str) -> None:
"""Increment counter when a request is blocked for *security* reasons."""
RPA_SECURITY_BLOCKS_TOTAL.labels(service=service, reason=reason).inc()
def safe_template(path_template: Optional[str], fallback_path: str) -> str:
"""Return a stable path label.
Prefer framework route template when available. Fall back to raw path.
"""
if path_template and isinstance(path_template, str):
return path_template
return fallback_path
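A minimal Flask integration sketch (illustrative, not committed code; the service label is an assumption, and a production middleware should also decrement the in-flight gauge on unhandled exceptions):

import time
from flask import Flask, g, request
from core.monitoring.http_server_metrics import (
    in_flight_inc, in_flight_dec, record_http_request, safe_template,
)

app = Flask(__name__)
SERVICE = "vwb_backend"  # illustrative service label

@app.before_request
def _start_timer():
    g.request_start = time.perf_counter()
    in_flight_inc(SERVICE)

@app.after_request
def _record_metrics(response):
    # Prefer the route template ("/items/<item_id>") over the raw path to keep label cardinality low
    path = safe_template(request.url_rule.rule if request.url_rule else None, request.path)
    record_http_request(SERVICE, request.method, path, response.status_code,
                        time.perf_counter() - g.get("request_start", time.perf_counter()))
    in_flight_dec(SERVICE)
    return response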

core/monitoring/log_exporter.py

@@ -0,0 +1,213 @@
"""
Log Exporter - Export des logs en format ZIP
Ce module permet d'exporter les logs système en archives ZIP
pour analyse offline ou partage.
"""
import json
import zipfile
import io
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
from core.monitoring.logger import get_logger
logger = get_logger('log_exporter')
class LogExporter:
"""Exporteur de logs."""
def __init__(self, logs_path: Path):
"""
Initialise l'exporteur.
Args:
logs_path: Chemin du répertoire des logs
"""
self.logs_path = Path(logs_path)
self.logs_path.mkdir(parents=True, exist_ok=True)
def export_to_zip(
self,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None
) -> io.BytesIO:
"""
Exporte les logs en archive ZIP.
Args:
start_time: Timestamp de début (optionnel)
end_time: Timestamp de fin (optionnel)
Returns:
BytesIO contenant l'archive ZIP
"""
logger.info(f"Exporting logs (start={start_time}, end={end_time})")
# Créer un ZIP en mémoire
memory_file = io.BytesIO()
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
# Logs d'exécution
execution_logs = self.get_execution_logs(start_time, end_time)
zf.writestr('execution_logs.json', json.dumps(execution_logs, indent=2))
# Logs d'erreurs
error_logs = self.get_error_logs(start_time, end_time)
zf.writestr('error_logs.json', json.dumps(error_logs, indent=2))
# Métriques
metrics = self.get_metrics_summary()
zf.writestr('metrics.json', json.dumps(metrics, indent=2))
memory_file.seek(0)
logger.info(f"Exported {len(execution_logs)} execution logs, {len(error_logs)} error logs")
return memory_file
def get_execution_logs(
self,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
Récupère les logs d'exécution.
Args:
start_time: Timestamp de début (optionnel)
end_time: Timestamp de fin (optionnel)
Returns:
Liste des logs d'exécution
"""
logs = []
# Lire les logs depuis les fichiers
for log_file in self.logs_path.glob('*.log'):
try:
with open(log_file, 'r') as f:
for line in f:
# Parse the log line
# Unified format: "timestamp | LEVEL | rpa.component | message"
parts = line.strip().split(' | ', 3)
if len(parts) >= 4:
timestamp_str = parts[0].strip()
try:
timestamp = datetime.fromisoformat(timestamp_str)
# Filtrer par date si spécifié
if start_time and timestamp < start_time:
continue
if end_time and timestamp > end_time:
continue
log_entry = {
'timestamp': timestamp_str,
'level': parts[1].strip(),
'component': parts[2].strip().replace('rpa.', ''),
'message': parts[3]
}
# Keep execution-level entries (INFO, DEBUG)
if log_entry['level'] in ['INFO', 'DEBUG']:
logs.append(log_entry)
except (ValueError, IndexError):
continue
except Exception as e:
logger.error(f"Error reading log file {log_file}: {e}")
# Trier par timestamp
logs.sort(key=lambda x: x['timestamp'])
return logs
def get_error_logs(
self,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
Récupère les logs d'erreurs.
Args:
start_time: Timestamp de début (optionnel)
end_time: Timestamp de fin (optionnel)
Returns:
Liste des logs d'erreurs
"""
logs = []
# Lire les logs depuis les fichiers
for log_file in self.logs_path.glob('*.log'):
try:
with open(log_file, 'r') as f:
for line in f:
# Parse the log line (unified "|" separated format)
parts = line.strip().split(' | ', 3)
if len(parts) >= 4:
timestamp_str = parts[0].strip()
try:
timestamp = datetime.fromisoformat(timestamp_str)
# Filtrer par date si spécifié
if start_time and timestamp < start_time:
continue
if end_time and timestamp > end_time:
continue
log_entry = {
'timestamp': timestamp_str,
'level': parts[1].strip(),
'component': parts[2].strip().replace('rpa.', ''),
'message': parts[3]
}
# Keep error-level entries (ERROR, WARNING)
if log_entry['level'] in ['ERROR', 'WARNING']:
logs.append(log_entry)
except (ValueError, IndexError):
continue
except Exception as e:
logger.error(f"Error reading log file {log_file}: {e}")
# Trier par timestamp
logs.sort(key=lambda x: x['timestamp'])
return logs
def get_metrics_summary(self) -> Dict[str, Any]:
"""
Récupère un résumé des métriques.
Returns:
Dictionnaire des métriques
"""
# Récupérer les métriques depuis Prometheus
try:
from prometheus_client import REGISTRY
metrics = {}
# Collecter les métriques
for metric in REGISTRY.collect():
for sample in metric.samples:
metrics[sample.name] = {
'value': sample.value,
'labels': sample.labels,
'type': metric.type
}
return {
'timestamp': datetime.now().isoformat(),
'metrics': metrics
}
except Exception as e:
logger.error(f"Error collecting metrics: {e}")
return {
'timestamp': datetime.now().isoformat(),
'error': str(e)
}
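A minimal usage sketch (illustrative, not committed code; the logs directory matches the project-level "logs" folder used by logger.py, and the output file name is an assumption):

from datetime import datetime, timedelta
from pathlib import Path
from core.monitoring.log_exporter import LogExporter

exporter = LogExporter(Path("logs"))
archive = exporter.export_to_zip(start_time=datetime.now() - timedelta(days=1))

with open("logs_last_24h.zip", "wb") as f:
    f.write(archive.getvalue())  # contains execution_logs.json, error_logs.json, metrics.json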

core/monitoring/logger.py

@@ -0,0 +1,439 @@
"""
Système de logging centralisé pour RPA Vision V3
Ce module fournit un logging unifié pour tous les composants avec:
- Format structuré et cohérent
- Filtrage des codes ANSI
- Nettoyage des noms de composants
- Rotation des fichiers
- Intégration Prometheus
"""
import logging
import logging.handlers
import json
import os
import re
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from pathlib import Path
# Prometheus metrics import (optional)
try:
from core.monitoring.metrics import increment_log_entry
METRICS_ENABLED = True
except ImportError:
METRICS_ENABLED = False
# =============================================================================
# Configuration
# =============================================================================
# Log directory
LOG_DIR = Path(__file__).parent.parent.parent / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Unified format
LOG_FORMAT = "%(asctime)s | %(levelname)-7s | %(name)-20s | %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# Rotation
MAX_LOG_SIZE = 10 * 1024 * 1024 # 10 MB
BACKUP_COUNT = 5 # Keep 5 backup files
# Pattern used to strip ANSI escape codes
ANSI_ESCAPE_PATTERN = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# =============================================================================
# Filters and Formatters
# =============================================================================
class ANSIStripFilter(logging.Filter):
"""Filtre qui supprime les codes ANSI des messages."""
def filter(self, record: logging.LogRecord) -> bool:
if hasattr(record, 'msg') and isinstance(record.msg, str):
record.msg = ANSI_ESCAPE_PATTERN.sub('', record.msg)
return True
class CleanFormatter(logging.Formatter):
"""Formatter qui nettoie les messages et assure un format cohérent."""
def format(self, record: logging.LogRecord) -> str:
# Nettoyer les codes ANSI du message
if hasattr(record, 'msg') and isinstance(record.msg, str):
record.msg = ANSI_ESCAPE_PATTERN.sub('', record.msg)
# Formater
return super().format(record)
# =============================================================================
# LogEntry Dataclass
# =============================================================================
@dataclass
class LogEntry:
"""Entrée de log structurée."""
timestamp: datetime
level: str
component: str
message: str
workflow_id: Optional[str] = None
node_id: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convertit l'entrée en dictionnaire."""
return {
'timestamp': self.timestamp.isoformat(),
'level': self.level,
'component': self.component,
'message': self.message,
'workflow_id': self.workflow_id,
'node_id': self.node_id,
'metadata': self.metadata
}
def to_json(self) -> str:
"""Convertit l'entrée en JSON."""
return json.dumps(self.to_dict())
# =============================================================================
# RPALogger Class
# =============================================================================
class RPALogger:
"""Logger centralisé pour le système RPA."""
# Valid component names (whitelist)
VALID_COMPONENTS = {
'api', 'workflow', 'execution', 'detection', 'healing',
'monitoring', 'dashboard', 'automation_scheduler', 'chain_manager',
'trigger_manager', 'vwb_backend', 'vwb_frontend', 'main_backend',
'log_exporter', 'analytics', 'security', 'embedding', 'faiss',
'coaching', 'correction_packs', 'system', 'backup', 'version',
'test', 'test_component'
}
@staticmethod
def _clean_component_name(component: str) -> str:
"""
Nettoie le nom du composant pour éviter les caractères corrompus.
Args:
component: Nom du composant brut
Returns:
Nom nettoyé sans caractères problématiques
"""
if not component:
return "unknown"
# Convertir en minuscules et supprimer les espaces
component = component.lower().strip()
# Ne garder que les caractères alphanumériques et underscores
cleaned = ""
for char in component:
if char.isalnum() or char == '_':
cleaned += char
elif char in ['-', '.', ' ']:
cleaned += '_'
# Limiter la longueur
cleaned = cleaned[:30].strip('_')
# Si le résultat est vide ou trop court, utiliser "unknown"
if not cleaned or len(cleaned) < 2:
return "unknown"
return cleaned
def __init__(self, component: str, log_file: Optional[str] = None):
"""
Initialise le logger.
Args:
component: Nom du composant
log_file: Chemin du fichier de log (optionnel)
"""
# Nettoyer le nom du composant
self.component = self._clean_component_name(component)
# Chemin du fichier de log
if log_file:
self.log_file = Path(log_file)
else:
self.log_file = LOG_DIR / f"{self.component}.log"
# Créer le répertoire parent si nécessaire
self.log_file.parent.mkdir(parents=True, exist_ok=True)
# Stocker les entrées en mémoire (limité)
self.entries: List[LogEntry] = []
self._max_entries = 1000
# Configuration du logger Python standard
self.logger = logging.getLogger(f"rpa.{self.component}")
self.logger.setLevel(logging.DEBUG)
# Éviter les handlers dupliqués
if not self.logger.handlers:
self._setup_handlers()
def _setup_handlers(self):
"""Configure les handlers de logging."""
# Formatter commun
formatter = CleanFormatter(LOG_FORMAT, LOG_DATE_FORMAT)
# Handler fichier avec rotation
try:
file_handler = logging.handlers.RotatingFileHandler(
self.log_file,
maxBytes=MAX_LOG_SIZE,
backupCount=BACKUP_COUNT,
encoding='utf-8'
)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
file_handler.addFilter(ANSIStripFilter())
self.logger.addHandler(file_handler)
except Exception as e:
print(f"Warning: Could not create file handler for {self.component}: {e}")
# Handler console (seulement si pas en mode silencieux)
if os.getenv('RPA_LOG_CONSOLE', 'false').lower() == 'true':
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
console_handler.addFilter(ANSIStripFilter())
self.logger.addHandler(console_handler)
def _log(self, level: str, message: str, workflow_id: Optional[str] = None,
node_id: Optional[str] = None, **metadata):
"""
Log interne avec métriques.
Args:
level: Niveau de log (INFO, WARNING, ERROR, DEBUG)
message: Message de log
workflow_id: ID du workflow (optionnel)
node_id: ID du nœud (optionnel)
**metadata: Métadonnées additionnelles
"""
# Nettoyer le message des codes ANSI
clean_message = ANSI_ESCAPE_PATTERN.sub('', str(message))
# Créer l'entrée
entry = LogEntry(
timestamp=datetime.now(),
level=level,
component=self.component,
message=clean_message,
workflow_id=workflow_id,
node_id=node_id,
metadata=metadata
)
# Stocker en mémoire (avec limite)
self.entries.append(entry)
if len(self.entries) > self._max_entries:
self.entries = self.entries[-self._max_entries:]
# Incrémenter le compteur Prometheus
if METRICS_ENABLED:
try:
increment_log_entry(level, self.component)
except Exception:
pass
# Construire le message formaté
log_msg = clean_message
if workflow_id:
log_msg += f" | workflow={workflow_id}"
if node_id:
log_msg += f" | node={node_id}"
if metadata:
# Formater les métadonnées de manière compacte
meta_str = " | ".join(f"{k}={v}" for k, v in metadata.items())
log_msg += f" | {meta_str}"
# Log via le logger Python
log_method = getattr(self.logger, level.lower(), self.logger.info)
log_method(log_msg)
def info(self, message: str, workflow_id: Optional[str] = None,
node_id: Optional[str] = None, **metadata):
"""Log niveau INFO."""
self._log('INFO', message, workflow_id, node_id, **metadata)
def warning(self, message: str, workflow_id: Optional[str] = None,
node_id: Optional[str] = None, **metadata):
"""Log niveau WARNING."""
self._log('WARNING', message, workflow_id, node_id, **metadata)
def error(self, message: str, workflow_id: Optional[str] = None,
node_id: Optional[str] = None, **metadata):
"""Log niveau ERROR."""
self._log('ERROR', message, workflow_id, node_id, **metadata)
def debug(self, message: str, workflow_id: Optional[str] = None,
node_id: Optional[str] = None, **metadata):
"""Log niveau DEBUG."""
self._log('DEBUG', message, workflow_id, node_id, **metadata)
def critical(self, message: str, workflow_id: Optional[str] = None,
node_id: Optional[str] = None, **metadata):
"""Log niveau CRITICAL."""
self._log('CRITICAL', message, workflow_id, node_id, **metadata)
def workflow_start(self, workflow_id: str, **metadata):
"""Log démarrage de workflow."""
self.info("Workflow started", workflow_id=workflow_id, **metadata)
def workflow_end(self, workflow_id: str, success: bool, duration: float, **metadata):
"""Log fin de workflow."""
status = 'success' if success else 'failure'
self.info(
f"Workflow completed: {status}",
workflow_id=workflow_id,
success=success,
duration_s=round(duration, 3),
**metadata
)
def action_executed(self, workflow_id: str, node_id: str, action_type: str,
success: bool, confidence: float = 0.0, **metadata):
"""Log exécution d'une action."""
level = 'INFO' if success else 'WARNING'
self._log(
level,
f"Action executed: {action_type}",
workflow_id=workflow_id,
node_id=node_id,
success=success,
confidence=round(confidence, 3),
**metadata
)
def get_recent_logs(self, limit: int = 100) -> List[LogEntry]:
"""
Récupère les logs récents.
Args:
limit: Nombre maximum de logs à retourner
Returns:
Liste des entrées de log récentes
"""
return self.entries[-limit:]
def export_logs(self, start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
format: str = 'json') -> str:
"""
Exporte les logs.
Args:
start_time: Timestamp de début (optionnel)
end_time: Timestamp de fin (optionnel)
format: Format d'export ('json' ou 'text')
Returns:
Logs exportés
"""
filtered_logs = self.entries
if start_time:
filtered_logs = [log for log in filtered_logs if log.timestamp >= start_time]
if end_time:
filtered_logs = [log for log in filtered_logs if log.timestamp <= end_time]
if format == 'json':
return json.dumps([log.to_dict() for log in filtered_logs], indent=2)
else:
lines = []
for log in filtered_logs:
line = f"{log.timestamp.strftime(LOG_DATE_FORMAT)} | {log.level:7} | {log.component:20} | {log.message}"
lines.append(line)
return "\n".join(lines)
# =============================================================================
# Global Logger Management
# =============================================================================
# Instance globale pour chaque composant
_loggers: Dict[str, RPALogger] = {}
def get_logger(component: str) -> RPALogger:
"""
Récupère ou crée un logger pour un composant.
Args:
component: Nom du composant
Returns:
Logger pour le composant
"""
# Nettoyer le nom avant de chercher
clean_name = RPALogger._clean_component_name(component)
if clean_name not in _loggers:
_loggers[clean_name] = RPALogger(component)
return _loggers[clean_name]
def configure_root_logger():
"""
Configure le logger racine pour capturer tous les logs Python.
Appelé une seule fois au démarrage.
"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Formatter
formatter = CleanFormatter(LOG_FORMAT, LOG_DATE_FORMAT)
# Handler fichier principal
main_log = LOG_DIR / "main.log"
try:
handler = logging.handlers.RotatingFileHandler(
main_log,
maxBytes=MAX_LOG_SIZE,
backupCount=BACKUP_COUNT,
encoding='utf-8'
)
handler.setFormatter(formatter)
handler.addFilter(ANSIStripFilter())
root_logger.addHandler(handler)
except Exception:
pass
# Réduire le bruit des bibliothèques externes
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('werkzeug').setLevel(logging.WARNING)
logging.getLogger('socketio').setLevel(logging.WARNING)
logging.getLogger('engineio').setLevel(logging.WARNING)
# =============================================================================
# Pre-configured Loggers
# =============================================================================
# Loggers pré-configurés pour les composants principaux
workflow_logger = get_logger('workflow')
execution_logger = get_logger('execution')
detection_logger = get_logger('detection')
api_logger = get_logger('api')
monitoring_logger = get_logger('monitoring')
security_logger = get_logger('security')
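A minimal usage sketch showing the component-name sanitization and the unified line format (illustrative, not committed code; the component name and IDs are assumptions, and the sample line's padding is approximate):

from core.monitoring.logger import get_logger, configure_root_logger

configure_root_logger()           # route third-party library logs into logs/main.log
log = get_logger("Invoice Bot!")  # sanitized to component "invoice_bot"
log.info("Workflow started", workflow_id="wf_42", attempt=1)

# Written to logs/invoice_bot.log roughly as:
# 2026-01-19 15:34:34 | INFO    | rpa.invoice_bot      | Workflow started | workflow=wf_42 | attempt=1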

core/monitoring/metrics.py

@@ -0,0 +1,206 @@
"""
Métriques Prometheus pour RPA Vision
Ce module définit toutes les métriques Prometheus utilisées
pour monitorer le système RPA.
"""
from prometheus_client import Counter, Histogram, Gauge, Info
# =============================================================================
# Counters - monotonically increasing metrics
# =============================================================================
workflow_executions_total = Counter(
'workflow_executions_total',
'Total workflow executions',
['workflow_id', 'status']
)
log_entries_total = Counter(
'log_entries_total',
'Total log entries',
['level', 'component']
)
chain_executions_total = Counter(
'chain_executions_total',
'Total chain executions',
['chain_id', 'status']
)
trigger_fires_total = Counter(
'trigger_fires_total',
'Total trigger fires',
['trigger_type', 'workflow_id']
)
# =============================================================================
# Histograms - value distributions
# =============================================================================
workflow_duration_seconds = Histogram(
'workflow_duration_seconds',
'Workflow execution duration in seconds',
['workflow_id'],
buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, float('inf'))
)
chain_duration_seconds = Histogram(
'chain_duration_seconds',
'Chain execution duration in seconds',
['chain_id'],
buckets=(1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, float('inf'))
)
# =============================================================================
# Gauges - values that can go up or down
# =============================================================================
active_workflows = Gauge(
'active_workflows',
'Number of currently active workflows'
)
active_chains = Gauge(
'active_chains',
'Number of currently active chains'
)
error_rate = Gauge(
'error_rate',
'Current error rate percentage'
)
enabled_triggers = Gauge(
'enabled_triggers',
'Number of enabled triggers'
)
# =============================================================================
# Info - system information
# =============================================================================
system_info = Info(
'rpa_system',
'RPA system information'
)
# Initialize system information
system_info.info({
'version': '3.0.0',
'component': 'rpa_vision',
'monitoring': 'enabled'
})
# =============================================================================
# Utility functions
# =============================================================================
def increment_workflow_execution(workflow_id: str, success: bool):
"""
Incrémente le compteur d'exécution de workflow.
Args:
workflow_id: ID du workflow
success: True si succès, False si échec
"""
status = 'success' if success else 'failure'
workflow_executions_total.labels(workflow_id=workflow_id, status=status).inc()
def record_workflow_duration(workflow_id: str, duration: float):
"""
Enregistre la durée d'exécution d'un workflow.
Args:
workflow_id: ID du workflow
duration: Durée en secondes
"""
workflow_duration_seconds.labels(workflow_id=workflow_id).observe(duration)
def increment_log_entry(level: str, component: str):
"""
Incrémente le compteur d'entrées de log.
Args:
level: Niveau de log (INFO, WARNING, ERROR, DEBUG)
component: Composant source
"""
log_entries_total.labels(level=level, component=component).inc()
def increment_chain_execution(chain_id: str, success: bool):
"""
Incrémente le compteur d'exécution de chaîne.
Args:
chain_id: ID de la chaîne
success: True si succès, False si échec
"""
status = 'success' if success else 'failure'
chain_executions_total.labels(chain_id=chain_id, status=status).inc()
def record_chain_duration(chain_id: str, duration: float):
"""
Enregistre la durée d'exécution d'une chaîne.
Args:
chain_id: ID de la chaîne
duration: Durée en secondes
"""
chain_duration_seconds.labels(chain_id=chain_id).observe(duration)
def increment_trigger_fire(trigger_type: str, workflow_id: str):
"""
Incrémente le compteur de déclenchements.
Args:
trigger_type: Type de trigger (schedule, file, manual)
workflow_id: ID du workflow déclenché
"""
trigger_fires_total.labels(trigger_type=trigger_type, workflow_id=workflow_id).inc()
def set_active_workflows(count: int):
"""
Met à jour le nombre de workflows actifs.
Args:
count: Nombre de workflows actifs
"""
active_workflows.set(count)
def set_active_chains(count: int):
"""
Met à jour le nombre de chaînes actives.
Args:
count: Nombre de chaînes actives
"""
active_chains.set(count)
def set_error_rate(rate: float):
"""
Met à jour le taux d'erreur.
Args:
rate: Taux d'erreur en pourcentage (0-100)
"""
error_rate.set(rate)
def set_enabled_triggers(count: int):
"""
Met à jour le nombre de triggers activés.
Args:
count: Nombre de triggers activés
"""
enabled_triggers.set(count)
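A minimal usage sketch (illustrative, not committed code; the port and workflow ID are assumptions):

import time
from prometheus_client import start_http_server
from core.monitoring.metrics import (
    increment_workflow_execution, record_workflow_duration, set_active_workflows,
)

start_http_server(9100)  # expose /metrics for Prometheus scraping

set_active_workflows(1)
start = time.perf_counter()
# ... run the workflow ...
record_workflow_duration("wf_demo", time.perf_counter() - start)
increment_workflow_execution("wf_demo", success=True)
set_active_workflows(0)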

core/monitoring/trigger_manager.py

@@ -0,0 +1,286 @@
"""
Trigger Manager - Gestion des déclencheurs de workflows
Ce module permet de créer et gérer des déclencheurs automatiques
pour l'exécution de workflows.
"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field, asdict
from core.monitoring.logger import get_logger
from core.monitoring.metrics import increment_trigger_fire, set_enabled_triggers
logger = get_logger('trigger_manager')
@dataclass
class Trigger:
"""Déclencheur de workflow."""
trigger_id: str
trigger_type: str # schedule, file, manual
workflow_id: str
config: Dict[str, Any]
enabled: bool = True
created_at: datetime = field(default_factory=datetime.now)
last_fired: Optional[datetime] = None
fire_count: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convertit en dictionnaire."""
data = asdict(self)
data['created_at'] = self.created_at.isoformat()
data['last_fired'] = self.last_fired.isoformat() if self.last_fired else None
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Trigger':
"""Crée depuis un dictionnaire."""
data['created_at'] = datetime.fromisoformat(data['created_at'])
if data.get('last_fired'):
data['last_fired'] = datetime.fromisoformat(data['last_fired'])
return cls(**data)
class TriggerManager:
"""Gestionnaire de déclencheurs."""
def __init__(self, storage_path: Path):
"""
Initialise le gestionnaire.
Args:
storage_path: Chemin du répertoire de stockage
"""
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.triggers_file = self.storage_path / "triggers.json"
self._triggers: Dict[str, Trigger] = {}
self._load_triggers()
def _load_triggers(self):
"""Charge les triggers depuis le fichier."""
if self.triggers_file.exists():
try:
with open(self.triggers_file, 'r') as f:
data = json.load(f)
self._triggers = {
trigger_id: Trigger.from_dict(trigger_data)
for trigger_id, trigger_data in data.items()
}
logger.info(f"Loaded {len(self._triggers)} triggers")
self._update_metrics()
except Exception as e:
logger.error(f"Error loading triggers: {e}")
self._triggers = {}
else:
self._triggers = {}
def _save_triggers(self):
"""Sauvegarde les triggers dans le fichier."""
try:
data = {
trigger_id: trigger.to_dict()
for trigger_id, trigger in self._triggers.items()
}
with open(self.triggers_file, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved {len(self._triggers)} triggers")
self._update_metrics()
except Exception as e:
logger.error(f"Error saving triggers: {e}")
def _update_metrics(self):
"""Met à jour les métriques Prometheus."""
enabled_count = sum(1 for t in self._triggers.values() if t.enabled)
set_enabled_triggers(enabled_count)
def list_triggers(self) -> List[Trigger]:
"""
Liste tous les triggers.
Returns:
Liste des triggers
"""
return list(self._triggers.values())
def get_trigger(self, trigger_id: str) -> Optional[Trigger]:
"""
Récupère un trigger par son ID.
Args:
trigger_id: ID du trigger
Returns:
Trigger ou None si non trouvé
"""
return self._triggers.get(trigger_id)
def create_trigger(
self,
trigger_type: str,
workflow_id: str,
config: Dict[str, Any]
) -> Trigger:
"""
Crée un nouveau trigger.
Args:
trigger_type: Type de trigger (schedule, file, manual)
workflow_id: ID du workflow à déclencher
config: Configuration du trigger
Returns:
Trigger créé
Raises:
ValueError: Si la configuration est invalide
"""
# Valider la configuration
if not self.validate_config(trigger_type, config):
raise ValueError(f"Invalid configuration for trigger type {trigger_type}")
trigger_id = f"trigger_{uuid.uuid4().hex[:8]}"
trigger = Trigger(
trigger_id=trigger_id,
trigger_type=trigger_type,
workflow_id=workflow_id,
config=config
)
self._triggers[trigger_id] = trigger
self._save_triggers()
logger.info(f"Created trigger {trigger_id}: {trigger_type} for workflow {workflow_id}")
return trigger
def validate_config(self, trigger_type: str, config: Dict[str, Any]) -> bool:
"""
Valide la configuration d'un trigger.
Args:
trigger_type: Type de trigger
config: Configuration à valider
Returns:
True si valide, False sinon
"""
if trigger_type == "schedule":
# Valider la configuration schedule
if 'interval_seconds' not in config:
logger.warning("Schedule trigger missing interval_seconds")
return False
interval = config['interval_seconds']
if not isinstance(interval, (int, float)) or interval <= 0:
logger.warning(f"Invalid interval_seconds: {interval}")
return False
return True
elif trigger_type == "file":
# Valider la configuration file
if 'watch_directory' not in config or 'file_pattern' not in config:
logger.warning("File trigger missing watch_directory or file_pattern")
return False
return True
elif trigger_type == "manual":
# Pas de validation spécifique pour manual
return True
else:
logger.warning(f"Unknown trigger type: {trigger_type}")
return False
def enable_trigger(self, trigger_id: str) -> bool:
"""
Active un trigger.
Args:
trigger_id: ID du trigger
Returns:
True si activé, False si non trouvé
"""
trigger = self.get_trigger(trigger_id)
if trigger:
trigger.enabled = True
self._save_triggers()
logger.info(f"Enabled trigger {trigger_id}")
return True
return False
def disable_trigger(self, trigger_id: str) -> bool:
"""
Désactive un trigger.
Args:
trigger_id: ID du trigger
Returns:
True si désactivé, False si non trouvé
"""
trigger = self.get_trigger(trigger_id)
if trigger:
trigger.enabled = False
self._save_triggers()
logger.info(f"Disabled trigger {trigger_id}")
return True
return False
def fire_trigger(self, trigger_id: str) -> bool:
"""
Déclenche manuellement un trigger.
Args:
trigger_id: ID du trigger
Returns:
True si déclenché, False si non trouvé ou désactivé
"""
trigger = self.get_trigger(trigger_id)
if not trigger:
logger.warning(f"Trigger {trigger_id} not found")
return False
if not trigger.enabled:
logger.warning(f"Trigger {trigger_id} is disabled")
return False
# Mettre à jour les stats
trigger.last_fired = datetime.now()
trigger.fire_count += 1
self._save_triggers()
# Métriques
increment_trigger_fire(trigger.trigger_type, trigger.workflow_id)
logger.info(f"Fired trigger {trigger_id} for workflow {trigger.workflow_id}")
# TODO: Intégrer avec ExecutionLoop pour lancer le workflow
# execution_loop.execute_workflow(trigger.workflow_id)
return True
def delete_trigger(self, trigger_id: str) -> bool:
"""
Supprime un trigger.
Args:
trigger_id: ID du trigger à supprimer
Returns:
True si supprimé, False si non trouvé
"""
if trigger_id in self._triggers:
del self._triggers[trigger_id]
self._save_triggers()
logger.info(f"Deleted trigger {trigger_id}")
return True
return False
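A minimal usage sketch (illustrative, not committed code; the storage path, workflow ID and interval are assumptions):

from pathlib import Path
from core.monitoring.trigger_manager import TriggerManager

manager = TriggerManager(Path("data/monitoring/triggers"))

# Schedule trigger firing every 15 minutes (config is checked by validate_config)
trigger = manager.create_trigger(
    trigger_type="schedule",
    workflow_id="wf_demo",
    config={"interval_seconds": 900},
)

manager.disable_trigger(trigger.trigger_id)
manager.enable_trigger(trigger.trigger_id)
manager.fire_trigger(trigger.trigger_id)  # updates last_fired and fire_count, emits the Prometheus counter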