"""
|
|
Collecteur de métriques pour surveiller les performances du système RPA.
|
|
Suit la latence, la concordance, le taux de correction et génère des alertes.
|
|
"""

from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional

from .logger import Logger

class MetricsCollector:
    """
    Collects metrics for monitoring system performance.
    """

    def __init__(self, logger: Logger, config: Dict[str, Any]):
        """
        Initialize the metrics collector.

        Args:
            logger: Logger used for structured logging
            config: Global configuration
        """
        self.logger = logger
        self.config = config

        # Performance thresholds
        self.latency_threshold = config.get("performance", {}).get(
            "max_latency_ms", 400
        )
        self.concordance_threshold = config.get("thresholds", {}).get(
            "concordance_rate", 0.95
        )
        self.correction_rate_threshold = config.get("thresholds", {}).get(
            "correction_rate", 0.03
        )

        # Per-task metrics
        self.task_metrics: Dict[str, Dict[str, Any]] = defaultdict(
            lambda: {
                "latencies": [],
                "successes": 0,
                "failures": 0,
                "corrections": 0,
                "total_executions": 0,
                "last_execution": None
            }
        )

        # Global metrics
        self.global_metrics = {
            "total_latencies": [],
            "total_successes": 0,
            "total_failures": 0,
            "total_corrections": 0,
            "total_executions": 0,
            "alerts_generated": 0
        }

        # Alert history
        self.alerts: List[Dict[str, Any]] = []

        self.logger.log_action({
            "action": "metrics_collector_initialized",
            "latency_threshold_ms": self.latency_threshold,
            "concordance_threshold": self.concordance_threshold,
            "correction_rate_threshold": self.correction_rate_threshold
        })

    def track_latency(
        self,
        start_time: float,
        end_time: float,
        task_id: Optional[str] = None,
        operation: str = "execution"
    ) -> float:
        """
        Record the latency of an operation.

        Args:
            start_time: Start timestamp, in seconds (e.g. from time.time())
            end_time: End timestamp, in seconds
            task_id: Task ID (optional)
            operation: Operation type

        Returns:
            Latency in milliseconds
        """
        latency_ms = (end_time - start_time) * 1000

        # Record in the global metrics
        self.global_metrics["total_latencies"].append(latency_ms)

        # Record per task when a task ID is provided
        if task_id:
            self.task_metrics[task_id]["latencies"].append(latency_ms)

        self.logger.log_action({
            "action": "latency_tracked",
            "task_id": task_id,
            "operation": operation,
            "latency_ms": latency_ms,
            "threshold_exceeded": latency_ms > self.latency_threshold
        })

        # Check the threshold
        if latency_ms > self.latency_threshold:
            self._generate_alert(
                "latency_threshold_exceeded",
                {
                    "task_id": task_id,
                    "operation": operation,
                    "latency_ms": latency_ms,
                    "threshold_ms": self.latency_threshold
                }
            )

        return latency_ms

    def track_concordance(
        self,
        task_id: str,
        success: bool,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Record the outcome of an execution for concordance tracking.

        Args:
            task_id: Task ID
            success: True on success, False on failure
            metadata: Additional metadata
        """
        # Update the counters
        self.task_metrics[task_id]["total_executions"] += 1
        self.task_metrics[task_id]["last_execution"] = datetime.now().isoformat()
        self.global_metrics["total_executions"] += 1

        if success:
            self.task_metrics[task_id]["successes"] += 1
            self.global_metrics["total_successes"] += 1
        else:
            self.task_metrics[task_id]["failures"] += 1
            self.global_metrics["total_failures"] += 1

        # Compute the concordance rate
        concordance_rate = self.get_concordance_rate(task_id)

        self.logger.log_action({
            "action": "concordance_tracked",
            "task_id": task_id,
            "success": success,
            "concordance_rate": concordance_rate,
            "total_executions": self.task_metrics[task_id]["total_executions"],
            "metadata": metadata
        })

        # Check the threshold (only once there are enough executions)
        if (
            self.task_metrics[task_id]["total_executions"] >= 10
            and concordance_rate < self.concordance_threshold
        ):
            self._generate_alert(
                "concordance_below_threshold",
                {
                    "task_id": task_id,
                    "concordance_rate": concordance_rate,
                    "threshold": self.concordance_threshold,
                    "total_executions": self.task_metrics[task_id]["total_executions"]
                }
            )

    def track_correction_rate(
        self,
        task_id: str,
        correction_made: bool = True
    ):
        """
        Record a user correction.

        Args:
            task_id: Task ID
            correction_made: True if a correction was made
        """
        if correction_made:
            self.task_metrics[task_id]["corrections"] += 1
            self.global_metrics["total_corrections"] += 1

        # Compute the correction rate
        correction_rate = self.get_correction_rate(task_id)

        self.logger.log_action({
            "action": "correction_tracked",
            "task_id": task_id,
            "correction_rate": correction_rate,
            "total_corrections": self.task_metrics[task_id]["corrections"],
            "total_executions": self.task_metrics[task_id]["total_executions"]
        })

        # Check the threshold (only once there are enough executions)
        if (
            self.task_metrics[task_id]["total_executions"] >= 10
            and correction_rate > self.correction_rate_threshold
        ):
            self._generate_alert(
                "correction_rate_above_threshold",
                {
                    "task_id": task_id,
                    "correction_rate": correction_rate,
                    "threshold": self.correction_rate_threshold,
                    "total_corrections": self.task_metrics[task_id]["corrections"],
                    "total_executions": self.task_metrics[task_id]["total_executions"]
                }
            )

    def check_performance_thresholds(self) -> List[Dict[str, Any]]:
        """
        Check all performance thresholds and generate alerts.

        Returns:
            List of generated alerts
        """
        alerts = []

        # Check the global average latency
        if self.global_metrics["total_latencies"]:
            avg_latency = sum(self.global_metrics["total_latencies"]) / len(
                self.global_metrics["total_latencies"]
            )

            if avg_latency > self.latency_threshold:
                alert = self._generate_alert(
                    "global_latency_high",
                    {
                        "avg_latency_ms": avg_latency,
                        "threshold_ms": self.latency_threshold,
                        "num_measurements": len(self.global_metrics["total_latencies"])
                    }
                )
                alerts.append(alert)

        # Check the global concordance
        if self.global_metrics["total_executions"] > 0:
            global_concordance = (
                self.global_metrics["total_successes"] /
                self.global_metrics["total_executions"]
            )

            if global_concordance < self.concordance_threshold:
                alert = self._generate_alert(
                    "global_concordance_low",
                    {
                        "concordance_rate": global_concordance,
                        "threshold": self.concordance_threshold,
                        "total_executions": self.global_metrics["total_executions"]
                    }
                )
                alerts.append(alert)

        # Check the global correction rate
        if self.global_metrics["total_executions"] > 0:
            global_correction_rate = (
                self.global_metrics["total_corrections"] /
                self.global_metrics["total_executions"]
            )

            if global_correction_rate > self.correction_rate_threshold:
                alert = self._generate_alert(
                    "global_correction_rate_high",
                    {
                        "correction_rate": global_correction_rate,
                        "threshold": self.correction_rate_threshold,
                        "total_corrections": self.global_metrics["total_corrections"],
                        "total_executions": self.global_metrics["total_executions"]
                    }
                )
                alerts.append(alert)

        # Check each task
        for task_id, metrics in self.task_metrics.items():
            if metrics["total_executions"] < 10:
                continue  # Not enough data

            # Average latency per task
            if metrics["latencies"]:
                avg_latency = sum(metrics["latencies"]) / len(metrics["latencies"])
                if avg_latency > self.latency_threshold:
                    alert = self._generate_alert(
                        "task_latency_high",
                        {
                            "task_id": task_id,
                            "avg_latency_ms": avg_latency,
                            "threshold_ms": self.latency_threshold
                        }
                    )
                    alerts.append(alert)

        return alerts

    def get_concordance_rate(self, task_id: str) -> float:
        """
        Compute the concordance rate for a task.

        Args:
            task_id: Task ID

        Returns:
            Concordance rate (0.0 to 1.0)
        """
        # Use .get() so that querying an unknown task does not create a
        # phantom entry through the defaultdict factory.
        metrics = self.task_metrics.get(task_id)
        if not metrics or metrics["total_executions"] == 0:
            return 0.0

        return metrics["successes"] / metrics["total_executions"]

    def get_correction_rate(self, task_id: str) -> float:
        """
        Compute the correction rate for a task.

        Args:
            task_id: Task ID

        Returns:
            Correction rate (0.0 to 1.0)
        """
        # .get() avoids inserting a phantom task via the defaultdict.
        metrics = self.task_metrics.get(task_id)
        if not metrics or metrics["total_executions"] == 0:
            return 0.0

        return metrics["corrections"] / metrics["total_executions"]

    def get_average_latency(
        self,
        task_id: Optional[str] = None,
        window_size: Optional[int] = None
    ) -> float:
        """
        Compute the average latency.

        Args:
            task_id: Task ID (None for global)
            window_size: Number of most recent measurements to consider

        Returns:
            Average latency in milliseconds
        """
        if task_id:
            # .get() avoids inserting a phantom task via the defaultdict.
            metrics = self.task_metrics.get(task_id)
            latencies = metrics["latencies"] if metrics else []
        else:
            latencies = self.global_metrics["total_latencies"]

        if not latencies:
            return 0.0

        if window_size:
            latencies = latencies[-window_size:]

        return sum(latencies) / len(latencies)

    def get_task_metrics(self, task_id: str) -> Dict[str, Any]:
        """
        Return the metrics for a task.

        Args:
            task_id: Task ID

        Returns:
            Metrics dictionary
        """
        # Fall back to an empty snapshot for unknown tasks instead of
        # creating an entry through the defaultdict factory.
        metrics = self.task_metrics.get(task_id) or {
            "successes": 0,
            "failures": 0,
            "corrections": 0,
            "total_executions": 0,
            "last_execution": None
        }

        return {
            "task_id": task_id,
            "total_executions": metrics["total_executions"],
            "successes": metrics["successes"],
            "failures": metrics["failures"],
            "corrections": metrics["corrections"],
            "concordance_rate": self.get_concordance_rate(task_id),
            "correction_rate": self.get_correction_rate(task_id),
            "avg_latency_ms": self.get_average_latency(task_id),
            "last_execution": metrics["last_execution"]
        }

    def get_global_metrics(self) -> Dict[str, Any]:
        """
        Return the global metrics.

        Returns:
            Dictionary of global metrics
        """
        total_exec = self.global_metrics["total_executions"]

        return {
            "total_executions": total_exec,
            "total_successes": self.global_metrics["total_successes"],
            "total_failures": self.global_metrics["total_failures"],
            "total_corrections": self.global_metrics["total_corrections"],
            "global_concordance_rate": (
                self.global_metrics["total_successes"] / total_exec
                if total_exec > 0 else 0.0
            ),
            "global_correction_rate": (
                self.global_metrics["total_corrections"] / total_exec
                if total_exec > 0 else 0.0
            ),
            "avg_latency_ms": self.get_average_latency(),
            "alerts_generated": self.global_metrics["alerts_generated"],
            "num_tasks_tracked": len(self.task_metrics)
        }

    def get_alerts(
        self,
        limit: int = 50,
        alert_type: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Return the alert history.

        Args:
            limit: Maximum number of alerts to return
            alert_type: Filter by alert type

        Returns:
            List of alerts
        """
        alerts = self.alerts

        if alert_type:
            alerts = [a for a in alerts if a["type"] == alert_type]

        return alerts[-limit:]

    def _generate_alert(
        self,
        alert_type: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate an alert.

        Args:
            alert_type: Alert type
            data: Alert payload

        Returns:
            The generated alert
        """
        alert = {
            "type": alert_type,
            "timestamp": datetime.now().isoformat(),
            "data": data
        }

        self.alerts.append(alert)
        self.global_metrics["alerts_generated"] += 1

        self.logger.log_action({
            "action": "alert_generated",
            **alert
        })

        return alert

    def reset_metrics(self, task_id: Optional[str] = None):
        """
        Reset the metrics.

        Args:
            task_id: Task ID (None to reset everything)
        """
        if task_id:
            if task_id in self.task_metrics:
                del self.task_metrics[task_id]
                self.logger.log_action({
                    "action": "task_metrics_reset",
                    "task_id": task_id
                })
        else:
            self.task_metrics.clear()
            self.global_metrics = {
                "total_latencies": [],
                "total_successes": 0,
                "total_failures": 0,
                "total_corrections": 0,
                "total_executions": 0,
                "alerts_generated": 0
            }
            self.alerts.clear()
            self.logger.log_action({
                "action": "all_metrics_reset"
            })
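

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the module's API).
# It assumes nothing beyond what this file shows: the collector only ever
# calls logger.log_action(dict), so the hypothetical _StubLogger below stands
# in for the real Logger. Because of the relative import at the top, run it
# from within the package (python -m <package>.<module>), not as a bare file.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import time

    class _StubLogger:
        """Hypothetical stand-in for Logger; prints each action entry."""

        def log_action(self, entry: Dict[str, Any]) -> None:
            print(entry)

    demo_config = {
        "performance": {"max_latency_ms": 400},
        "thresholds": {"concordance_rate": 0.95, "correction_rate": 0.03},
    }
    collector = MetricsCollector(_StubLogger(), demo_config)

    # Time one simulated execution and record its outcome.
    start = time.time()
    time.sleep(0.05)  # stand-in for real work (~50 ms, below the threshold)
    collector.track_latency(start, time.time(), task_id="demo_task")
    collector.track_concordance("demo_task", success=True)

    print(collector.get_task_metrics("demo_task"))
    print(collector.get_global_metrics())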