Files
rpa_vision_v3/core/supervision/supervisor.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

571 lines
21 KiB
Python

"""
core/supervision/supervisor.py
Main Production Supervisor - Intelligent coordination brain for Fiche #22
Monitors system health, coordinates recovery strategies, and makes intelligent
decisions about when to trigger circuit breakers, rollbacks, or degraded mode.
Integrates with:
- Fiche #21: Production healthchecks and systemd services
- Fiche #19: Failure case recording and analysis
- Fiche #18: Persistent learning system
- Fiche #10: Precision metrics engine
- Core healing engine: Self-healing strategies
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
from .circuit_breaker import CircuitBreaker, CircuitState
from .rollback_manager import RollbackManager
from .degraded_mode import DegradedModeManager
from .production_policy import ProductionPolicy
from ..analytics.engine.performance_analyzer import PerformanceAnalyzer
from ..healing.healing_engine import SelfHealingEngine
from ..evaluation.failure_case_recorder import FailureCaseRecorder
from ..precision.metrics_engine import PrecisionMetricsEngine
# Module-wide logger named after this module, so supervision output can be
# filtered or routed independently of other components.
logger = logging.getLogger(name=__name__)
class SupervisionLevel(Enum):
    """How intensively the supervisor monitors the system and intervenes.

    Each member's value is its lowercase name.
    """

    # Basic monitoring only
    MINIMAL = "minimal"

    # Normal production supervision
    STANDARD = "standard"

    # High-frequency monitoring
    INTENSIVE = "intensive"

    # Crisis mode with aggressive intervention
    EMERGENCY = "emergency"
@dataclass
class SystemHealth:
    """Point-in-time snapshot of overall system health.

    Field order defines the generated constructor's positional order, so do
    not reorder fields.
    """
    # When the snapshot was taken.
    timestamp: datetime
    # Aggregate health score, 0.0 (worst) to 1.0 (fully healthy).
    overall_score: float  # 0.0 to 1.0
    # Per-service health flags.
    api_healthy: bool
    dashboard_healthy: bool
    worker_healthy: bool
    disk_space_ok: bool
    # Recent failure ratio; presumably in [0.0, 1.0] — TODO confirm with the metrics source.
    failure_rate: float
    # Average response time, in milliseconds.
    avg_response_time_ms: float
    # Circuit breaker state keyed by breaker name.
    circuit_breaker_states: Dict[str, CircuitState]
    # Whether degraded mode was active when the snapshot was taken.
    degraded_mode_active: bool
    # Human-readable descriptions of detected problems; a fresh list per instance.
    active_issues: List[str] = field(default_factory=list)
@dataclass
class SupervisionEvent:
    """A single entry in the supervisor's event history.

    Field order defines the generated constructor's positional order, so do
    not reorder fields.
    """
    # When the event was recorded.
    timestamp: datetime
    # Machine-readable event identifier, e.g. "service_restarted".
    event_type: str
    severity: str  # info, warning, error, critical
    # Component the event concerns, e.g. "supervisor" or "circuit_breaker".
    component: str
    # Human-readable description of the event.
    message: str
    # Optional structured context; each instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
class ProductionSupervisor:
    """
    Main production supervisor - the intelligent brain of Fiche #22.

    Responsibilities:
    - Monitor system health across all components
    - Coordinate recovery strategies intelligently
    - Trigger circuit breakers when needed
    - Initiate rollbacks for persistent failures
    - Activate degraded mode for unstable contexts
    - Learn from supervision decisions

    Typical use: construct, optionally call ``integrate_systems()`` and
    ``set_callbacks()``, then ``await start_supervision()``. Call
    ``stop_supervision()`` to end the loop.
    """

    # Seconds to wait between supervision cycles for each supervision level.
    _SUPERVISION_INTERVALS: Dict[SupervisionLevel, float] = {
        SupervisionLevel.MINIMAL: 300.0,    # 5 minutes
        SupervisionLevel.STANDARD: 60.0,    # 1 minute
        SupervisionLevel.INTENSIVE: 15.0,   # 15 seconds
        SupervisionLevel.EMERGENCY: 5.0,    # 5 seconds
    }
    _DEFAULT_SUPERVISION_INTERVAL = 60.0

    # Levels strictly below INTENSIVE. Only these may be escalated to
    # INTENSIVE; EMERGENCY must never be "increased" down to INTENSIVE.
    _ESCALATABLE_LEVELS = (SupervisionLevel.MINIMAL, SupervisionLevel.STANDARD)

    # Decision priority ranking: lower values are executed first.
    _PRIORITY_ORDER: Dict[str, int] = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}

    # Maximum number of supervision events kept in memory.
    _MAX_EVENTS = 1000

    def __init__(
        self,
        policy: Optional[ProductionPolicy] = None,
        data_dir: Path = Path("data"),
        supervision_level: SupervisionLevel = SupervisionLevel.STANDARD
    ):
        """
        Initialize the production supervisor.

        Args:
            policy: Production policy configuration. A default
                ``ProductionPolicy()`` is created when omitted.
            data_dir: Data directory for storage (a ``str`` is also accepted).
                Rollback and degraded-mode state live under
                ``<data_dir>/supervision/``.
            supervision_level: Initial level of supervision intensity.
        """
        self.policy = policy if policy is not None else ProductionPolicy()
        self.data_dir = Path(data_dir)
        self.supervision_level = supervision_level

        # Core components
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=self.policy.circuit_breaker_failure_threshold,
            recovery_timeout=self.policy.circuit_breaker_recovery_timeout,
            half_open_max_calls=self.policy.circuit_breaker_half_open_calls
        )
        supervision_dir = self.data_dir / "supervision"
        self.rollback_manager = RollbackManager(supervision_dir / "rollbacks")
        self.degraded_mode = DegradedModeManager(supervision_dir / "degraded_mode")

        # Integration with existing systems (wired in via integrate_systems()).
        self.healing_engine: Optional[SelfHealingEngine] = None
        self.performance_analyzer: Optional[PerformanceAnalyzer] = None
        self.failure_recorder: Optional[FailureCaseRecorder] = None
        self.metrics_engine: Optional[PrecisionMetricsEngine] = None

        # State tracking
        self.current_health: Optional[SystemHealth] = None
        self.supervision_events: List[SupervisionEvent] = []
        self.last_health_check = datetime.min
        self.running = False

        # Loop executing start_supervision() and the event that wakes it early
        # from its inter-cycle wait. Both are None while supervision is stopped.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._stop_event: Optional[asyncio.Event] = None

        # Callbacks for external integration (see set_callbacks()).
        self.health_check_callback: Optional[Callable[[], Dict[str, Any]]] = None
        self.restart_service_callback: Optional[Callable[[str], bool]] = None

        logger.info(f"ProductionSupervisor initialized with {supervision_level.value} level")

    def integrate_systems(
        self,
        healing_engine: Optional[SelfHealingEngine] = None,
        performance_analyzer: Optional[PerformanceAnalyzer] = None,
        failure_recorder: Optional[FailureCaseRecorder] = None,
        metrics_engine: Optional[PrecisionMetricsEngine] = None
    ):
        """
        Integrate with existing RPA Vision V3 systems.

        Every argument replaces the current reference, including with None.

        Args:
            healing_engine: Self-healing engine from core.healing
            performance_analyzer: Performance analyzer from core.analytics
            failure_recorder: Failure case recorder from core.evaluation
            metrics_engine: Precision metrics engine from core.precision
        """
        self.healing_engine = healing_engine
        self.performance_analyzer = performance_analyzer
        self.failure_recorder = failure_recorder
        self.metrics_engine = metrics_engine
        logger.info("Integrated with existing RPA Vision V3 systems")

    def set_callbacks(
        self,
        health_check_callback: Optional[Callable[[], Dict[str, Any]]] = None,
        restart_service_callback: Optional[Callable[[str], bool]] = None
    ):
        """
        Set callbacks for external system integration.

        Both callbacks are invoked synchronously from the supervision loop.

        Args:
            health_check_callback: Returns a dict that may contain the boolean
                keys 'api_healthy', 'dashboard_healthy', 'worker_healthy' and
                'disk_space_ok'. Missing keys default to healthy.
            restart_service_callback: Restarts the named service and returns
                True on success.
        """
        self.health_check_callback = health_check_callback
        self.restart_service_callback = restart_service_callback

    async def start_supervision(self):
        """
        Run the supervision loop until stop_supervision() is called.

        Each iteration runs one supervision cycle, then waits for the
        level-dependent interval. The wait ends early when stop_supervision()
        is called, so shutdown does not have to sit out a full interval (up to
        5 minutes at MINIMAL level).

        Returns immediately, with a warning, if supervision is already running.
        """
        if self.running:
            logger.warning("Supervision already running")
            return
        self.running = True
        self._loop = asyncio.get_running_loop()
        self._stop_event = asyncio.Event()
        logger.info("Starting production supervision")
        try:
            while self.running:
                await self._supervision_cycle()
                if not self.running:
                    break
                # Re-read each cycle: a decision may have changed the level.
                await self._wait_for_stop(self._get_supervision_interval())
        except Exception as e:
            logger.exception(f"Supervision loop crashed: {e}")
            self._record_event(
                "supervision_crash",
                "critical",
                "supervisor",
                f"Supervision loop crashed: {e}"
            )
        finally:
            self.running = False
            self._stop_event = None
            self._loop = None
            logger.info("Production supervision stopped")

    async def _wait_for_stop(self, timeout: float):
        """Wait up to ``timeout`` seconds, returning early once a stop is requested."""
        stop_event = self._stop_event
        if stop_event is None:
            await asyncio.sleep(timeout)
            return
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            # Normal case: the full interval elapsed without a stop request.
            pass

    def stop_supervision(self):
        """
        Request the supervision loop to stop.

        Safe to call from the loop's own thread or from another thread, because
        the wake-up is scheduled with ``call_soon_threadsafe``. The loop exits
        once the cycle in progress (if any) finishes.
        """
        self.running = False
        loop, stop_event = self._loop, self._stop_event
        if loop is not None and stop_event is not None:
            try:
                loop.call_soon_threadsafe(stop_event.set)
            except RuntimeError:
                # Loop already closed; clearing the running flag is enough.
                pass
        logger.info("Stopping production supervision")

    async def _supervision_cycle(self):
        """
        Run a single supervision cycle: assess, decide, act, update, learn.

        Errors are logged and recorded as events rather than propagated, so
        one bad cycle does not stop the loop.
        """
        try:
            # 1. Assess system health
            health = await self._assess_system_health()
            self.current_health = health
            # 2. Make supervision decisions
            decisions = self._make_supervision_decisions(health)
            # 3. Execute decisions
            await self._execute_decisions(decisions)
            # 4. Update circuit breaker states
            self._update_circuit_breakers(health)
            # 5. Learn from outcomes
            self._learn_from_supervision_cycle(health, decisions)
        except Exception as e:
            logger.exception(f"Error in supervision cycle: {e}")
            self._record_event(
                "supervision_error",
                "error",
                "supervisor",
                f"Error in supervision cycle: {e}"
            )

    async def _assess_system_health(self) -> SystemHealth:
        """
        Build a SystemHealth snapshot from the health callback and metrics.

        Service flags come from ``health_check_callback`` when it is set. A flag
        the callback does not report defaults to healthy, and so does every
        flag when the callback raises or returns something that is not
        dict-like. In those failure cases an entry is added to
        ``active_issues``, so the optimistic default is visible rather than
        silent.

        Also updates ``last_health_check``.
        """
        now = datetime.now()

        # Get basic health from callback or default
        basic_health: Dict[str, Any] = {}
        callback_problem: Optional[str] = None
        if self.health_check_callback:
            try:
                result = self.health_check_callback()
            except Exception as e:
                logger.warning(f"Health check callback failed: {e}")
                callback_problem = f"Health check callback failed: {e}"
            else:
                if hasattr(result, 'get'):
                    basic_health = result
                else:
                    # e.g. a callback that forgot to return its dict (None).
                    # Without this guard, .get() below would abort the cycle.
                    logger.warning(
                        f"Health check callback returned {type(result).__name__}, expected a dict"
                    )
                    callback_problem = "Health check callback returned no data"

        # Default health values
        api_healthy = bool(basic_health.get('api_healthy', True))
        dashboard_healthy = bool(basic_health.get('dashboard_healthy', True))
        worker_healthy = bool(basic_health.get('worker_healthy', True))
        disk_space_ok = bool(basic_health.get('disk_space_ok', True))

        failure_rate = await self._calculate_recent_failure_rate()
        avg_response_time = await self._calculate_avg_response_time()

        # Only one breaker exists today; 'healing' mirrors 'main' until a
        # dedicated healing breaker is added.
        cb_states = {
            'main': self.circuit_breaker.state,
            'healing': self.circuit_breaker.state,
        }

        # Overall score: mean of per-factor scores in [0, 1]. A failed
        # non-critical service keeps partial credit.
        health_factors = [
            1.0 if api_healthy else 0.0,
            1.0 if dashboard_healthy else 0.8,  # Dashboard less critical
            1.0 if worker_healthy else 0.7,
            1.0 if disk_space_ok else 0.5,
            max(0.0, 1.0 - failure_rate),  # Failure rate impact
            max(0.0, 1.0 - min(1.0, avg_response_time / 5000.0)),  # Response time impact
        ]
        overall_score = sum(health_factors) / len(health_factors)

        # Identify active issues
        active_issues: List[str] = []
        if callback_problem:
            active_issues.append(f"{callback_problem} (service flags assumed healthy)")
        if not api_healthy:
            active_issues.append("API service unhealthy")
        if not dashboard_healthy:
            active_issues.append("Dashboard service unhealthy")
        if not worker_healthy:
            active_issues.append("Worker service unhealthy")
        if not disk_space_ok:
            active_issues.append("Low disk space")
        if failure_rate > self.policy.max_acceptable_failure_rate:
            active_issues.append(f"High failure rate: {failure_rate:.2%}")
        if avg_response_time > self.policy.max_acceptable_response_time_ms:
            active_issues.append(f"Slow response time: {avg_response_time:.0f}ms")

        health = SystemHealth(
            timestamp=now,
            overall_score=overall_score,
            api_healthy=api_healthy,
            dashboard_healthy=dashboard_healthy,
            worker_healthy=worker_healthy,
            disk_space_ok=disk_space_ok,
            failure_rate=failure_rate,
            avg_response_time_ms=avg_response_time,
            circuit_breaker_states=cb_states,
            degraded_mode_active=self.degraded_mode.is_active(),
            active_issues=active_issues
        )
        self.last_health_check = now
        return health

    def _make_supervision_decisions(self, health: SystemHealth) -> List[Dict[str, Any]]:
        """
        Turn a health snapshot into an ordered list of decisions.

        Each decision is a dict with at least 'type', 'reason' and 'priority'.
        The list is sorted critical -> high -> medium -> low; the sort is
        stable, so ties keep insertion order.
        """
        decisions: List[Dict[str, Any]] = []

        # Decision 1: Circuit breaker management.
        # NOTE(review): a failure *rate* is compared with
        # circuit_breaker_failure_threshold, which is also passed to
        # CircuitBreaker as its failure_threshold (commonly a failure *count*).
        # Confirm the policy value's unit; if it is a count, this never fires.
        if (health.failure_rate > self.policy.circuit_breaker_failure_threshold
                and self.circuit_breaker.state == CircuitState.CLOSED):
            decisions.append({
                'type': 'open_circuit_breaker',
                'reason': f'High failure rate: {health.failure_rate:.2%}',
                'priority': 'high'
            })

        # Decision 2: Service restart
        if not health.api_healthy and self.restart_service_callback:
            decisions.append({
                'type': 'restart_service',
                'service': 'api',
                'reason': 'API service unhealthy',
                'priority': 'critical'
            })

        # Decision 3: Degraded mode activation
        if (health.overall_score < self.policy.degraded_mode_threshold
                and not self.degraded_mode.is_active()):
            decisions.append({
                'type': 'activate_degraded_mode',
                'reason': f'Low health score: {health.overall_score:.2f}',
                'priority': 'medium'
            })

        # Decision 4: Rollback consideration
        if (health.failure_rate > self.policy.rollback_failure_threshold
                and self.rollback_manager.has_stable_version()):
            decisions.append({
                'type': 'consider_rollback',
                'reason': f'Persistent high failure rate: {health.failure_rate:.2%}',
                'priority': 'high'
            })

        # Decision 5: Supervision level adjustment. Only escalate from levels
        # below INTENSIVE; an EMERGENCY level must not be lowered here.
        if health.overall_score < 0.5 and self.supervision_level in self._ESCALATABLE_LEVELS:
            decisions.append({
                'type': 'increase_supervision',
                'level': SupervisionLevel.INTENSIVE,
                'reason': 'Critical health issues detected',
                'priority': 'medium'
            })

        decisions.sort(key=lambda d: self._PRIORITY_ORDER.get(d.get('priority', 'low'), 3))
        return decisions

    async def _execute_decisions(self, decisions: List[Dict[str, Any]]):
        """Execute decisions in order; a failing decision does not block the rest."""
        for decision in decisions:
            try:
                await self._execute_single_decision(decision)
            except Exception as e:
                logger.exception(f"Failed to execute decision {decision}: {e}")
                self._record_event(
                    "decision_execution_failed",
                    "error",
                    "supervisor",
                    f"Failed to execute {decision['type']}: {e}",
                    {'decision': decision}
                )

    async def _execute_single_decision(self, decision: Dict[str, Any]):
        """Execute one decision produced by _make_supervision_decisions()."""
        decision_type = decision['type']

        if decision_type == 'open_circuit_breaker':
            self.circuit_breaker.record_failure()
            # The breaker has its own failure_threshold, so a single recorded
            # failure may leave it closed. Report what actually happened.
            if self.circuit_breaker.state != CircuitState.CLOSED:
                self._record_event(
                    "circuit_breaker_opened",
                    "warning",
                    "circuit_breaker",
                    decision['reason']
                )
            else:
                self._record_event(
                    "circuit_breaker_failure_recorded",
                    "warning",
                    "circuit_breaker",
                    f"{decision['reason']} (circuit still closed)"
                )

        elif decision_type == 'restart_service':
            service = decision['service']
            if self.restart_service_callback:
                if self.restart_service_callback(service):
                    self._record_event(
                        "service_restarted",
                        "info",
                        "supervisor",
                        f"Successfully restarted {service} service"
                    )
                else:
                    self._record_event(
                        "service_restart_failed",
                        "error",
                        "supervisor",
                        f"Failed to restart {service} service"
                    )

        elif decision_type == 'activate_degraded_mode':
            await self.degraded_mode.activate(decision['reason'])
            self._record_event(
                "degraded_mode_activated",
                "warning",
                "degraded_mode",
                decision['reason']
            )

        elif decision_type == 'consider_rollback':
            # A consideration only, not an immediate action. It could later
            # trigger a user notification or an automatic rollback per policy.
            self._record_event(
                "rollback_considered",
                "warning",
                "rollback_manager",
                decision['reason']
            )

        elif decision_type == 'increase_supervision':
            old_level = self.supervision_level
            self.supervision_level = decision['level']
            self._record_event(
                "supervision_level_changed",
                "info",
                "supervisor",
                f"Changed supervision level from {old_level.value} to {decision['level'].value}"
            )

        else:
            logger.warning(f"Unknown supervision decision type: {decision_type}")

    def _update_circuit_breakers(self, health: SystemHealth):
        """Record a circuit-breaker success when the system is clearly healthy."""
        if health.overall_score > 0.8 and health.failure_rate < 0.05:
            self.circuit_breaker.record_success()

    def _learn_from_supervision_cycle(
        self,
        health: SystemHealth,
        decisions: List[Dict[str, Any]]
    ):
        """Hook for learning from supervision outcomes; currently only logs."""
        # This could integrate with the persistent learning system (Fiche #18).
        if decisions:
            logger.debug(f"Learning opportunity: health={health.overall_score:.2f}, "
                         f"decisions={len(decisions)}")

    async def _calculate_recent_failure_rate(self) -> float:
        """
        Return the recent failure rate as a ratio.

        Returns 0.0 when no performance analyzer is integrated.
        """
        if not self.performance_analyzer:
            return 0.0
        # TODO: query self.performance_analyzer for the last hour of outcomes.
        # Until that is wired up, a fixed placeholder is reported.
        return 0.02  # 2% failure rate (placeholder)

    async def _calculate_avg_response_time(self) -> float:
        """
        Return the average response time in milliseconds.

        Returns 1000.0 when no performance analyzer is integrated.
        """
        if not self.performance_analyzer:
            return 1000.0  # Default 1 second
        # TODO: pull the real average from self.performance_analyzer.
        return 1500.0  # 1.5 seconds (placeholder)

    def _get_supervision_interval(self) -> float:
        """Return the seconds to wait between cycles at the current level."""
        return self._SUPERVISION_INTERVALS.get(
            self.supervision_level, self._DEFAULT_SUPERVISION_INTERVAL
        )

    def _record_event(
        self,
        event_type: str,
        severity: str,
        component: str,
        message: str,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Append a supervision event to the bounded history and log it.

        ``severity`` maps to a logging level ('info', 'warning', 'error',
        'critical'); unknown severities are logged at INFO.
        """
        event = SupervisionEvent(
            timestamp=datetime.now(),
            event_type=event_type,
            severity=severity,
            component=component,
            message=message,
            metadata=metadata or {}
        )
        self.supervision_events.append(event)
        # Keep only recent events to prevent memory bloat. Trim in place so
        # existing references to the list stay valid.
        if len(self.supervision_events) > self._MAX_EVENTS:
            del self.supervision_events[:-self._MAX_EVENTS]

        log_level = {
            'info': logging.INFO,
            'warning': logging.WARNING,
            'error': logging.ERROR,
            'critical': logging.CRITICAL
        }.get(severity, logging.INFO)
        logger.log(log_level, f"[{component}] {message}")

    @staticmethod
    def _health_to_dict(health: SystemHealth) -> Dict[str, Any]:
        """
        Return a detached, primitive-valued dict copy of a health snapshot.

        The timestamp becomes an ISO-8601 string, circuit states become their
        ``.value``, and the issues list is copied, so callers can serialize or
        mutate the result without touching supervisor state.
        """
        data = dict(vars(health))
        data['timestamp'] = health.timestamp.isoformat()
        data['circuit_breaker_states'] = {
            name: state.value for name, state in health.circuit_breaker_states.items()
        }
        data['active_issues'] = list(health.active_issues)
        return data

    def get_current_status(self) -> Dict[str, Any]:
        """
        Get the current supervision status as a dict of primitive values.

        'current_health' is None before the first assessment. Otherwise it is
        a copy of the latest snapshot with the timestamp in ISO-8601 format and
        circuit states as their ``.value``, consistent with
        'circuit_breaker_state' and 'recent_events'.
        """
        return {
            'supervision_level': self.supervision_level.value,
            'running': self.running,
            'current_health': (
                self._health_to_dict(self.current_health) if self.current_health else None
            ),
            'circuit_breaker_state': self.circuit_breaker.state.value,
            'degraded_mode_active': self.degraded_mode.is_active(),
            'recent_events': [
                {
                    'timestamp': e.timestamp.isoformat(),
                    'type': e.event_type,
                    'severity': e.severity,
                    'component': e.component,
                    'message': e.message
                }
                for e in self.supervision_events[-10:]  # Last 10 events
            ]
        }

    def force_health_check(self) -> Optional[SystemHealth]:
        """
        Run an immediate health assessment synchronously.

        Uses a private event loop that is never installed as the thread's
        current loop, so any loop the caller already has is left untouched.
        It must not be called from inside a running event loop (await
        ``_assess_system_health()`` there instead); in that case, or if the
        assessment fails, the error is logged and None is returned.

        Returns:
            The new SystemHealth (also stored in ``current_health``), or None.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop in this thread: safe to spin a private one.
        else:
            logger.error(
                "Force health check failed: cannot run synchronously inside a "
                "running event loop; await _assess_system_health() instead"
            )
            return None

        loop = asyncio.new_event_loop()
        try:
            health = loop.run_until_complete(self._assess_system_health())
        except Exception as e:
            logger.exception(f"Force health check failed: {e}")
            return None
        finally:
            loop.close()
        self.current_health = health
        return health