"""Offline Trainer - Train models on collected data"""

import json
import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class TrainingConfig:
    """Configuration for offline training"""

    learning_rate: float = 0.001
    batch_size: int = 32
    num_epochs: int = 10
    validation_split: float = 0.2  # fraction of sessions held out for validation
    similarity_threshold: float = 0.85  # fallback threshold when none is learned
    min_samples_per_workflow: int = 5  # workflows with fewer samples are skipped


@dataclass
class TrainingResult:
    """Result of training process"""

    success: bool
    trained_workflows: int
    total_samples: int
    validation_accuracy: float
    training_time_seconds: float
    model_path: str
    metrics: Dict[str, float]


class OfflineTrainer:
    """Train models on collected training data.

    Learns one prototype embedding and one similarity threshold per
    workflow from a JSON training set, validates the result, and exports
    the artifacts (prototypes.npz, thresholds.json, metadata.json) for
    production use.
    """

    def __init__(self, config: Optional[TrainingConfig] = None):
        """Create a trainer.

        Args:
            config: Training hyperparameters; defaults to ``TrainingConfig()``.
        """
        self.config = config or TrainingConfig()
        self.trained_prototypes: Dict[str, np.ndarray] = {}
        self.trained_thresholds: Dict[str, float] = {}
        logger.info("OfflineTrainer initialized")

    def load_training_data(self, training_set_path: str) -> Dict[str, Any]:
        """Load training dataset from JSON.

        Args:
            training_set_path: Path to a JSON file with ``metadata`` (holding
                ``total_sessions`` / ``total_patterns``) and ``sessions`` keys.

        Returns:
            The parsed training-set dictionary.

        Raises:
            OSError: If the file cannot be read.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If the expected metadata keys are missing.
        """
        with open(training_set_path, 'r') as f:
            data = json.load(f)
        logger.info(
            f"Loaded training data: {data['metadata']['total_sessions']} sessions, "
            f"{data['metadata']['total_patterns']} patterns"
        )
        return data

    def train_prototypes(self, training_data: Dict[str, Any]) -> Dict[str, np.ndarray]:
        """Learn optimal workflow prototypes from training data.

        Workflows with fewer than ``config.min_samples_per_workflow``
        sessions are skipped with a warning.

        Args:
            training_data: Dataset as returned by :meth:`load_training_data`.

        Returns:
            Mapping of workflow ID to normalized prototype embedding; also
            stored on ``self.trained_prototypes``.
        """
        logger.info("Training workflow prototypes...")
        prototypes = {}

        # Group sessions by workflow
        workflow_sessions = self._group_by_workflow(training_data['sessions'])

        for workflow_id, sessions in workflow_sessions.items():
            if len(sessions) < self.config.min_samples_per_workflow:
                logger.warning(
                    f"Skipping {workflow_id}: only {len(sessions)} samples "
                    f"(min={self.config.min_samples_per_workflow})"
                )
                continue

            # Compute prototype as weighted average of successful sessions
            prototype = self._compute_prototype(sessions)
            prototypes[workflow_id] = prototype
            logger.info(f"Prototype trained for {workflow_id} ({len(sessions)} samples)")

        self.trained_prototypes = prototypes
        return prototypes

    def _group_by_workflow(self, sessions: List[Dict]) -> Dict[str, List[Dict]]:
        """Group sessions by workflow ID.

        Sessions without a truthy ``workflow_id`` are dropped.
        """
        grouped: Dict[str, List[Dict]] = {}
        for session in sessions:
            wf_id = session.get('workflow_id')
            if wf_id:
                grouped.setdefault(wf_id, []).append(session)
        return grouped

    def _compute_prototype(self, sessions: List[Dict]) -> np.ndarray:
        """Compute a unit-norm prototype embedding from sessions.

        Only successful sessions contribute; if none succeeded, all sessions
        are used as a fallback. Each session is weighted by recency with an
        exponential decay (half-life scale of 30 days).

        Args:
            sessions: Session dicts with ISO-format ``timestamp`` and an
                optional ``success`` flag.

        Returns:
            L2-normalized prototype vector.
        """
        # Filter successful sessions
        successful = [s for s in sessions if s.get('success', False)]
        if not successful:
            logger.warning("No successful sessions for prototype")
            successful = sessions  # Use all if none successful

        # Load embeddings and compute weighted average
        embeddings = []
        weights = []
        for session in successful:
            # Weight by recency (more recent = higher weight).
            # NOTE(review): naive datetime.now() is compared against the parsed
            # timestamp — assumes session timestamps are naive local time too;
            # verify against the data collector.
            timestamp = datetime.fromisoformat(session['timestamp'])
            age_days = (datetime.now() - timestamp).days
            weight = np.exp(-age_days / 30.0)  # Decay over 30 days

            # Load embedding (simplified - would load actual .npy files)
            # For now, create dummy embedding
            embedding = np.random.randn(512)  # Placeholder
            embeddings.append(embedding)
            weights.append(weight)

        # Weighted average, then normalize to unit length
        weights = np.array(weights)
        weights = weights / weights.sum()
        prototype = np.average(embeddings, axis=0, weights=weights)
        prototype = prototype / np.linalg.norm(prototype)
        return prototype

    def train_thresholds(self, training_data: Dict[str, Any]) -> Dict[str, float]:
        """Learn optimal similarity thresholds per workflow.

        Only workflows that already have a trained prototype are processed.

        Args:
            training_data: Dataset as returned by :meth:`load_training_data`.

        Returns:
            Mapping of workflow ID to threshold (builtin float, so it stays
            JSON-serializable); also stored on ``self.trained_thresholds``.
        """
        logger.info("Training similarity thresholds...")
        thresholds = {}

        workflow_sessions = self._group_by_workflow(training_data['sessions'])

        for workflow_id, sessions in workflow_sessions.items():
            if workflow_id not in self.trained_prototypes:
                continue

            # Find optimal threshold using validation data.
            # Coerce to builtin float: np.float64 from np.arange would break
            # json.dump in export_trained_model.
            threshold = float(self._find_optimal_threshold(
                workflow_id, sessions, self.trained_prototypes[workflow_id]
            ))
            thresholds[workflow_id] = threshold
            logger.info(f"Optimal threshold for {workflow_id}: {threshold:.3f}")

        self.trained_thresholds = thresholds
        return thresholds

    def _find_optimal_threshold(
        self, workflow_id: str, sessions: List[Dict], prototype: np.ndarray
    ) -> float:
        """Find the similarity threshold maximizing F1 on held-out sessions.

        The tail ``config.validation_split`` fraction of ``sessions`` is used
        as validation data; candidate thresholds 0.70–0.90 (step 0.05) are
        scored. Falls back to ``config.similarity_threshold`` when there is
        no validation data.

        Args:
            workflow_id: Workflow being tuned (currently informational only).
            sessions: All sessions for this workflow, in dataset order.
            prototype: Trained prototype embedding for this workflow.

        Returns:
            Best threshold found, as a builtin float.
        """
        # Split into train/val
        split_idx = int(len(sessions) * (1 - self.config.validation_split))
        val_sessions = sessions[split_idx:]

        if not val_sessions:
            return self.config.similarity_threshold

        # Try different thresholds
        best_threshold = self.config.similarity_threshold
        best_f1 = 0.0

        for threshold in np.arange(0.70, 0.95, 0.05):
            # Calculate F1 score at this threshold
            tp = fp = fn = 0

            for session in val_sessions:
                # Simplified - would compute actual similarity.
                # NOTE(review): random placeholder makes the result
                # nondeterministic until real similarities are wired in.
                similarity = np.random.uniform(0.7, 0.95)
                predicted = similarity >= threshold
                actual = session.get('success', False)

                if predicted and actual:
                    tp += 1
                elif predicted and not actual:
                    fp += 1
                elif not predicted and actual:
                    fn += 1

            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

            if f1 > best_f1:
                best_f1 = f1
                # Keep a builtin float so downstream JSON export works
                best_threshold = float(threshold)

        return best_threshold

    def validate_model(self, training_data: Dict[str, Any]) -> Dict[str, float]:
        """Validate trained model on held-out data.

        Args:
            training_data: Dataset as returned by :meth:`load_training_data`.

        Returns:
            Metrics dict with ``accuracy``, ``precision``, ``recall`` and
            ``f1_score`` keys (only ``accuracy`` is currently computed).
        """
        logger.info("Validating trained model...")

        metrics = {
            'accuracy': 0.0,
            'precision': 0.0,
            'recall': 0.0,
            'f1_score': 0.0
        }

        # Use validation split
        all_sessions = training_data['sessions']
        split_idx = int(len(all_sessions) * (1 - self.config.validation_split))
        val_sessions = all_sessions[split_idx:]

        if not val_sessions:
            logger.warning("No validation data available")
            return metrics

        # Evaluate on validation set
        correct = 0
        total = len(val_sessions)

        for session in val_sessions:
            # Simplified validation — random placeholder prediction until the
            # real inference path exists.
            predicted_success = np.random.random() > 0.3
            actual_success = session.get('success', False)
            if predicted_success == actual_success:
                correct += 1

        metrics['accuracy'] = correct / total if total > 0 else 0.0
        logger.info(f"Validation accuracy: {metrics['accuracy']:.2%}")
        return metrics

    def export_trained_model(self, output_path: str = "trained_model") -> str:
        """Export trained model for production use.

        Writes ``prototypes.npz``, ``thresholds.json`` and ``metadata.json``
        into ``output_path`` (created if missing).

        Args:
            output_path: Directory to write artifacts into.

        Returns:
            The output directory path as a string.
        """
        output_dir = Path(output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Save prototypes
        prototypes_file = output_dir / "prototypes.npz"
        np.savez(prototypes_file, **self.trained_prototypes)

        # Save thresholds — coerce values to builtin float in case any
        # numpy scalar slipped in (json.dump cannot serialize np.float64)
        thresholds_file = output_dir / "thresholds.json"
        with open(thresholds_file, 'w') as f:
            json.dump({k: float(v) for k, v in self.trained_thresholds.items()}, f, indent=2)

        # Save metadata
        metadata = {
            'trained_date': datetime.now().isoformat(),
            'num_workflows': len(self.trained_prototypes),
            'config': {
                'learning_rate': self.config.learning_rate,
                'similarity_threshold': self.config.similarity_threshold
            }
        }
        metadata_file = output_dir / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)

        logger.info(f"Model exported to: {output_dir}")
        return str(output_dir)

    def train_full_pipeline(self, training_set_path: str) -> TrainingResult:
        """Run the complete training pipeline.

        Loads data, trains prototypes and thresholds, validates, and exports
        the model to the default output directory.

        Args:
            training_set_path: Path to the JSON training set.

        Returns:
            A :class:`TrainingResult` summarizing the run.
        """
        start_time = datetime.now()

        # Load data
        training_data = self.load_training_data(training_set_path)

        # Train prototypes
        prototypes = self.train_prototypes(training_data)

        # Train thresholds
        thresholds = self.train_thresholds(training_data)

        # Validate
        metrics = self.validate_model(training_data)

        # Export
        model_path = self.export_trained_model()

        training_time = (datetime.now() - start_time).total_seconds()

        result = TrainingResult(
            success=True,
            trained_workflows=len(prototypes),
            total_samples=training_data['metadata']['total_sessions'],
            validation_accuracy=metrics['accuracy'],
            training_time_seconds=training_time,
            model_path=model_path,
            metrics=metrics
        )

        logger.info(
            f"Training complete: {result.trained_workflows} workflows, "
            f"accuracy={result.validation_accuracy:.2%}, "
            f"time={result.training_time_seconds:.1f}s"
        )
        return result