Files
rpa_vision_v3/core/analytics/analytics_system.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes d'exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

198 lines
6.4 KiB
Python

"""Integrated analytics system."""
import logging
from typing import Optional
from pathlib import Path
from .collection.metrics_collector import MetricsCollector
from .collection.resource_collector import ResourceCollector
from .storage.timeseries_store import TimeSeriesStore
from .storage.archive_storage import ArchiveStorage, RetentionPolicyEngine
from .engine.performance_analyzer import PerformanceAnalyzer
from .engine.anomaly_detector import AnomalyDetector
from .engine.insight_generator import InsightGenerator
from .engine.success_rate_calculator import SuccessRateCalculator
from .query.query_engine import QueryEngine
from .realtime.realtime_analytics import RealtimeAnalytics
from .reporting.report_generator import ReportGenerator
from .dashboard.dashboard_manager import DashboardManager
from .api.analytics_api import AnalyticsAPI
logger = logging.getLogger(__name__)
class AnalyticsSystem:
    """Integrated analytics system.

    Wires together the storage, collection, analysis, query, reporting,
    dashboard and API layers into a single facade object.
    """

    def __init__(
        self,
        db_path: str = "data/analytics/metrics.db",
        archive_dir: str = "data/analytics/archive",
        reports_dir: str = "data/analytics/reports",
        dashboards_dir: str = "data/analytics/dashboards"
    ):
        """
        Initialize analytics system.

        Args:
            db_path: Path to metrics database
            archive_dir: Directory for archived data
            reports_dir: Directory for reports
            dashboards_dir: Directory for dashboards
        """
        logger.info("Initializing AnalyticsSystem...")
        # Storage layer: live time-series store, cold archive, and the
        # retention engine that applies policies against them.
        self.store = TimeSeriesStore(db_path)
        self.archive = ArchiveStorage(archive_dir)
        self.retention_engine = RetentionPolicyEngine(self.archive)
        # Collection layer: both collectors write into the same store.
        self.metrics_collector = MetricsCollector(self.store)
        self.resource_collector = ResourceCollector(self.store)
        # Analysis layer
        self.performance_analyzer = PerformanceAnalyzer(self.store)
        self.anomaly_detector = AnomalyDetector(self.store)
        self.insight_generator = InsightGenerator(
            self.performance_analyzer,
            self.anomaly_detector
        )
        self.success_rate_calculator = SuccessRateCalculator(self.store)
        # Query layer
        self.query_engine = QueryEngine(self.store)
        self.realtime_analytics = RealtimeAnalytics(self.metrics_collector)
        # Reporting layer
        self.report_generator = ReportGenerator(
            self.query_engine,
            self.performance_analyzer,
            self.insight_generator,
            reports_dir
        )
        # Dashboard layer
        self.dashboard_manager = DashboardManager(dashboards_dir)
        # API layer: facade exposing the analysis/reporting components.
        self.api = AnalyticsAPI(
            self.query_engine,
            self.performance_analyzer,
            self.anomaly_detector,
            self.insight_generator,
            self.success_rate_calculator,
            self.report_generator,
            self.dashboard_manager
        )
        logger.info("AnalyticsSystem initialized successfully")

    def start_resource_monitoring(
        self,
        interval_seconds: int = 60
    ) -> None:
        """
        Start resource monitoring.

        Args:
            interval_seconds: Monitoring interval in seconds
        """
        self.resource_collector.start_monitoring(interval_seconds)
        logger.info(f"Resource monitoring started (interval: {interval_seconds}s)")

    def stop_resource_monitoring(self) -> None:
        """Stop resource monitoring."""
        self.resource_collector.stop_monitoring()
        logger.info("Resource monitoring stopped")

    def apply_retention_policies(self, dry_run: bool = False) -> dict:
        """
        Apply retention policies.

        Args:
            dry_run: If True, don't actually delete data

        Returns:
            Dictionary with application results
        """
        results = self.retention_engine.apply_policies(self.store, dry_run)
        logger.info(f"Retention policies applied (dry_run={dry_run})")
        return results

    def get_system_stats(self) -> dict:
        """
        Get system statistics.

        Returns:
            Dictionary with system stats (storage, archive, collectors,
            dashboards, reports).
        """
        try:
            # Single stat() call instead of exists()+stat(): avoids the
            # race where the file disappears between the two calls, and
            # constructs the Path only once.
            db_size = Path(self.store.db_path).stat().st_size
        except OSError:
            db_size = 0
        return {
            'storage': {
                'metrics_count': self.store.get_metrics_count(),
                'database_size': db_size
            },
            'archive': self.archive.get_archive_stats(),
            'collectors': {
                'metrics_buffer_size': len(self.metrics_collector.buffer),
                'resource_monitoring_active': self.resource_collector.monitoring_active
            },
            'dashboards': {
                'total': len(self.dashboard_manager.dashboards)
            },
            'reports': {
                'scheduled': len(self.report_generator.scheduled_reports)
            }
        }

    def shutdown(self) -> None:
        """Shutdown analytics system: stop monitoring, flush, close storage."""
        logger.info("Shutting down AnalyticsSystem...")
        # Stop monitoring
        if self.resource_collector.monitoring_active:
            self.stop_resource_monitoring()
        try:
            # Flush any pending metrics before the store goes away.
            self.metrics_collector.flush()
        finally:
            # Always release the database connection, even if flush raised.
            self.store.close()
        logger.info("AnalyticsSystem shutdown complete")
# Lazily-created module-level singleton.
_analytics_system: Optional[AnalyticsSystem] = None


def get_analytics_system(
    db_path: str = "data/analytics/metrics.db",
    archive_dir: str = "data/analytics/archive",
    reports_dir: str = "data/analytics/reports",
    dashboards_dir: str = "data/analytics/dashboards"
) -> AnalyticsSystem:
    """
    Get or create global analytics system instance.

    The configuration arguments are only used on the first call, when the
    singleton is actually constructed; later calls return the cached instance.

    Args:
        db_path: Path to metrics database
        archive_dir: Directory for archived data
        reports_dir: Directory for reports
        dashboards_dir: Directory for dashboards

    Returns:
        AnalyticsSystem instance
    """
    global _analytics_system
    # Guard clause: reuse the existing instance when one was already built.
    if _analytics_system is not None:
        return _analytics_system
    _analytics_system = AnalyticsSystem(
        db_path=db_path,
        archive_dir=archive_dir,
        reports_dir=reports_dir,
        dashboards_dir=dashboards_dir
    )
    return _analytics_system