feat(coaching): Implement complete COACHING mode infrastructure

Add comprehensive COACHING mode system with:

Backend:
- core/coaching module with session persistence and metrics
- CoachingSessionPersistence for pause/resume sessions
- CoachingMetricsCollector with learning progress tracking
- REST API blueprint for coaching sessions management
- Execution integration with COACHING mode support

Frontend:
- CoachingPanel component with keyboard shortcuts
- Decision buttons (accept/reject/correct/manual/skip)
- Real-time stats display and correction editor
- CorrectionPacksDashboard for pack visualization
- WebSocket hooks for real-time COACHING events

Metrics & Monitoring:
- WorkflowLearningMetrics with confidence scoring
- GlobalCoachingMetrics for system-wide analytics
- AUTO mode readiness detection (85% acceptance threshold)
- Learning progress levels (OBSERVATION → COACHING → AUTO)

Tests:
- E2E tests for complete OBSERVATION → AUTO journey
- Session persistence and recovery tests
- Metrics threshold validation tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-19 08:40:54 +01:00
parent d6e2530f2a
commit 38a1a5ddd8
21 changed files with 7269 additions and 0 deletions

40
core/coaching/__init__.py Normal file
View File

@@ -0,0 +1,40 @@
"""
COACHING Mode Module
Provides functionality for COACHING mode including:
- Session persistence and recovery
- Session state management
- Statistics tracking
- Metrics and monitoring
"""
from .session_persistence import (
CoachingSessionPersistence,
CoachingSessionState,
get_coaching_persistence,
SessionStatus,
CoachingDecisionRecord,
)
from .metrics import (
CoachingMetricsCollector,
WorkflowLearningMetrics,
GlobalCoachingMetrics,
LearningProgress,
get_metrics_collector,
)
__all__ = [
# Session persistence
'CoachingSessionPersistence',
'CoachingSessionState',
'get_coaching_persistence',
'SessionStatus',
'CoachingDecisionRecord',
# Metrics
'CoachingMetricsCollector',
'WorkflowLearningMetrics',
'GlobalCoachingMetrics',
'LearningProgress',
'get_metrics_collector',
]

462
core/coaching/metrics.py Normal file
View File

@@ -0,0 +1,462 @@
"""
COACHING Metrics Module
Provides comprehensive metrics and monitoring for COACHING mode:
- Session statistics aggregation
- Learning progress tracking
- Performance analytics
- Recommendations for mode transitions
"""
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from enum import Enum
from .session_persistence import (
CoachingSessionPersistence,
CoachingSessionState,
SessionStatus,
get_coaching_persistence
)
class LearningProgress(str, Enum):
    """Learning progress levels for a workflow.

    Inherits from ``str`` so members compare equal to their raw values
    and serialize cleanly into JSON payloads.
    """
    NOT_STARTED = "not_started"
    OBSERVATION = "observation"    # Still collecting data
    LEARNING = "learning"          # Actively learning from corrections
    COACHING = "coaching"          # User coaching mode
    READY_FOR_AUTO = "ready"       # Ready for autonomous mode
    AUTONOMOUS = "autonomous"      # Running autonomously
@dataclass
class WorkflowLearningMetrics:
    """Metrics describing a single workflow's learning progress."""
    workflow_id: str
    # Raw counters aggregated across every session of this workflow.
    total_sessions: int = 0
    completed_sessions: int = 0
    total_steps_coached: int = 0
    total_decisions: int = 0
    accepted: int = 0
    rejected: int = 0
    corrected: int = 0
    manual_executions: int = 0
    skipped: int = 0
    # Derived ratios in [0, 1], filled in by the metrics collector.
    acceptance_rate: float = 0.0
    correction_rate: float = 0.0
    completion_rate: float = 0.0
    # Timing averages, in seconds.
    avg_session_duration_seconds: float = 0.0
    avg_decision_time_seconds: float = 0.0
    # Learning-progress assessment.
    learning_progress: LearningProgress = LearningProgress.NOT_STARTED
    confidence_score: float = 0.0
    ready_for_auto: bool = False
    # Human-readable improvement suggestions.
    recommendations: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict; the enum is flattened to its value."""
        payload = {name: getattr(self, name) for name in self.__dataclass_fields__}
        payload['learning_progress'] = self.learning_progress.value
        return payload
@dataclass
class GlobalCoachingMetrics:
    """Global metrics aggregated across all workflows."""
    total_workflows: int = 0
    total_sessions: int = 0
    active_sessions: int = 0
    completed_sessions: int = 0
    failed_sessions: int = 0
    total_decisions: int = 0
    total_accepted: int = 0
    total_rejected: int = 0
    total_corrected: int = 0
    # System-wide rates in [0, 1].
    overall_acceptance_rate: float = 0.0
    overall_correction_rate: float = 0.0
    workflows_ready_for_auto: int = 0
    workflows_in_learning: int = 0
    # Rolling-window activity counters.
    sessions_last_24h: int = 0
    decisions_last_24h: int = 0
    # Leaderboards: (workflow_id, count) pairs, highest first.
    top_workflows_by_sessions: List[Tuple[str, int]] = field(default_factory=list)
    top_workflows_by_corrections: List[Tuple[str, int]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field to a JSON-friendly dict, in declaration order."""
        return {name: getattr(self, name) for name in self.__dataclass_fields__}
class CoachingMetricsCollector:
    """
    Collector and analyzer for COACHING metrics.

    Provides methods to:
    - Calculate workflow-specific learning metrics
    - Determine readiness for autonomous mode
    - Generate recommendations for improvement
    - Track global system health
    """

    # Thresholds a workflow must meet before AUTO mode is recommended.
    MIN_SESSIONS_FOR_AUTO = 5
    MIN_ACCEPTANCE_RATE_FOR_AUTO = 0.85
    MAX_CORRECTION_RATE_FOR_AUTO = 0.10
    MIN_CONFIDENCE_FOR_AUTO = 0.80

    def __init__(self, persistence: Optional[CoachingSessionPersistence] = None):
        """
        Initialize metrics collector.

        Args:
            persistence: Session persistence instance. Defaults to the
                process-wide singleton from get_coaching_persistence().
        """
        self.persistence = persistence or get_coaching_persistence()

    def get_workflow_metrics(self, workflow_id: str) -> WorkflowLearningMetrics:
        """
        Calculate comprehensive metrics for a workflow.

        Args:
            workflow_id: Workflow ID

        Returns:
            WorkflowLearningMetrics with all computed values
        """
        # Get all sessions for this workflow
        sessions = self.persistence.list_sessions(workflow_id=workflow_id, limit=1000)
        metrics = WorkflowLearningMetrics(workflow_id=workflow_id)
        metrics.total_sessions = len(sessions)
        if not sessions:
            metrics.learning_progress = LearningProgress.NOT_STARTED
            metrics.recommendations = ["Demarrez une premiere session COACHING"]
            return metrics
        # Load full sessions for detailed analysis
        full_sessions: List[CoachingSessionState] = []
        for session_info in sessions:
            session = self.persistence.load_session(session_info['session_id'])
            if session:
                full_sessions.append(session)
        # Aggregate counters and completed-session durations.
        total_duration = 0.0
        for session in full_sessions:
            if session.status == SessionStatus.COMPLETED:
                metrics.completed_sessions += 1
            # Aggregate decision stats
            metrics.total_steps_coached += len(session.decisions)
            metrics.total_decisions += session.stats.get('suggestions_made', 0)
            metrics.accepted += session.stats.get('accepted', 0)
            metrics.rejected += session.stats.get('rejected', 0)
            metrics.corrected += session.stats.get('corrected', 0)
            metrics.manual_executions += session.stats.get('manual_executions', 0)
            metrics.skipped += session.stats.get('skipped', 0)
            # Duration is only known once both timestamps are present.
            if session.started_at and session.completed_at:
                try:
                    start = datetime.fromisoformat(session.started_at)
                    end = datetime.fromisoformat(session.completed_at)
                    total_duration += (end - start).total_seconds()
                except (TypeError, ValueError):
                    # Malformed timestamp: skip this session's duration.
                    pass
        # Calculate rates. Decisions counted here exclude manual/skip, which
        # express neither approval nor disapproval of a suggestion.
        total_decisions = metrics.accepted + metrics.rejected + metrics.corrected
        if total_decisions > 0:
            metrics.acceptance_rate = metrics.accepted / total_decisions
            metrics.correction_rate = metrics.corrected / total_decisions
        if metrics.total_sessions > 0:
            metrics.completion_rate = metrics.completed_sessions / metrics.total_sessions
        if metrics.completed_sessions > 0:
            metrics.avg_session_duration_seconds = total_duration / metrics.completed_sessions
        if metrics.total_decisions > 0 and total_duration > 0:
            metrics.avg_decision_time_seconds = total_duration / metrics.total_decisions
        # Derived assessments
        metrics.learning_progress = self._determine_learning_progress(metrics)
        metrics.confidence_score = self._calculate_confidence_score(metrics)
        metrics.ready_for_auto = self._check_ready_for_auto(metrics)
        metrics.recommendations = self._generate_recommendations(metrics)
        return metrics

    def get_global_metrics(self) -> GlobalCoachingMetrics:
        """
        Calculate global metrics across all workflows.

        Returns:
            GlobalCoachingMetrics with aggregated data
        """
        metrics = GlobalCoachingMetrics()
        # Get all sessions
        all_sessions = self.persistence.list_sessions(limit=10000)
        metrics.total_sessions = len(all_sessions)
        # Track unique workflows
        workflow_stats: Dict[str, Dict] = {}
        now = datetime.now()
        last_24h = now - timedelta(hours=24)
        for session_info in all_sessions:
            workflow_id = session_info.get('workflow_id', 'unknown')
            status = session_info.get('status', 'unknown')
            # Initialize workflow stats
            if workflow_id not in workflow_stats:
                workflow_stats[workflow_id] = {
                    'sessions': 0,
                    'corrections': 0
                }
            workflow_stats[workflow_id]['sessions'] += 1
            # Count by status
            if status == 'active':
                metrics.active_sessions += 1
            elif status == 'completed':
                metrics.completed_sessions += 1
            elif status == 'failed':
                metrics.failed_sessions += 1
            # Check last 24h
            try:
                updated_at = datetime.fromisoformat(session_info.get('updated_at', ''))
                if updated_at > last_24h:
                    metrics.sessions_last_24h += 1
            except (TypeError, ValueError):
                # Missing or malformed updated_at: not counted in the window.
                pass
            # Load full session for decision stats
            session = self.persistence.load_session(session_info['session_id'])
            if session:
                metrics.total_decisions += session.stats.get('suggestions_made', 0)
                metrics.total_accepted += session.stats.get('accepted', 0)
                metrics.total_rejected += session.stats.get('rejected', 0)
                metrics.total_corrected += session.stats.get('corrected', 0)
                workflow_stats[workflow_id]['corrections'] += session.stats.get('corrected', 0)
                # Decisions in last 24h
                for decision in session.decisions:
                    try:
                        decision_time = datetime.fromisoformat(decision.timestamp)
                        if decision_time > last_24h:
                            metrics.decisions_last_24h += 1
                    except (TypeError, ValueError):
                        pass
        metrics.total_workflows = len(workflow_stats)
        # Calculate overall rates
        total_decided = metrics.total_accepted + metrics.total_rejected + metrics.total_corrected
        if total_decided > 0:
            metrics.overall_acceptance_rate = metrics.total_accepted / total_decided
            metrics.overall_correction_rate = metrics.total_corrected / total_decided
        # Count workflows by learning state.
        # NOTE(review): get_workflow_metrics() re-reads every session of the
        # workflow from disk, so this loop is O(workflows * sessions); fine
        # for small deployments, worth caching if session counts grow.
        for workflow_id in workflow_stats:
            wf_metrics = self.get_workflow_metrics(workflow_id)
            if wf_metrics.ready_for_auto:
                metrics.workflows_ready_for_auto += 1
            elif wf_metrics.learning_progress in [LearningProgress.LEARNING, LearningProgress.COACHING]:
                metrics.workflows_in_learning += 1
        # Top workflows (top 5 of each leaderboard)
        sorted_by_sessions = sorted(
            workflow_stats.items(),
            key=lambda x: x[1]['sessions'],
            reverse=True
        )[:5]
        metrics.top_workflows_by_sessions = [
            (wf_id, stats['sessions']) for wf_id, stats in sorted_by_sessions
        ]
        sorted_by_corrections = sorted(
            workflow_stats.items(),
            key=lambda x: x[1]['corrections'],
            reverse=True
        )[:5]
        metrics.top_workflows_by_corrections = [
            (wf_id, stats['corrections']) for wf_id, stats in sorted_by_corrections
        ]
        return metrics

    def _determine_learning_progress(self, metrics: WorkflowLearningMetrics) -> LearningProgress:
        """Determine the learning progress level from aggregated metrics."""
        if metrics.total_sessions == 0:
            return LearningProgress.NOT_STARTED
        if metrics.total_sessions < 3:
            return LearningProgress.OBSERVATION
        if metrics.acceptance_rate < 0.5:
            return LearningProgress.LEARNING
        if metrics.acceptance_rate >= self.MIN_ACCEPTANCE_RATE_FOR_AUTO and \
           metrics.correction_rate <= self.MAX_CORRECTION_RATE_FOR_AUTO and \
           metrics.total_sessions >= self.MIN_SESSIONS_FOR_AUTO:
            return LearningProgress.READY_FOR_AUTO
        return LearningProgress.COACHING

    def _calculate_confidence_score(self, metrics: WorkflowLearningMetrics) -> float:
        """Calculate overall confidence score in [0, 1], rounded to 3 decimals."""
        if metrics.total_decisions == 0:
            return 0.0
        # Weighted factors (sum to 1.0)
        acceptance_weight = 0.4
        correction_weight = 0.3
        completion_weight = 0.2
        volume_weight = 0.1
        # Acceptance component (higher is better)
        acceptance_score = metrics.acceptance_rate
        # Correction component (lower is better; >=50% corrections scores 0)
        correction_score = max(0, 1 - metrics.correction_rate * 2)
        # Completion component
        completion_score = metrics.completion_rate
        # Volume component (normalized, caps at 10 sessions)
        volume_score = min(1, metrics.total_sessions / 10)
        confidence = (
            acceptance_weight * acceptance_score +
            correction_weight * correction_score +
            completion_weight * completion_score +
            volume_weight * volume_score
        )
        return round(confidence, 3)

    def _check_ready_for_auto(self, metrics: WorkflowLearningMetrics) -> bool:
        """Check whether a workflow meets every AUTO-mode threshold."""
        return (
            metrics.total_sessions >= self.MIN_SESSIONS_FOR_AUTO and
            metrics.acceptance_rate >= self.MIN_ACCEPTANCE_RATE_FOR_AUTO and
            metrics.correction_rate <= self.MAX_CORRECTION_RATE_FOR_AUTO and
            metrics.confidence_score >= self.MIN_CONFIDENCE_FOR_AUTO
        )

    def _generate_recommendations(self, metrics: WorkflowLearningMetrics) -> List[str]:
        """Generate actionable, user-facing recommendations (French copy)."""
        recommendations = []
        if metrics.total_sessions == 0:
            recommendations.append("Demarrez votre premiere session COACHING pour commencer l'apprentissage")
            return recommendations
        if metrics.total_sessions < self.MIN_SESSIONS_FOR_AUTO:
            remaining = self.MIN_SESSIONS_FOR_AUTO - metrics.total_sessions
            recommendations.append(f"Completez {remaining} session(s) supplementaire(s) pour atteindre le minimum requis")
        if metrics.acceptance_rate < self.MIN_ACCEPTANCE_RATE_FOR_AUTO:
            current_pct = round(metrics.acceptance_rate * 100, 1)
            target_pct = round(self.MIN_ACCEPTANCE_RATE_FOR_AUTO * 100, 1)
            recommendations.append(
                f"Ameliorez le taux d'acceptation de {current_pct}% a {target_pct}% "
                "en ajustant les selecteurs d'elements"
            )
        if metrics.correction_rate > self.MAX_CORRECTION_RATE_FOR_AUTO:
            recommendations.append(
                "Le taux de correction est eleve. Verifiez les elements visuels "
                "qui necessitent souvent des corrections"
            )
        if metrics.rejected > metrics.total_sessions * 2:
            recommendations.append(
                "Beaucoup d'actions rejetees. Revisez le workflow pour supprimer "
                "les etapes incorrectes"
            )
        if metrics.manual_executions > metrics.total_decisions * 0.1:
            recommendations.append(
                "Plusieurs executions manuelles detectees. Considerez automatiser "
                "ces actions frequentes"
            )
        if metrics.ready_for_auto:
            recommendations.append(
                "Ce workflow est pret pour le mode autonome ! "
                "Vous pouvez le passer en mode AUTO"
            )
        return recommendations
# Singleton instance
_metrics_collector: Optional[CoachingMetricsCollector] = None


def get_metrics_collector(persistence: Optional[CoachingSessionPersistence] = None) -> CoachingMetricsCollector:
    """Get or create the global metrics collector.

    NOTE(review): ``persistence`` is only honored on the very first call;
    subsequent calls return the already-built singleton unchanged.
    """
    global _metrics_collector
    if _metrics_collector is not None:
        return _metrics_collector
    _metrics_collector = CoachingMetricsCollector(persistence)
    return _metrics_collector

View File

@@ -0,0 +1,553 @@
"""
COACHING Session Persistence Module
Provides persistence layer for COACHING sessions to enable:
- Save session state for recovery after interruption
- Resume sessions from last known state
- Track session history and statistics
"""
import json
import os
import shutil
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any
import threading
import uuid
class SessionStatus(str, Enum):
    """Status of a COACHING session.

    Inherits from ``str`` so members compare equal to their raw values
    and serialize directly into JSON payloads.
    """
    ACTIVE = "active"        # Session currently in progress
    PAUSED = "paused"        # User paused; resumable
    COMPLETED = "completed"  # Finished successfully
    FAILED = "failed"        # Finished with an error
    ABANDONED = "abandoned"  # Explicitly given up; not resumable
@dataclass
class CoachingDecisionRecord:
    """Immutable-ish record of one coaching decision made by the user."""
    step_index: int
    node_id: str
    action_type: str
    decision: str  # one of: accept, reject, correct, manual, skip
    # Optional payload describing the user's correction, if decision == 'correct'.
    correction: Optional[Dict[str, Any]] = None
    feedback: Optional[str] = None
    # ISO-8601 creation time, captured when the record is built.
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    execution_success: Optional[bool] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize all fields to a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CoachingDecisionRecord':
        """Rebuild a record from a dict produced by :meth:`to_dict`."""
        return cls(**data)
@dataclass
class CoachingSessionState:
    """
    Complete state of a COACHING session.
    This state can be persisted and recovered to resume an interrupted session.
    """
    session_id: str
    workflow_id: str
    execution_id: str
    status: SessionStatus = SessionStatus.ACTIVE
    current_step_index: int = 0
    total_steps: int = 0
    decisions: List[CoachingDecisionRecord] = field(default_factory=list)
    # Per-decision counters; keys mirror the decision kinds.
    stats: Dict[str, int] = field(default_factory=lambda: {
        'suggestions_made': 0,
        'accepted': 0,
        'rejected': 0,
        'corrected': 0,
        'manual_executions': 0,
        'skipped': 0
    })
    variables: Dict[str, Any] = field(default_factory=dict)
    # ISO-8601 timestamps (naive local time, as produced by datetime.now()).
    started_at: str = field(default_factory=lambda: datetime.now().isoformat())
    updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
    completed_at: Optional[str] = None
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (status flattened to its value)."""
        data = {
            'session_id': self.session_id,
            'workflow_id': self.workflow_id,
            'execution_id': self.execution_id,
            'status': self.status.value if isinstance(self.status, SessionStatus) else self.status,
            'current_step_index': self.current_step_index,
            'total_steps': self.total_steps,
            'decisions': [d.to_dict() for d in self.decisions],
            'stats': self.stats,
            'variables': self.variables,
            'started_at': self.started_at,
            'updated_at': self.updated_at,
            'completed_at': self.completed_at,
            'error_message': self.error_message,
            'metadata': self.metadata
        }
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CoachingSessionState':
        """Rebuild a session state from a dict produced by :meth:`to_dict`.

        Raises:
            KeyError: if a required identifier field is missing.
            ValueError: if ``status`` is not a valid SessionStatus value.
        """
        decisions = [
            CoachingDecisionRecord.from_dict(d)
            for d in data.get('decisions', [])
        ]
        status = data.get('status', 'active')
        if isinstance(status, str):
            status = SessionStatus(status)
        # Fix: merge persisted stats over the full default key set. The old
        # code passed data.get('stats', {}) straight through, so a partial
        # or missing payload produced a stats dict without the expected
        # keys and add_decision()/rate helpers raised KeyError later.
        stats = {
            'suggestions_made': 0,
            'accepted': 0,
            'rejected': 0,
            'corrected': 0,
            'manual_executions': 0,
            'skipped': 0,
            **data.get('stats', {})
        }
        return cls(
            session_id=data['session_id'],
            workflow_id=data['workflow_id'],
            execution_id=data['execution_id'],
            status=status,
            current_step_index=data.get('current_step_index', 0),
            total_steps=data.get('total_steps', 0),
            decisions=decisions,
            stats=stats,
            variables=data.get('variables', {}),
            started_at=data.get('started_at', datetime.now().isoformat()),
            updated_at=data.get('updated_at', datetime.now().isoformat()),
            completed_at=data.get('completed_at'),
            error_message=data.get('error_message'),
            metadata=data.get('metadata', {})
        )

    def update_timestamp(self) -> None:
        """Update the updated_at timestamp to 'now'."""
        self.updated_at = datetime.now().isoformat()

    def add_decision(self, decision: CoachingDecisionRecord) -> None:
        """Append a decision, bump the matching counter, and advance the step."""
        self.decisions.append(decision)
        self.stats['suggestions_made'] = self.stats.get('suggestions_made', 0) + 1
        counter_key = {
            'accept': 'accepted',
            'reject': 'rejected',
            'correct': 'corrected',
            'manual': 'manual_executions',
            'skip': 'skipped',
        }.get(decision.decision)
        # Unknown decision kinds only count toward suggestions_made.
        if counter_key is not None:
            self.stats[counter_key] = self.stats.get(counter_key, 0) + 1
        self.current_step_index += 1
        self.update_timestamp()

    def get_acceptance_rate(self) -> float:
        """Return accepted / (accepted + rejected + corrected), or 0.0 if none."""
        total = (
            self.stats.get('accepted', 0)
            + self.stats.get('rejected', 0)
            + self.stats.get('corrected', 0)
        )
        if total == 0:
            return 0.0
        return self.stats.get('accepted', 0) / total

    def get_correction_rate(self) -> float:
        """Return corrected / (accepted + rejected + corrected), or 0.0 if none."""
        total = (
            self.stats.get('accepted', 0)
            + self.stats.get('rejected', 0)
            + self.stats.get('corrected', 0)
        )
        if total == 0:
            return 0.0
        return self.stats.get('corrected', 0) / total

    def can_resume(self) -> bool:
        """A session is resumable only while ACTIVE or PAUSED."""
        return self.status in [SessionStatus.ACTIVE, SessionStatus.PAUSED]
class CoachingSessionPersistence:
    """
    Persistence layer for COACHING sessions.
    Handles saving, loading, and managing COACHING session states.

    Storage layout: one JSON file per session under ``storage_path`` plus a
    ``sessions_index.json`` summary index for fast listing. Writes go through
    a temp file + move so a crash cannot leave a half-written file behind.
    """

    def __init__(self, storage_path: Optional[Path] = None):
        """
        Initialize persistence layer.

        Args:
            storage_path: Path to store session data. Defaults to data/coaching_sessions
        """
        if storage_path is None:
            # Three levels up from this module: <repo>/data/coaching_sessions
            storage_path = Path(__file__).parent.parent.parent / 'data' / 'coaching_sessions'
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        # Index file for quick lookup
        self._index_file = self.storage_path / 'sessions_index.json'
        self._index: Dict[str, Dict[str, Any]] = {}
        # Serializes index/file mutations across threads.
        self._lock = threading.Lock()
        self._load_index()

    def _load_index(self) -> None:
        """Load the sessions index from disk; fall back to empty on any error."""
        if self._index_file.exists():
            try:
                with open(self._index_file, 'r') as f:
                    self._index = json.load(f)
            except Exception as e:
                # Corrupt index is non-fatal: sessions files still exist.
                print(f"Warning: Could not load sessions index: {e}")
                self._index = {}

    def _save_index(self) -> None:
        """Atomically persist the sessions index (temp file + move)."""
        try:
            temp_file = self._index_file.with_suffix('.tmp')
            with open(temp_file, 'w') as f:
                json.dump(self._index, f, indent=2)
            shutil.move(str(temp_file), str(self._index_file))
        except Exception as e:
            # Best-effort: a stale index only degrades listing, not data.
            print(f"Warning: Could not save sessions index: {e}")

    def _session_file(self, session_id: str) -> Path:
        """Get the path for a session's JSON file."""
        return self.storage_path / f"{session_id}.json"

    def create_session(
        self,
        workflow_id: str,
        execution_id: str,
        total_steps: int = 0,
        variables: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CoachingSessionState:
        """
        Create a new COACHING session.

        Args:
            workflow_id: ID of the workflow being coached
            execution_id: ID of the execution
            total_steps: Total number of steps in the workflow
            variables: Initial variables
            metadata: Additional metadata

        Returns:
            New session state (already persisted to disk)
        """
        # Timestamp + random suffix keeps ids sortable and collision-free.
        session_id = f"coaching_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
        session = CoachingSessionState(
            session_id=session_id,
            workflow_id=workflow_id,
            execution_id=execution_id,
            total_steps=total_steps,
            variables=variables or {},
            metadata=metadata or {}
        )
        self.save_session(session)
        return session

    def save_session(self, session: CoachingSessionState) -> None:
        """
        Save a session state to disk and refresh its index entry.

        Args:
            session: Session state to save

        Raises:
            RuntimeError: if the session file cannot be written.
        """
        with self._lock:
            session.update_timestamp()
            # Save session file atomically (temp file + move).
            session_file = self._session_file(session.session_id)
            try:
                temp_file = session_file.with_suffix('.tmp')
                with open(temp_file, 'w') as f:
                    json.dump(session.to_dict(), f, indent=2)
                shutil.move(str(temp_file), str(session_file))
            except Exception as e:
                raise RuntimeError(f"Failed to save session {session.session_id}: {e}")
            # Update index with the listing summary.
            self._index[session.session_id] = {
                'workflow_id': session.workflow_id,
                'execution_id': session.execution_id,
                'status': session.status.value if isinstance(session.status, SessionStatus) else session.status,
                'started_at': session.started_at,
                'updated_at': session.updated_at,
                'current_step': session.current_step_index,
                'total_steps': session.total_steps
            }
            self._save_index()

    def load_session(self, session_id: str) -> Optional[CoachingSessionState]:
        """
        Load a session state from disk.

        Args:
            session_id: ID of the session to load

        Returns:
            Session state or None if not found (or unreadable/corrupt)
        """
        session_file = self._session_file(session_id)
        if not session_file.exists():
            return None
        try:
            with open(session_file, 'r') as f:
                data = json.load(f)
            return CoachingSessionState.from_dict(data)
        except Exception as e:
            print(f"Warning: Could not load session {session_id}: {e}")
            return None

    def delete_session(self, session_id: str) -> bool:
        """
        Delete a session file and its index entry.

        Args:
            session_id: ID of the session to delete

        Returns:
            True if deleted, False otherwise
        """
        with self._lock:
            session_file = self._session_file(session_id)
            if session_file.exists():
                session_file.unlink()
                if session_id in self._index:
                    del self._index[session_id]
                    self._save_index()
                return True
            # NOTE(review): an index entry without a backing file is left
            # in place here; cleanup_old_sessions() will not remove it either.
            return False

    def list_sessions(
        self,
        workflow_id: Optional[str] = None,
        status: Optional[SessionStatus] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        List session summaries (index entries) with optional filters.

        Args:
            workflow_id: Filter by workflow ID
            status: Filter by status
            limit: Maximum number of sessions to return

        Returns:
            List of session summaries, most recently updated first
        """
        sessions = []
        for session_id, info in self._index.items():
            # Apply filters
            if workflow_id and info.get('workflow_id') != workflow_id:
                continue
            if status:
                session_status = info.get('status', 'active')
                if session_status != status.value:
                    continue
            sessions.append({
                'session_id': session_id,
                **info
            })
        # Sort by updated_at descending (ISO strings sort chronologically).
        sessions.sort(key=lambda x: x.get('updated_at', ''), reverse=True)
        return sessions[:limit]

    def get_resumable_sessions(self, workflow_id: str) -> List[CoachingSessionState]:
        """
        Get all resumable (active or paused) sessions for a workflow.

        Args:
            workflow_id: Workflow ID to filter by

        Returns:
            List of resumable session states
        """
        resumable = []
        for session_info in self.list_sessions(workflow_id=workflow_id):
            status = session_info.get('status', 'active')
            if status in ['active', 'paused']:
                session = self.load_session(session_info['session_id'])
                if session and session.can_resume():
                    resumable.append(session)
        return resumable

    def pause_session(self, session_id: str) -> bool:
        """
        Pause an active session.

        Args:
            session_id: Session to pause

        Returns:
            True if paused, False otherwise (missing or not ACTIVE)
        """
        session = self.load_session(session_id)
        if session and session.status == SessionStatus.ACTIVE:
            session.status = SessionStatus.PAUSED
            self.save_session(session)
            return True
        return False

    def resume_session(self, session_id: str) -> Optional[CoachingSessionState]:
        """
        Resume a paused (or still active) session.

        Args:
            session_id: Session to resume

        Returns:
            Resumed session or None if it cannot be resumed
        """
        session = self.load_session(session_id)
        if session and session.can_resume():
            session.status = SessionStatus.ACTIVE
            self.save_session(session)
            return session
        return None

    def complete_session(
        self,
        session_id: str,
        success: bool = True,
        error_message: Optional[str] = None
    ) -> Optional[CoachingSessionState]:
        """
        Mark a session as completed (or failed) and stamp completed_at.

        Args:
            session_id: Session to complete
            success: Whether the session completed successfully
            error_message: Error message if failed

        Returns:
            Completed session or None if not found
        """
        session = self.load_session(session_id)
        if session:
            session.status = SessionStatus.COMPLETED if success else SessionStatus.FAILED
            session.completed_at = datetime.now().isoformat()
            session.error_message = error_message
            self.save_session(session)
            return session
        return None

    def abandon_session(self, session_id: str) -> bool:
        """
        Mark a session as abandoned (terminal, not resumable).

        Args:
            session_id: Session to abandon

        Returns:
            True if abandoned, False if the session does not exist
        """
        session = self.load_session(session_id)
        if session:
            session.status = SessionStatus.ABANDONED
            session.completed_at = datetime.now().isoformat()
            self.save_session(session)
            return True
        return False

    def cleanup_old_sessions(self, max_age_days: int = 30) -> int:
        """
        Remove sessions whose last update is older than max_age_days.

        Args:
            max_age_days: Maximum age in days

        Returns:
            Number of sessions removed
        """
        cutoff = datetime.now().timestamp() - (max_age_days * 24 * 3600)
        removed = 0
        with self._lock:
            to_remove = []
            for session_id, info in self._index.items():
                try:
                    updated_at = datetime.fromisoformat(info.get('updated_at', '')).timestamp()
                    if updated_at < cutoff:
                        to_remove.append(session_id)
                except (TypeError, ValueError):
                    # Fix: was a bare except that swallowed everything;
                    # only malformed/missing timestamps are skipped now.
                    pass
            for session_id in to_remove:
                session_file = self._session_file(session_id)
                if session_file.exists():
                    session_file.unlink()
                del self._index[session_id]
                removed += 1
            if removed > 0:
                self._save_index()
        return removed

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get overall statistics about COACHING sessions.

        Returns:
            Statistics dictionary with totals, per-status counts and rates
        """
        total = len(self._index)
        by_status = {}
        total_decisions = 0
        total_accepted = 0
        total_corrected = 0
        for session_id, info in self._index.items():
            status = info.get('status', 'unknown')
            by_status[status] = by_status.get(status, 0) + 1
            # Load full session for detailed stats
            session = self.load_session(session_id)
            if session:
                total_decisions += session.stats.get('suggestions_made', 0)
                total_accepted += session.stats.get('accepted', 0)
                total_corrected += session.stats.get('corrected', 0)
        return {
            'total_sessions': total,
            'by_status': by_status,
            'total_decisions': total_decisions,
            'total_accepted': total_accepted,
            'total_corrected': total_corrected,
            'overall_acceptance_rate': total_accepted / total_decisions if total_decisions > 0 else 0,
            'overall_correction_rate': total_corrected / total_decisions if total_decisions > 0 else 0
        }
# Global instance
_global_persistence: Optional[CoachingSessionPersistence] = None


def get_coaching_persistence(storage_path: Optional[Path] = None) -> CoachingSessionPersistence:
    """
    Get or create the global coaching session persistence instance.

    Args:
        storage_path: Optional custom storage path (honored only on the
            first call; afterwards the existing singleton is returned)

    Returns:
        CoachingSessionPersistence instance
    """
    global _global_persistence
    if _global_persistence is not None:
        return _global_persistence
    _global_persistence = CoachingSessionPersistence(storage_path)
    return _global_persistence