# OMOP CDM 5.4 Data Pipeline

A comprehensive ETL pipeline for transforming healthcare data to OMOP Common Data Model (CDM) version 5.4 format.

## Overview

This pipeline provides a complete solution for:
- Extracting data from staging tables
- Mapping source codes to OMOP standard concepts
- Transforming data to OMOP CDM 5.4 format
- Validating data quality and OMOP compliance
- Loading data into OMOP tables with parallel processing
## Features
- ✅ Complete OMOP CDM 5.4 Support: All clinical, vocabulary, and metadata tables
- ✅ Automated Concept Mapping: LRU-cached mapping with fallback strategies
- ✅ Parallel Processing: Multi-threaded ETL with configurable workers
- ✅ Data Quality Validation: Comprehensive validation rules and OMOP compliance checks
- ✅ Error Handling: Retry logic, circuit breaker, and checkpoint/resume functionality
- ✅ Web Interface: Modern React dashboard for managing ETL pipelines (NEW!)
- ✅ REST API: FastAPI backend with complete API documentation
- ✅ CLI Interface: User-friendly command-line interface for all operations
- ✅ Vocabulary Management: Tools for loading and managing OMOP vocabularies
- ✅ Comprehensive Logging: Detailed logging with audit trail
## Quick Start

### Option 1: Web Interface (Recommended)

```bash
cd omop

# Install dependencies
pip install -r requirements.txt
pip install -r requirements-api.txt

# Start web interface (API + Frontend)
./start_web.sh
```

Then open http://localhost:3000 in your browser.

See QUICK_START_WEB.md for detailed instructions.
### Option 2: Command Line Interface

```bash
# Clone the repository
cd omop

# Install dependencies
pip install -r requirements.txt

# Or install in development mode
pip install -e .
```
### Configuration

1. Copy the example environment file:

   ```bash
   cp .env.example .env
   ```

2. Edit `.env` with your database credentials:

   ```
   DB_HOST=localhost
   DB_PORT=5432
   DB_NAME=omop_db
   DB_USER=your_user
   DB_PASSWORD=your_password
   ```

3. Review and customize `config.yaml` as needed.
### Create Database Schemas

```bash
# Create all schemas (OMOP, staging, audit)
omop-pipeline schema create --type all

# Or create individually
omop-pipeline schema create --type omop
omop-pipeline schema create --type staging
omop-pipeline schema create --type audit
```
### Load Vocabularies

1. Download vocabularies from Athena OHDSI
2. Extract the ZIP file to a directory
3. Load vocabularies:

   ```bash
   omop-pipeline vocab load --path /path/to/vocabularies
   ```
### Run ETL Pipeline

```bash
# Run complete ETL pipeline
omop-pipeline etl run --source staging.raw_patients --target person

# With custom batch size and workers
omop-pipeline etl run --source staging.raw_patients --target person --batch-size 5000 --workers 8

# Run in sequential mode (no parallelization)
omop-pipeline etl run --source staging.raw_patients --target person --sequential
```
## Web Interface

The pipeline includes a modern web interface built with FastAPI and React.

### Features

- 📊 Dashboard: Real-time statistics and performance metrics
- ⚙️ ETL Manager: Launch and monitor ETL pipelines
- 🗄️ Schema Manager: Create and validate database schemas
- ✅ Validation: Data quality checks and unmapped codes
- 📝 Logs: System logs and validation errors

### Quick Start

```bash
./start_web.sh
```

Access the interface at http://localhost:3000.

For more details, see README_WEB_INTERFACE.md and WEB_INTERFACE_SUMMARY.md.
## CLI Commands

### Schema Management

```bash
# Create schemas
omop-pipeline schema create --type [omop|staging|audit|all]

# Validate schema
omop-pipeline schema validate
```

### ETL Operations

```bash
# Run complete ETL
omop-pipeline etl run --source <table> --target <table>

# Run extraction only
omop-pipeline etl extract --source <table>

# Run transformation only
omop-pipeline etl transform --target <table>

# Run loading only
omop-pipeline etl load --target <table>
```

### Data Validation

```bash
# Validate data quality
omop-pipeline validate

# Validate specific table
omop-pipeline validate --table person
```

### Statistics

```bash
# Show ETL statistics
omop-pipeline stats show

# Show summary
omop-pipeline stats summary
```

### Vocabulary Management

```bash
# Prepare vocabulary loading (shows instructions)
omop-pipeline vocab prepare

# Load vocabularies
omop-pipeline vocab load --path /path/to/vocabularies
```

### Configuration

```bash
# Validate configuration
omop-pipeline config validate
```

### Logs

```bash
# Show recent log entries
omop-pipeline logs show

# Show last 100 lines
omop-pipeline logs show --lines 100

# Filter by log level
omop-pipeline logs show --level ERROR
```
## Architecture

The pipeline consists of the following components:
- Extractor: Extracts data from staging tables with batch processing
- Concept Mapper: Maps source codes to OMOP concepts with LRU caching
- Transformer: Transforms data to OMOP format with validation
- Validator: Validates data quality and OMOP compliance
- Loader: Loads data into OMOP tables using bulk operations
- Orchestrator: Coordinates the complete ETL flow with parallel processing
- Error Handler: Manages errors with retry logic and circuit breaker
- Schema Manager: Creates and manages database schemas
- Vocabulary Loader: Loads OMOP vocabularies from CSV files
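As a rough sketch, the batch-level flow these components cooperate on might look like the following. All class and method names here are illustrative stand-ins, not the pipeline's actual API:

```python
# Illustrative orchestration sketch: extract -> map -> transform ->
# validate -> load, with batches processed in a worker pool.
# Component interfaces are assumptions for this example.
from concurrent.futures import ThreadPoolExecutor

def run_etl(extractor, mapper, transformer, validator, loader,
            source_table, target_table, batch_size=1000, num_workers=4):
    """Coordinate one ETL run; returns the number of records loaded."""
    def process_batch(batch):
        mapped = [mapper.map_codes(row) for row in batch]
        transformed = transformer.transform(mapped, target_table)
        errors = validator.validate(transformed)
        if errors:
            # Fail the batch rather than load invalid rows.
            raise ValueError(f"{len(errors)} validation errors in batch")
        loader.load(transformed, target_table)
        return len(transformed)

    total = 0
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        batches = extractor.extract(source_table, batch_size=batch_size)
        for count in pool.map(process_batch, batches):
            total += count
    return total
```

Setting `num_workers=1` would give the behavior of the `--sequential` CLI flag.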
## Configuration

The pipeline is configured via `config.yaml`:

```yaml
database:
  host: localhost
  port: 5432
  database: omop_db
  user: postgres
  password: ${DB_PASSWORD}  # From environment variable

etl:
  batch_size: 1000
  num_workers: 4
  concept_cache_size: 10000
  validate_before_load: true

logging:
  level: INFO
  file: logs/omop_pipeline.log
  max_bytes: 10485760
  backup_count: 5
```
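The `${DB_PASSWORD}` placeholder implies environment-variable substitution when the file is read. A minimal sketch of that expansion step (the pipeline's actual config loader is not shown here, and may behave differently for unset variables):

```python
# Expand ${VAR}-style placeholders in raw config text from the
# environment before parsing. Unset variables become empty strings
# in this sketch.
import os
import re

_VAR = re.compile(r"\$\{(\w+)\}")

def expand_env(text: str) -> str:
    """Replace each ${VAR} with the value of environment variable VAR."""
    return _VAR.sub(lambda m: os.environ.get(m.group(1), ""), text)
```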
## Performance
The pipeline is optimized for high-volume data processing:
- Parallel Processing: Multi-threaded execution with configurable workers
- Batch Operations: Efficient batch processing with PostgreSQL COPY
- Caching: LRU cache for frequently used concept mappings
- Connection Pooling: Optimized database connection management
Typical performance on a 16-core, 125GB RAM system:
- Throughput: 5,000-10,000 records/second
- Memory Usage: ~2-4GB per worker
- CPU Usage: Scales linearly with number of workers
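The concept-mapping cache can be pictured with `functools.lru_cache`. The lookup function below is a hypothetical stand-in for the real CONCEPT-table query (8507 and 8532 are the standard OMOP concepts for male and female gender):

```python
# Sketch of LRU-cached concept mapping. fetch_concept_id is a stub;
# the real pipeline would query the vocabulary tables in PostgreSQL.
from functools import lru_cache

def fetch_concept_id(vocabulary_id: str, source_code: str) -> int:
    """Stand-in for a database lookup; returns 0 when unmapped."""
    table = {"Gender:M": 8507, "Gender:F": 8532}
    return table.get(f"{vocabulary_id}:{source_code}", 0)

@lru_cache(maxsize=10_000)  # mirrors etl.concept_cache_size in config.yaml
def map_to_standard_concept(vocabulary_id: str, source_code: str) -> int:
    return fetch_concept_id(vocabulary_id, source_code)
```

Because source codes repeat heavily in clinical data, most lookups after warm-up are served from memory rather than the database.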
## Data Quality
The pipeline includes comprehensive data quality checks:
- Referential Integrity: Validates all foreign key relationships
- Date Consistency: Ensures start dates <= end dates
- Concept Validation: Verifies all concept_ids exist
- Value Ranges: Checks numeric values are within acceptable ranges
- OMOP Compliance: Validates against OMOP CDM specifications
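As an illustration of the date-consistency rule, a check over a batch of rows (represented here as dicts) might look like this. The field names follow CDM visit_occurrence conventions, but the validator's real interface may differ:

```python
# Flag rows whose end date precedes their start date.
from datetime import date

def check_date_consistency(rows, start_field, end_field):
    """Return the indices of rows violating start <= end."""
    bad = []
    for i, row in enumerate(rows):
        start, end = row.get(start_field), row.get(end_field)
        if start is not None and end is not None and end < start:
            bad.append(i)
    return bad
```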
## Error Handling
The pipeline implements robust error handling:
- Error Levels: INFO, WARNING, ERROR, CRITICAL
- Retry Logic: Exponential backoff for transient errors
- Circuit Breaker: Prevents cascading failures
- Checkpoint/Resume: Resume processing after interruption
- Audit Trail: Complete error logging to audit tables
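A minimal sketch of the retry-with-exponential-backoff behavior described above; the delays, attempt count, and exception types are illustrative, not the pipeline's actual settings:

```python
# Retry a callable on transient failure, doubling the delay each attempt.
import time

def retry(func, attempts=3, base_delay=0.5, exceptions=(Exception,)):
    """Call func(), retrying transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return func()
        except exceptions:
            if attempt == attempts - 1:
                raise  # out of retries: surface the error to the caller
            time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
```

A circuit breaker adds a second layer on top of this: after repeated failures it stops calling the target entirely for a cooldown period, so one failing dependency cannot stall every worker.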
## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run specific test file
pytest tests/test_transformer.py
```
## Documentation
- User Guide - Detailed usage instructions
- Architecture - System architecture and design
- Transformation Rules - Data transformation specifications
- CHANGELOG - Version history and changes
## Requirements
- Python 3.12+
- PostgreSQL 16.11+
- 8GB+ RAM (16GB+ recommended for parallel processing)
- OMOP vocabularies from Athena OHDSI
## License
MIT License - see LICENSE file for details
## Support
For issues, questions, or contributions, please open an issue on GitHub.
## Acknowledgments
- OHDSI Community for OMOP CDM specifications
- Athena OHDSI for vocabulary management