# OMOP CDM 5.4 Data Pipeline

A comprehensive ETL pipeline for transforming healthcare data to OMOP Common Data Model (CDM) version 5.4 format.

## Overview

This pipeline provides a complete solution for:

- Extracting data from staging tables
- Mapping source codes to OMOP standard concepts
- Transforming data to OMOP CDM 5.4 format
- Validating data quality and OMOP compliance
- Loading data into OMOP tables with parallel processing

## Features

- ✅ **Complete OMOP CDM 5.4 Support**: All clinical, vocabulary, and metadata tables
- ✅ **Automated Concept Mapping**: LRU-cached mapping with fallback strategies
- ✅ **Parallel Processing**: Multi-threaded ETL with configurable workers
- ✅ **Data Quality Validation**: Comprehensive validation rules and OMOP compliance checks
- ✅ **Error Handling**: Retry logic, circuit breaker, and checkpoint/resume functionality
- ✅ **Web Interface**: Modern React dashboard for managing ETL pipelines (NEW!)
- ✅ **REST API**: FastAPI backend with complete API documentation
- ✅ **CLI Interface**: User-friendly command-line interface for all operations
- ✅ **Vocabulary Management**: Tools for loading and managing OMOP vocabularies
- ✅ **Comprehensive Logging**: Detailed logging with an audit trail

## Quick Start

### Option 1: Web Interface (Recommended)

```bash
cd omop

# Install dependencies
pip install -r requirements.txt
pip install -r requirements-api.txt

# Start web interface (API + frontend)
./start_web.sh
```

Then open http://localhost:3000 in your browser.

See `QUICK_START_WEB.md` for detailed instructions.

### Option 2: Command Line Interface

```bash
# Clone the repository, then enter it
cd omop

# Install dependencies
pip install -r requirements.txt

# Or install in development mode
pip install -e .
```

### Configuration

1. Copy the example environment file:

```bash
cp .env.example .env
```

2. Edit `.env` with your database credentials:

```
DB_HOST=localhost
DB_PORT=5432
DB_NAME=omop_db
DB_USER=your_user
DB_PASSWORD=your_password
```

3. Review and customize `config.yaml` as needed.

### Create Database Schemas

```bash
# Create all schemas (OMOP, staging, audit)
omop-pipeline schema create --type all

# Or create them individually
omop-pipeline schema create --type omop
omop-pipeline schema create --type staging
omop-pipeline schema create --type audit
```

### Load Vocabularies

1. Download vocabularies from [Athena OHDSI](https://athena.ohdsi.org/)
2. Extract the ZIP file to a directory
3. Load the vocabularies:

```bash
omop-pipeline vocab load --path /path/to/vocabularies
```

### Run ETL Pipeline

```bash
# Run the complete ETL pipeline
omop-pipeline etl run --source staging.raw_patients --target person

# With custom batch size and worker count
omop-pipeline etl run --source staging.raw_patients --target person --batch-size 5000 --workers 8

# Run in sequential mode (no parallelization)
omop-pipeline etl run --source staging.raw_patients --target person --sequential
```

## Web Interface

The pipeline includes a modern web interface built with FastAPI and React.

### Features

- 📊 **Dashboard**: Real-time statistics and performance metrics
- ⚙️ **ETL Manager**: Launch and monitor ETL pipelines
- 🗄️ **Schema Manager**: Create and validate database schemas
- ✅ **Validation**: Data quality checks and unmapped codes
- 📝 **Logs**: System logs and validation errors

### Quick Start

```bash
./start_web.sh
```

Access the interface at http://localhost:3000

For more details, see `README_WEB_INTERFACE.md` and `WEB_INTERFACE_SUMMARY.md`.
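The REST API can also be scripted directly. Below is a minimal sketch; both the backend port (8000) and the `/api/stats` route are assumptions for illustration — FastAPI's interactive docs (served at `/docs`) list the actual routes:

```python
import requests

# NOTE: the base URL and endpoint path are assumptions for illustration;
# consult the interactive API docs for the real routes.
API_BASE = "http://localhost:8000"

resp = requests.get(f"{API_BASE}/api/stats", timeout=10)
resp.raise_for_status()
print(resp.json())
```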
## CLI Commands

### Schema Management

```bash
# Create schemas
omop-pipeline schema create --type [omop|staging|audit|all]

# Validate schema
omop-pipeline schema validate
```

### ETL Operations

```bash
# Run complete ETL
omop-pipeline etl run --source <schema.table> --target <omop_table>

# Run extraction only
omop-pipeline etl extract --source <schema.table>

# Run transformation only
omop-pipeline etl transform --target <omop_table>

# Run loading only
omop-pipeline etl load --target <omop_table>
```

### Data Validation

```bash
# Validate data quality
omop-pipeline validate

# Validate a specific table
omop-pipeline validate --table person
```

### Statistics

```bash
# Show ETL statistics
omop-pipeline stats show

# Show summary
omop-pipeline stats summary
```

### Vocabulary Management

```bash
# Prepare vocabulary loading (shows instructions)
omop-pipeline vocab prepare

# Load vocabularies
omop-pipeline vocab load --path /path/to/vocabularies
```

### Configuration

```bash
# Validate configuration
omop-pipeline config validate
```

### Logs

```bash
# Show recent log entries
omop-pipeline logs show

# Show the last 100 lines
omop-pipeline logs show --lines 100

# Filter by log level
omop-pipeline logs show --level ERROR
```

## Architecture

The pipeline consists of the following components:

- **Extractor**: Extracts data from staging tables with batch processing
- **Concept Mapper**: Maps source codes to OMOP concepts with LRU caching (see the sketch below)
- **Transformer**: Transforms data to OMOP format with validation
- **Validator**: Validates data quality and OMOP compliance
- **Loader**: Loads data into OMOP tables using bulk operations
- **Orchestrator**: Coordinates the complete ETL flow with parallel processing
- **Error Handler**: Manages errors with retry logic and a circuit breaker
- **Schema Manager**: Creates and manages database schemas
- **Vocabulary Loader**: Loads OMOP vocabularies from CSV files
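The Concept Mapper's LRU caching can be pictured as `functools.lru_cache` over a vocabulary lookup. This is an illustrative sketch, not the pipeline's actual internals; the connection string and function name are assumptions, while the `concept` table and its columns come from the standard OMOP vocabulary:

```python
from functools import lru_cache

import psycopg2

# Connection parameters are illustrative.
conn = psycopg2.connect("dbname=omop_db user=postgres")

@lru_cache(maxsize=10_000)  # mirrors concept_cache_size in config.yaml
def map_source_code(source_code: str, vocabulary_id: str) -> int:
    """Resolve a source code to a standard concept_id (0 = no matching concept)."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT concept_id FROM concept
            WHERE concept_code = %s
              AND vocabulary_id = %s
              AND standard_concept = 'S'
            """,
            (source_code, vocabulary_id),
        )
        row = cur.fetchone()
    return row[0] if row else 0
```

Because repeated batches tend to contain the same source codes, the cache turns most lookups into in-memory hits instead of database round trips.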
## Configuration

The pipeline is configured via `config.yaml`:

```yaml
database:
  host: localhost
  port: 5432
  database: omop_db
  user: postgres
  password: ${DB_PASSWORD}  # From environment variable

etl:
  batch_size: 1000
  num_workers: 4
  concept_cache_size: 10000
  validate_before_load: true

logging:
  level: INFO
  file: logs/omop_pipeline.log
  max_bytes: 10485760
  backup_count: 5
```
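The `${DB_PASSWORD}` reference is resolved from the environment. As a rough sketch of how such a file can be read (illustrative, not necessarily the pipeline's own loader):

```python
import os

import yaml

def load_config(path: str = "config.yaml") -> dict:
    """Read the YAML config, expanding ${VAR} references from the environment."""
    with open(path) as f:
        text = f.read()
    # os.path.expandvars substitutes both $VAR and ${VAR} forms
    return yaml.safe_load(os.path.expandvars(text))

config = load_config()
print(config["etl"]["batch_size"])
```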
## Performance

The pipeline is optimized for high-volume data processing:

- **Parallel Processing**: Multi-threaded execution with configurable workers
- **Batch Operations**: Efficient batch processing with PostgreSQL COPY (a sketch follows below)
- **Caching**: LRU cache for frequently used concept mappings
- **Connection Pooling**: Optimized database connection management

Typical performance on a 16-core, 125 GB RAM system:

- **Throughput**: 5,000-10,000 records/second
- **Memory Usage**: ~2-4 GB per worker
- **CPU Usage**: Scales linearly with the number of workers
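A minimal sketch of COPY-based bulk loading with psycopg2; the helper name is an assumption, while `person_id` and `gender_concept_id` are real CDM columns (a real `person` row has more required columns, so the commented call is purely illustrative):

```python
import io

import psycopg2

def bulk_load(conn, rows, table, columns):
    """Stream rows into PostgreSQL via COPY, much faster than row-by-row INSERTs."""
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join(str(v) for v in row) + "\n")  # text format, tab-delimited
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_expert(
            f"COPY {table} ({', '.join(columns)}) FROM STDIN WITH (FORMAT text)",
            buf,
        )
    conn.commit()

# Illustrative usage:
# conn = psycopg2.connect("dbname=omop_db user=postgres")
# bulk_load(conn, [(1, 8507), (2, 8532)], "person", ("person_id", "gender_concept_id"))
```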
## Data Quality

The pipeline includes comprehensive data quality checks:

- **Referential Integrity**: Validates all foreign key relationships
- **Date Consistency**: Ensures start dates <= end dates (see the example below)
- **Concept Validation**: Verifies all concept_ids exist
- **Value Ranges**: Checks numeric values are within acceptable ranges
- **OMOP Compliance**: Validates against OMOP CDM specifications
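The date-consistency rule, for example, boils down to a simple SQL probe. A sketch with illustrative connection parameters; `condition_occurrence` and its date columns are standard CDM 5.4:

```python
import psycopg2

conn = psycopg2.connect("dbname=omop_db user=postgres")
with conn.cursor() as cur:
    # Count rows whose end date precedes the start date
    cur.execute(
        """
        SELECT count(*)
        FROM condition_occurrence
        WHERE condition_end_date < condition_start_date
        """
    )
    violations = cur.fetchone()[0]
print(f"{violations} rows violate start_date <= end_date")
```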
## Error Handling

The pipeline implements robust error handling:

- **Error Levels**: INFO, WARNING, ERROR, CRITICAL
- **Retry Logic**: Exponential backoff for transient errors (see the sketch below)
- **Circuit Breaker**: Prevents cascading failures
- **Checkpoint/Resume**: Resume processing after an interruption
- **Audit Trail**: Complete error logging to audit tables
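The retry pattern can be sketched as follows (an illustrative helper, not the pipeline's actual error handler):

```python
import random
import time

def retry(fn, max_attempts: int = 5, base_delay: float = 0.5):
    """Call fn, retrying transient failures with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # exhausted: surface the error for auditing
            # Delays grow 0.5s, 1s, 2s, ...; jitter avoids synchronized retries
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))
```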
## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run a specific test file
pytest tests/test_transformer.py
```

## Documentation

- [User Guide](docs/user_guide.md) - Detailed usage instructions
- [Architecture](docs/architecture.md) - System architecture and design
- [Transformation Rules](docs/transformation_rules.md) - Data transformation specifications
- [CHANGELOG](CHANGELOG.md) - Version history and changes

## Requirements

- Python 3.12+
- PostgreSQL 16.11+
- 8GB+ RAM (16GB+ recommended for parallel processing)
- OMOP vocabularies from Athena OHDSI

## License

MIT License - see the LICENSE file for details.

## Support

For issues, questions, or contributions, please open an issue on GitHub.

## Acknowledgments

- OHDSI Community for the OMOP CDM specifications
- Athena OHDSI for vocabulary management