# OMOP CDM 5.4 Data Pipeline

A comprehensive ETL pipeline for transforming healthcare data to OMOP Common Data Model (CDM) version 5.4 format.

## Overview

This pipeline provides a complete solution for:
- Extracting data from staging tables
- Mapping source codes to OMOP standard concepts
- Transforming data to OMOP CDM 5.4 format
- Validating data quality and OMOP compliance
- Loading data into OMOP tables with parallel processing

## Features

- ✅ **Complete OMOP CDM 5.4 Support**: All clinical, vocabulary, and metadata tables
- ✅ **Automated Concept Mapping**: LRU-cached mapping with fallback strategies
- ✅ **Parallel Processing**: Multi-threaded ETL with configurable workers
- ✅ **Data Quality Validation**: Comprehensive validation rules and OMOP compliance checks
- ✅ **Error Handling**: Retry logic, circuit breaker, and checkpoint/resume functionality
- ✅ **Web Interface**: Modern React dashboard for managing ETL pipelines (NEW!)
- ✅ **REST API**: FastAPI backend with complete API documentation
- ✅ **CLI Interface**: User-friendly command-line interface for all operations
- ✅ **Vocabulary Management**: Tools for loading and managing OMOP vocabularies
- ✅ **Comprehensive Logging**: Detailed logging with audit trail

## Quick Start

### Option 1: Web Interface (Recommended)

```bash
cd omop

# Install dependencies
pip install -r requirements.txt
pip install -r requirements-api.txt

# Start web interface (API + Frontend)
./start_web.sh
```

Then open http://localhost:3000 in your browser.

See `QUICK_START_WEB.md` for detailed instructions.

### Option 2: Command Line Interface

```bash
# Change into the cloned repository directory
cd omop

# Install dependencies
pip install -r requirements.txt

# Or install in development mode
pip install -e .
```

### Configuration

1. Copy the example environment file:
   ```bash
   cp .env.example .env
   ```

2. Edit `.env` with your database credentials:
   ```
   DB_HOST=localhost
   DB_PORT=5432
   DB_NAME=omop_db
   DB_USER=your_user
   DB_PASSWORD=your_password
   ```

3. Review and customize `config.yaml` as needed.

### Create Database Schemas

```bash
# Create all schemas (OMOP, staging, audit)
omop-pipeline schema create --type all

# Or create individually
omop-pipeline schema create --type omop
omop-pipeline schema create --type staging
omop-pipeline schema create --type audit
```

### Load Vocabularies

1. Download vocabularies from [Athena OHDSI](https://athena.ohdsi.org/)
2. Extract the ZIP file to a directory
3. Load vocabularies:

```bash
omop-pipeline vocab load --path /path/to/vocabularies
```

### Run ETL Pipeline

```bash
# Run complete ETL pipeline
omop-pipeline etl run --source staging.raw_patients --target person

# With custom batch size and workers
omop-pipeline etl run --source staging.raw_patients --target person --batch-size 5000 --workers 8

# Run in sequential mode (no parallelization)
omop-pipeline etl run --source staging.raw_patients --target person --sequential
```

## Web Interface

The pipeline includes a modern web interface built with FastAPI and React.

### Features
- 📊 **Dashboard**: Real-time statistics and performance metrics
- ⚙️ **ETL Manager**: Launch and monitor ETL pipelines
- 🗄️ **Schema Manager**: Create and validate database schemas
- ✅ **Validation**: Data quality checks and unmapped codes
- 📝 **Logs**: System logs and validation errors

### Quick Start
```bash
./start_web.sh
```

Access the interface at http://localhost:3000

For more details, see `README_WEB_INTERFACE.md` and `WEB_INTERFACE_SUMMARY.md`.

## CLI Commands

### Schema Management

```bash
# Create schemas
omop-pipeline schema create --type [omop|staging|audit|all]

# Validate schema
omop-pipeline schema validate
```

### ETL Operations

```bash
# Run complete ETL
omop-pipeline etl run --source <table> --target <table>

# Run extraction only
omop-pipeline etl extract --source <table>

# Run transformation only
omop-pipeline etl transform --target <table>

# Run loading only
omop-pipeline etl load --target <table>
```

### Data Validation

```bash
# Validate data quality
omop-pipeline validate

# Validate specific table
omop-pipeline validate --table person
```

### Statistics

```bash
# Show ETL statistics
omop-pipeline stats show

# Show summary
omop-pipeline stats summary
```

### Vocabulary Management

```bash
# Prepare vocabulary loading (shows instructions)
omop-pipeline vocab prepare

# Load vocabularies
omop-pipeline vocab load --path /path/to/vocabularies
```

### Configuration

```bash
# Validate configuration
omop-pipeline config validate
```

### Logs

```bash
# Show recent log entries
omop-pipeline logs show

# Show last 100 lines
omop-pipeline logs show --lines 100

# Filter by log level
omop-pipeline logs show --level ERROR
```

## Architecture

The pipeline consists of the following components:

- **Extractor**: Extracts data from staging tables with batch processing
- **Concept Mapper**: Maps source codes to OMOP concepts with LRU caching
- **Transformer**: Transforms data to OMOP format with validation
- **Validator**: Validates data quality and OMOP compliance
- **Loader**: Loads data into OMOP tables using bulk operations
- **Orchestrator**: Coordinates the complete ETL flow with parallel processing
- **Error Handler**: Manages errors with retry logic and circuit breaker
- **Schema Manager**: Creates and manages database schemas
- **Vocabulary Loader**: Loads OMOP vocabularies from CSV files

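To illustrate the Concept Mapper idea, here is a minimal sketch of LRU-cached mapping with a fallback, using only the Python standard library. It is not the pipeline's actual code: `map_concept`, `SOURCE_TO_CONCEPT`, the `LOCAL` vocabulary, and the source codes and concept IDs are all hypothetical, and the normalize-then-give-up fallback order is an assumption. The only borrowed conventions are the cache size (matching `concept_cache_size: 10000`) and OMOP's use of concept ID 0 for unmapped codes.

```python
from functools import lru_cache

# Hypothetical in-memory table standing in for a vocabulary query.
# The source codes and concept IDs below are made up for illustration.
SOURCE_TO_CONCEPT = {
    ("LOCAL", "DX-001"): 1001,
    ("LOCAL", "DX-002"): 1002,
}

UNMAPPED_CONCEPT_ID = 0  # OMOP convention for "no matching concept"
LOOKUPS = {"count": 0}   # counts how often the backing store is hit


def _lookup(vocabulary_id, source_code):
    """Simulate one (expensive) lookup against the vocabulary tables."""
    LOOKUPS["count"] += 1
    return SOURCE_TO_CONCEPT.get((vocabulary_id, source_code))


@lru_cache(maxsize=10000)
def map_concept(vocabulary_id: str, source_code: str) -> int:
    """Map a source code to a concept ID, caching results per argument pair."""
    concept_id = _lookup(vocabulary_id, source_code)
    if concept_id is None:
        # Assumed fallback 1: retry with a normalized code.
        concept_id = _lookup(vocabulary_id, source_code.strip().upper())
    # Assumed fallback 2: mark the code as unmapped.
    return concept_id if concept_id is not None else UNMAPPED_CONCEPT_ID


first = map_concept("LOCAL", "DX-001")      # cache miss, one lookup
second = map_concept("LOCAL", "DX-001")     # cache hit, no lookup
normalized = map_concept("LOCAL", " dx-002 ")
unmapped = map_concept("LOCAL", "unknown")
```

Repeated codes are answered from the cache, so the backing store is only queried once per distinct `(vocabulary_id, source_code)` pair.
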
## Configuration

The pipeline is configured via `config.yaml`:

```yaml
database:
  host: localhost
  port: 5432
  database: omop_db
  user: postgres
  password: ${DB_PASSWORD}  # From environment variable

etl:
  batch_size: 1000
  num_workers: 4
  concept_cache_size: 10000
  validate_before_load: true

logging:
  level: INFO
  file: logs/omop_pipeline.log
  max_bytes: 10485760
  backup_count: 5
```

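The `${DB_PASSWORD}` placeholder implies that environment variables are substituted into the loaded configuration. How the pipeline does this is not documented here; the sketch below shows one possible approach on an already-parsed config dictionary. The function name `expand_env` and the choice to fail on unset variables are assumptions.

```python
import re

# Matches ${NAME} placeholders such as ${DB_PASSWORD}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env):
    """Recursively replace ${NAME} placeholders using the mapping `env`."""
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    if isinstance(value, str):
        def _substitute(match):
            name = match.group(1)
            if name not in env:
                # Assumed policy: fail loudly instead of leaving a blank value.
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(_substitute, value)
    return value  # numbers, booleans, and None pass through unchanged


# A parsed fragment of config.yaml (values taken from the example above).
config = {
    "database": {"host": "localhost", "port": 5432, "password": "${DB_PASSWORD}"}
}
resolved = expand_env(config, {"DB_PASSWORD": "s3cret"})
```

In real use the second argument would typically be `os.environ`; a plain dictionary is used here so the example does not depend on the shell environment.
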
## Performance

The pipeline is optimized for high-volume data processing:

- **Parallel Processing**: Multi-threaded execution with configurable workers
- **Batch Operations**: Efficient batch processing with PostgreSQL COPY
- **Caching**: LRU cache for frequently used concept mappings
- **Connection Pooling**: Optimized database connection management

Typical performance on a 16-core, 125GB RAM system:
- **Throughput**: 5,000-10,000 records/second
- **Memory Usage**: ~2-4GB per worker
- **CPU Usage**: Scales linearly with number of workers

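The combination of batching and multi-threaded workers can be sketched as follows. This is an illustration under assumptions, not the pipeline's orchestrator: `batched`, `transform_batch`, `run_parallel`, and the row fields are hypothetical, and the database read and `COPY`-based load are left out so the example runs without PostgreSQL.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batched(rows, batch_size):
    """Yield consecutive lists of at most `batch_size` rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def transform_batch(batch):
    """Hypothetical transformation of staging rows into person-like rows."""
    return [
        {"person_id": row["id"], "year_of_birth": row["birth_year"]}
        for row in batch
    ]


def run_parallel(rows, batch_size=1000, num_workers=4):
    """Transform batches on a thread pool, keeping the original row order."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Executor.map returns results in submission order.
        results = pool.map(transform_batch, batched(rows, batch_size))
        return [row for batch in results for row in batch]


staging_rows = [{"id": i, "birth_year": 1950 + i % 50} for i in range(2500)]
person_rows = run_parallel(staging_rows, batch_size=1000, num_workers=4)
```

The defaults mirror `batch_size: 1000` and `num_workers: 4` from `config.yaml`. Threads help most when each batch spends its time waiting on the database; CPU-bound transformations may need a different executor.
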
## Data Quality

The pipeline includes comprehensive data quality checks:

- **Referential Integrity**: Validates all foreign key relationships
- **Date Consistency**: Ensures start dates <= end dates
- **Concept Validation**: Verifies all concept_ids exist
- **Value Ranges**: Checks numeric values are within acceptable ranges
- **OMOP Compliance**: Validates against OMOP CDM specifications

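Two of these checks, date consistency and concept validation, can be expressed as simple per-record rules. The sketch below is only an illustration: `validate_record`, the generic field names (`start_date`, `end_date`, `concept_id`), and the concept IDs are hypothetical, and the real validator may well run such checks in SQL instead.

```python
from datetime import date


def validate_record(record, known_concept_ids):
    """Return a list of human-readable problems found in one record."""
    errors = []

    # Date consistency: the start date must not be after the end date.
    start = record.get("start_date")
    end = record.get("end_date")
    if start is not None and end is not None and start > end:
        errors.append("start_date is after end_date")

    # Concept validation: the concept_id must exist in the vocabulary.
    concept_id = record.get("concept_id")
    if concept_id not in known_concept_ids:
        errors.append(f"unknown concept_id: {concept_id}")

    return errors


known = {1001, 1002}  # made-up concept IDs for illustration

good = {"start_date": date(2024, 1, 1), "end_date": date(2024, 1, 5), "concept_id": 1001}
bad = {"start_date": date(2024, 2, 1), "end_date": date(2024, 1, 5), "concept_id": 9999}

good_errors = validate_record(good, known)
bad_errors = validate_record(bad, known)
```

Returning a list of messages rather than raising on the first problem lets every failed rule for a record be reported in a single pass.
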
## Error Handling

The pipeline implements robust error handling:

- **Error Levels**: INFO, WARNING, ERROR, CRITICAL
- **Retry Logic**: Exponential backoff for transient errors
- **Circuit Breaker**: Prevents cascading failures
- **Checkpoint/Resume**: Resume processing after interruption
- **Audit Trail**: Complete error logging to audit tables

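For readers unfamiliar with the two patterns, here is a compact sketch of retry with exponential backoff and a very simplified circuit breaker. All names (`retry_with_backoff`, `CircuitBreaker`, `CircuitOpenError`), the default thresholds and delays, and the choice of `ConnectionError` as the transient error are assumptions. A production breaker would also reopen for trial calls after a cooldown, which is omitted here.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    """Stops calling a failing dependency after repeated consecutive failures."""

    def __init__(self, failure_threshold=3):
        self.failure_threshold = failure_threshold
        self.failures = 0

    @property
    def is_open(self):
        return self.failures >= self.failure_threshold

    def call(self, func, *args, **kwargs):
        if self.is_open:
            raise CircuitOpenError("circuit open; call skipped")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            raise
        self.failures = 0  # any success resets the failure count
        return result


def retry_with_backoff(func, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Retry `func` on ConnectionError, doubling the delay after each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))
```

The `sleep` parameter exists so tests can record delays instead of actually waiting, for example `retry_with_backoff(load_batch, sleep=delays.append)` with a hypothetical `load_batch` function.
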
## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run specific test file
pytest tests/test_transformer.py
```

## Documentation

- [User Guide](docs/user_guide.md) - Detailed usage instructions
- [Architecture](docs/architecture.md) - System architecture and design
- [Transformation Rules](docs/transformation_rules.md) - Data transformation specifications
- [CHANGELOG](CHANGELOG.md) - Version history and changes

## Requirements

- Python 3.12+
- PostgreSQL 16.11+
- 8GB+ RAM (16GB+ recommended for parallel processing)
- OMOP vocabularies from Athena OHDSI

## License

MIT License - see LICENSE file for details

## Support

For issues, questions, or contributions, please open an issue on GitHub.

## Acknowledgments

- OHDSI Community for OMOP CDM specifications
- Athena OHDSI for vocabulary management