# 🔗 Analytics Integration Guide
## Overview
This guide explains how to integrate the analytics system with the ExecutionLoop for automatic metrics collection.
## Quick Start
### 1. Initialization
```python
from core.analytics.integration import get_analytics_integration

# Initialize the integration (once, at startup)
analytics = get_analytics_integration(enabled=True)
```
### 2. Intégration dans ExecutionLoop
Modify `core/execution/execution_loop.py` to add the analytics hooks:
```python
from datetime import datetime

from core.analytics.integration import get_analytics_integration


class ExecutionLoop:
    def __init__(self, ...):
        # ... existing code ...
        # Add analytics
        self.analytics = get_analytics_integration(enabled=True)

    def execute_workflow(self, workflow):
        """Execute workflow with analytics."""
        # 1. Start tracking
        execution_id = self.analytics.on_execution_start(
            workflow_id=workflow.workflow_id,
            total_steps=len(workflow.nodes)
        )
        started_at = datetime.now()
        steps_completed = 0
        steps_failed = 0

        try:
            # 2. Execute the steps
            for i, node in enumerate(workflow.nodes):
                step_start = datetime.now()

                # Notify step start
                self.analytics.on_step_start(
                    execution_id=execution_id,
                    node_id=node.node_id,
                    step_number=i + 1
                )

                try:
                    # Execute the step
                    result = self._execute_node(node)
                    success = result.success
                    error_msg = None
                    if success:
                        steps_completed += 1
                    else:
                        steps_failed += 1
                        error_msg = result.error
                except Exception as e:
                    success = False
                    error_msg = str(e)
                    steps_failed += 1

                # Notify step completion
                step_end = datetime.now()
                self.analytics.on_step_complete(
                    execution_id=execution_id,
                    workflow_id=workflow.workflow_id,
                    node_id=node.node_id,
                    action_type=node.action_type,
                    started_at=step_start,
                    completed_at=step_end,
                    duration=(step_end - step_start).total_seconds(),
                    success=success,
                    error_message=error_msg
                )

            # 3. Workflow succeeded
            completed_at = datetime.now()
            self.analytics.on_execution_complete(
                execution_id=execution_id,
                workflow_id=workflow.workflow_id,
                started_at=started_at,
                completed_at=completed_at,
                duration=(completed_at - started_at).total_seconds(),
                status='success',
                steps_completed=steps_completed,
                steps_failed=steps_failed
            )
        except Exception as e:
            # 4. Workflow failed
            completed_at = datetime.now()
            self.analytics.on_execution_complete(
                execution_id=execution_id,
                workflow_id=workflow.workflow_id,
                started_at=started_at,
                completed_at=completed_at,
                duration=(completed_at - started_at).total_seconds(),
                status='failed',
                error_message=str(e),
                steps_completed=steps_completed,
                steps_failed=steps_failed
            )
            raise
```
### 3. Self-Healing Integration
If you use self-healing, also track the recovery attempts:
```python
import logging
from datetime import datetime

from core.healing.execution_integration import get_self_healing_integration

logger = logging.getLogger(__name__)


class ExecutionLoop:
    def _execute_node_with_healing(self, node):
        """Execute node with self-healing and analytics."""
        try:
            result = self._execute_node(node)

            if not result.success:
                # Attempt recovery
                recovery_start = datetime.now()
                healing = get_self_healing_integration()
                recovery = healing.handle_execution_failure(
                    action_info=node.action_info,
                    execution_result=result,
                    workflow_id=self.current_workflow_id,
                    node_id=node.node_id
                )
                recovery_end = datetime.now()

                # Record the recovery attempt
                self.analytics.on_recovery_attempt(
                    execution_id=self.current_execution_id,
                    workflow_id=self.current_workflow_id,
                    node_id=node.node_id,
                    strategy=recovery.strategy_used if recovery else 'none',
                    success=recovery.success if recovery else False,
                    duration=(recovery_end - recovery_start).total_seconds()
                )

                if recovery and recovery.success:
                    result = recovery

            return result
        except Exception as e:
            logger.error(f"Error executing node: {e}")
            raise
```
## Features
### Automatically Collected Metrics
1. **Execution Metrics**
   - Total duration
   - Status (success/failed/timeout)
   - Number of completed/failed steps
   - Error messages
2. **Step Metrics**
   - Duration per step
   - Action type
   - Success/failure
   - Detailed errors
3. **Recovery Metrics** (with self-healing)
   - Strategy used
   - Success/failure
   - Recovery duration
4. **Real-time Tracking**
   - Live progress
   - Estimated completion time (ETA)
   - Live metrics
### Accessing Metrics
```python
# Get live metrics during execution
live_metrics = analytics.get_live_metrics(execution_id)
print(f"Progress: {live_metrics['progress_percent']:.1f}%")
print(f"ETA: {live_metrics['estimated_completion']}")

# Get the statistics of a workflow
stats = analytics.get_workflow_stats(
    workflow_id="my_workflow",
    hours=24  # Last 24 hours
)
print(f"Success Rate: {stats['success_rate']['success_rate']:.1f}%")
print(f"Avg Duration: {stats['performance']['avg_duration']:.2f}s")
print(f"P95 Duration: {stats['performance']['p95_duration']:.2f}s")
```
## Configuration
### Enabling/Disabling Analytics
```python
# Disable completely
analytics = get_analytics_integration(enabled=False)

# Enable with a custom configuration
from core.analytics.analytics_system import get_analytics_system
analytics_system = get_analytics_system(
    db_path="custom/path/metrics.db",
    archive_dir="custom/archive",
    reports_dir="custom/reports"
)
```
### Resource Monitoring
```python
# Start resource monitoring (CPU, RAM, GPU)
analytics_system = get_analytics_system()
analytics_system.start_resource_monitoring(interval_seconds=60)

# Stop monitoring
analytics_system.stop_resource_monitoring()
```
## Complete Example
Here is a complete integration example:
```python
from datetime import datetime

from core.analytics.integration import get_analytics_integration
from core.healing.execution_integration import get_self_healing_integration


class EnhancedExecutionLoop:
    """ExecutionLoop with analytics and self-healing."""

    def __init__(self):
        self.analytics = get_analytics_integration(enabled=True)
        self.healing = get_self_healing_integration(enabled=True)
        self.current_execution_id = None
        self.current_workflow_id = None

    def execute_workflow(self, workflow):
        """Execute workflow with full instrumentation."""
        # Start tracking
        self.current_workflow_id = workflow.workflow_id
        self.current_execution_id = self.analytics.on_execution_start(
            workflow_id=workflow.workflow_id,
            total_steps=len(workflow.nodes)
        )
        started_at = datetime.now()
        steps_completed = 0
        steps_failed = 0

        try:
            for i, node in enumerate(workflow.nodes):
                # Execute step with analytics and healing
                success = self._execute_step_instrumented(node, i + 1)
                if success:
                    steps_completed += 1
                else:
                    steps_failed += 1

            # Complete successfully
            completed_at = datetime.now()
            self.analytics.on_execution_complete(
                execution_id=self.current_execution_id,
                workflow_id=workflow.workflow_id,
                started_at=started_at,
                completed_at=completed_at,
                duration=(completed_at - started_at).total_seconds(),
                status='success',
                steps_completed=steps_completed,
                steps_failed=steps_failed
            )
            return True
        except Exception as e:
            # Complete with failure
            completed_at = datetime.now()
            self.analytics.on_execution_complete(
                execution_id=self.current_execution_id,
                workflow_id=workflow.workflow_id,
                started_at=started_at,
                completed_at=completed_at,
                duration=(completed_at - started_at).total_seconds(),
                status='failed',
                error_message=str(e),
                steps_completed=steps_completed,
                steps_failed=steps_failed
            )
            return False

    def _execute_step_instrumented(self, node, step_number):
        """Execute step with full instrumentation."""
        # Notify step start
        self.analytics.on_step_start(
            execution_id=self.current_execution_id,
            node_id=node.node_id,
            step_number=step_number
        )
        step_start = datetime.now()

        try:
            # Execute
            result = self._execute_node(node)
            success = result.success
            error_msg = None

            # Try healing if failed
            if not success:
                recovery_start = datetime.now()
                recovery = self.healing.handle_execution_failure(
                    action_info=node.action_info,
                    execution_result=result,
                    workflow_id=self.current_workflow_id,
                    node_id=node.node_id
                )
                recovery_end = datetime.now()

                # Record recovery attempt
                self.analytics.on_recovery_attempt(
                    execution_id=self.current_execution_id,
                    workflow_id=self.current_workflow_id,
                    node_id=node.node_id,
                    strategy=recovery.strategy_used if recovery else 'none',
                    success=recovery.success if recovery else False,
                    duration=(recovery_end - recovery_start).total_seconds()
                )

                if recovery and recovery.success:
                    success = True
                    result = recovery
                else:
                    error_msg = result.error
        except Exception as e:
            success = False
            error_msg = str(e)

        # Notify step complete
        step_end = datetime.now()
        self.analytics.on_step_complete(
            execution_id=self.current_execution_id,
            workflow_id=self.current_workflow_id,
            node_id=node.node_id,
            action_type=node.action_type,
            started_at=step_start,
            completed_at=step_end,
            duration=(step_end - step_start).total_seconds(),
            success=success,
            error_message=error_msg
        )
        return success

    def _execute_node(self, node):
        """Execute a single node (implement your logic here)."""
        # Your execution logic
        pass
```
## Visualization
Once integrated, you can visualize the metrics:
```python
# Generate a report
from datetime import datetime, timedelta

from core.analytics.analytics_system import get_analytics_system
from core.analytics.reporting.report_generator import ReportConfig

config = ReportConfig(
    title="Weekly Performance Report",
    metric_types=['execution', 'step'],
    start_time=datetime.now() - timedelta(days=7),
    end_time=datetime.now(),
    workflow_ids=['my_workflow'],
    format='html'
)

analytics_system = get_analytics_system()
report_data = analytics_system.report_generator.generate_report(config)
html_path = analytics_system.report_generator.export_html(report_data)
print(f"Report generated: {html_path}")
```
## Troubleshooting
### Analytics is not collecting metrics
1. Check that the integration is enabled:
```python
analytics = get_analytics_integration(enabled=True)
print(f"Enabled: {analytics.enabled}")
```
2. Check the logs:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
```
3. Force a flush:
```python
analytics.analytics.metrics_collector.flush()
```
### Missing Metrics
Make sure you call all the hooks:
- `on_execution_start()` at the beginning
- `on_step_start()` and `on_step_complete()` for each step
- `on_execution_complete()` at the end
### Performance
If collection impacts performance:
- Increase the collector's buffer size
- Reduce the resource monitoring frequency
- Disable real-time tracking if it is not needed
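The buffer-size trade-off can be illustrated with a toy collector (a sketch only, not the real `metrics_collector` implementation): a larger buffer batches more metrics per write, so the storage backend is hit less often.

```python
class BufferedCollector:
    """Toy collector: buffers metrics and flushes them in batches."""

    def __init__(self, buffer_size=100):
        self.buffer_size = buffer_size
        self.buffer = []
        self.flush_count = 0

    def record(self, metric):
        self.buffer.append(metric)
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        if self.buffer:
            # In the real system this is where metrics would be
            # written to the SQLite database.
            self.flush_count += 1
            self.buffer.clear()


# With 100 metrics, a buffer of 10 flushes 10 times; a buffer of 100 flushes once.
small, big = BufferedCollector(buffer_size=10), BufferedCollector(buffer_size=100)
for i in range(100):
    small.record(i)
    big.record(i)
```

The same reasoning applies to `interval_seconds` in the resource monitoring: a longer interval means fewer samples and less overhead.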
## Best Practices
1. **Always call `on_execution_complete()`**, even on error
2. **Wrap analytics hooks in try/except** so they never break the execution
3. **Flush regularly** to avoid data loss
4. **Monitor resources** at a reasonable interval (60s recommended)
5. **Archive old metrics** regularly
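Wrapping hooks in try/except can be factored into a small helper (a sketch; `safe_hook` is not part of the analytics API, just a pattern you can adopt):

```python
import logging

logger = logging.getLogger(__name__)


def safe_hook(hook, *args, **kwargs):
    """Call an analytics hook, but never let a hook failure break the workflow."""
    try:
        return hook(*args, **kwargs)
    except Exception as e:
        name = getattr(hook, "__name__", repr(hook))
        logger.warning(f"Analytics hook {name} failed: {e}")
        return None


# Usage inside the execution loop, with the hooks from the sections above:
# safe_hook(self.analytics.on_step_start,
#           execution_id=execution_id,
#           node_id=node.node_id,
#           step_number=1)
```

If the hook raises, the error is logged and `None` is returned, so the workflow itself keeps running.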
## Next Steps
- See `ANALYTICS_QUICKSTART.md` for more examples
- See `demo_analytics.py` for a full demonstration
- Configure custom dashboards
- Set up automated reports