- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
10 KiB
10 KiB
🚀 RPA Analytics - Quick Start Guide
Overview
Le système RPA Analytics fournit une suite complète d'outils pour analyser, monitorer et optimiser vos workflows RPA.
Installation
Les dépendances analytics sont déjà incluses dans requirements.txt. Aucune installation supplémentaire n'est nécessaire.
Démarrage Rapide
1. Initialisation du Système
from core.analytics.analytics_system import get_analytics_system
# Initialiser le système analytics
analytics = get_analytics_system()
# Démarrer le monitoring des ressources
analytics.start_resource_monitoring(interval_seconds=60)
2. Collecter des Métriques
from core.analytics.collection.metrics_collector import ExecutionMetrics
from datetime import datetime, timedelta
# Enregistrer une exécution de workflow
execution = ExecutionMetrics(
execution_id="exec_001",
workflow_id="my_workflow",
started_at=datetime.now(),
completed_at=datetime.now() + timedelta(seconds=30),
duration=30.0,
status="success"
)
analytics.metrics_collector.record_execution(execution)
analytics.metrics_collector.flush() # Persister immédiatement
3. Analyser les Performances
from datetime import datetime, timedelta
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
# Analyser les performances
perf_stats = analytics.performance_analyzer.analyze_performance(
workflow_id="my_workflow",
start_time=start_time,
end_time=end_time
)
print(f"Average Duration: {perf_stats.avg_duration:.2f}s")
print(f"Success Rate: {perf_stats.success_rate:.1f}%")
print(f"P95 Duration: {perf_stats.p95_duration:.2f}s")
# Identifier les bottlenecks
bottlenecks = analytics.performance_analyzer.identify_bottlenecks(
workflow_id="my_workflow",
start_time=start_time,
end_time=end_time
)
for bottleneck in bottlenecks:
print(f"Bottleneck: {bottleneck.node_id} - {bottleneck.avg_duration:.2f}s")
4. Détecter les Anomalies
# Détecter les anomalies
anomalies = analytics.anomaly_detector.detect_anomalies(
workflow_id="my_workflow",
start_time=start_time,
end_time=end_time
)
for anomaly in anomalies:
print(f"Anomaly: {anomaly.metric_name}")
print(f" Severity: {anomaly.severity:.2f}")
print(f" Expected: {anomaly.expected_value:.2f}")
print(f" Actual: {anomaly.actual_value:.2f}")
5. Générer des Insights
# Générer des insights automatiques
insights = analytics.insight_generator.generate_insights(
start_time=start_time,
end_time=end_time
)
for insight in insights:
print(f"\n{insight.title}")
print(f" {insight.description}")
print(f" Priority: {insight.priority_score:.2f}")
print(f" Impact: {insight.expected_impact}")
6. Calculer les Taux de Succès
# Calculer le taux de succès
success_stats = analytics.success_rate_calculator.calculate_success_rate(
workflow_id="my_workflow",
time_window_hours=24
)
print(f"Success Rate: {success_stats.success_rate:.1f}%")
print(f"Reliability Score: {success_stats.reliability_score:.1f}")
print(f"Total Executions: {success_stats.total_executions}")
# Catégoriser les échecs
for category, count in success_stats.failure_categories.items():
print(f" {category}: {count}")
# Classement de fiabilité
rankings = analytics.success_rate_calculator.rank_workflows_by_reliability(
time_window_hours=168 # 1 semaine
)
for ranking in rankings[:5]:
print(f"{ranking.rank}. {ranking.workflow_id}")
print(f" Reliability: {ranking.reliability_score:.1f}")
print(f" Success Rate: {ranking.success_rate:.1f}%")
7. Tracking en Temps Réel
# Démarrer le tracking d'une exécution
analytics.realtime_analytics.track_execution(
execution_id="live_exec",
workflow_id="my_workflow",
total_steps=10
)
# Mettre à jour la progression
analytics.realtime_analytics.update_progress(
execution_id="live_exec",
current_step=5,
current_node_id="step_5"
)
# Obtenir les métriques live
live_metrics = analytics.realtime_analytics.get_live_metrics("live_exec")
print(f"Progress: {live_metrics['progress_percent']:.1f}%")
print(f"ETA: {live_metrics['estimated_completion']}")
# Compléter l'exécution
analytics.realtime_analytics.complete_execution(
execution_id="live_exec",
status="success"
)
8. Générer des Rapports
from core.analytics.reporting.report_generator import ReportConfig
# Configurer le rapport
config = ReportConfig(
title="Weekly Performance Report",
metric_types=['execution', 'step'],
start_time=start_time,
end_time=end_time,
workflow_ids=['my_workflow'],
include_charts=True,
include_insights=True,
format='html' # ou 'json', 'csv', 'pdf'
)
# Générer et exporter
report_data = analytics.report_generator.generate_report(config)
html_path = analytics.report_generator.export_html(report_data)
print(f"Report generated: {html_path}")
# Programmer un rapport récurrent
from core.analytics.reporting.report_generator import ScheduledReport
scheduled = ScheduledReport(
report_id="weekly_report",
config=config,
schedule_cron="0 9 * * 1", # Tous les lundis à 9h
delivery_method="email",
delivery_config={"to": "admin@example.com"}
)
analytics.report_generator.schedule_report(scheduled)
9. Créer des Dashboards
# Créer un dashboard depuis un template
dashboard = analytics.dashboard_manager.create_dashboard(
name="Performance Dashboard",
description="Monitor workflow performance",
owner="admin",
template_id="performance" # Utilise le template prédéfini
)
# Ajouter un widget personnalisé
widget = analytics.dashboard_manager.add_widget(
dashboard_id=dashboard.dashboard_id,
widget_type="chart",
title="Success Rate Trend",
config={
'chart_type': 'line',
'metric': 'success_rate',
'time_range': '7d'
},
position={'x': 0, 'y': 0, 'width': 6, 'height': 4}
)
# Partager le dashboard
analytics.dashboard_manager.share_dashboard(
dashboard_id=dashboard.dashboard_id,
username="user@example.com"
)
# Rendre public
analytics.dashboard_manager.make_public(
dashboard_id=dashboard.dashboard_id,
is_public=True
)
10. Utiliser l'API REST
from flask import Flask
app = Flask(__name__)
# Enregistrer le blueprint analytics
app.register_blueprint(analytics.api.get_blueprint())
# Démarrer le serveur
app.run(host='0.0.0.0', port=5000)
Endpoints disponibles:
GET /api/analytics/metrics- Récupérer les métriquesGET /api/analytics/performance- Analyse de performanceGET /api/analytics/anomalies- Anomalies détectéesGET /api/analytics/insights- Insights générésGET /api/analytics/success-rate- Taux de succèsPOST /api/analytics/reports- Générer un rapportGET /api/analytics/dashboards- Lister les dashboards
11. Gestion de la Rétention
from core.analytics.storage.archive_storage import RetentionPolicy
# Ajouter une politique de rétention
policy = RetentionPolicy(
metric_type='execution',
hot_retention_days=30, # Garder 30 jours en base active
archive_retention_days=365, # Garder 1 an en archive
compression_enabled=True
)
analytics.retention_engine.add_policy(policy)
# Appliquer les politiques
results = analytics.apply_retention_policies(dry_run=False)
print(f"Archived: {results['archived']}")
print(f"Deleted: {results['deleted']}")
Demo Script
Exécutez le script de démonstration pour voir toutes les fonctionnalités :
python demo_analytics.py
Intégration avec ExecutionLoop
Pour intégrer automatiquement avec vos workflows :
from core.execution.execution_loop import ExecutionLoop
from core.analytics.analytics_system import get_analytics_system
# Dans votre ExecutionLoop
analytics = get_analytics_system()
# Avant l'exécution
execution_id = str(uuid.uuid4())
analytics.realtime_analytics.track_execution(
execution_id=execution_id,
workflow_id=workflow.workflow_id,
total_steps=len(workflow.nodes)
)
# Après chaque step
analytics.realtime_analytics.update_progress(
execution_id=execution_id,
current_step=current_step,
current_node_id=node.node_id
)
# Après l'exécution
from core.analytics.collection.metrics_collector import ExecutionMetrics
execution_metrics = ExecutionMetrics(
execution_id=execution_id,
workflow_id=workflow.workflow_id,
started_at=start_time,
completed_at=end_time,
duration=(end_time - start_time).total_seconds(),
status="success" if success else "failed",
error_message=error_msg if not success else None
)
analytics.metrics_collector.record_execution(execution_metrics)
Statistiques Système
# Obtenir les statistiques du système
stats = analytics.get_system_stats()
print(f"Metrics Count: {stats['storage']['metrics_count']}")
print(f"Database Size: {stats['storage']['database_size']} bytes")
print(f"Archive Stats: {stats['archive']}")
print(f"Dashboards: {stats['dashboards']['total']}")
Arrêt Propre
# Arrêter le système proprement
analytics.shutdown()
Bonnes Pratiques
- Flush régulier: Appelez
flush()après avoir enregistré des métriques importantes - Monitoring des ressources: Activez le monitoring pour suivre l'utilisation CPU/RAM/GPU
- Rétention: Configurez des politiques de rétention adaptées à vos besoins
- Dashboards: Utilisez les templates pour démarrer rapidement
- Rapports: Programmez des rapports récurrents pour un suivi régulier
- Anomalies: Surveillez les anomalies pour détecter les problèmes tôt
- Insights: Agissez sur les insights pour optimiser vos workflows
Troubleshooting
Problème: Métriques non enregistrées
Solution: Appelez analytics.metrics_collector.flush() pour forcer la persistance
Problème: Base de données trop volumineuse
Solution: Configurez et appliquez des politiques de rétention
Problème: Rapports PDF ne se génèrent pas
Solution: Installez reportlab: pip install reportlab
Problème: Monitoring des ressources ne démarre pas
Solution: Vérifiez que psutil est installé: pip install psutil
Support
Pour plus d'informations, consultez:
- Documentation complète:
docs/analytics/ - Exemples:
examples/analytics/ - Tests:
tests/unit/test_analytics*.py
Prochaines Étapes
- Intégrez avec vos workflows existants
- Créez des dashboards personnalisés
- Configurez des rapports automatiques
- Optimisez basé sur les insights
- Surveillez les anomalies en temps réel