# OMOP CDM 5.4 Data Pipeline
A comprehensive ETL pipeline for transforming healthcare data to OMOP Common Data Model (CDM) version 5.4 format.
## Overview
This pipeline provides a complete solution for:
- Extracting data from staging tables
- Mapping source codes to OMOP standard concepts
- Transforming data to OMOP CDM 5.4 format
- Validating data quality and OMOP compliance
- Loading data into OMOP tables with parallel processing
## Features
- ✅ **Complete OMOP CDM 5.4 Support**: All clinical, vocabulary, and metadata tables
- ✅ **Automated Concept Mapping**: LRU-cached mapping with fallback strategies
- ✅ **Parallel Processing**: Multi-threaded ETL with configurable workers
- ✅ **Data Quality Validation**: Comprehensive validation rules and OMOP compliance checks
- ✅ **Error Handling**: Retry logic, circuit breaker, and checkpoint/resume functionality
- ✅ **Web Interface**: Modern React dashboard for managing ETL pipelines
- ✅ **REST API**: FastAPI backend with complete API documentation
- ✅ **CLI Interface**: User-friendly command-line interface for all operations
- ✅ **Vocabulary Management**: Tools for loading and managing OMOP vocabularies
- ✅ **Comprehensive Logging**: Detailed logging with audit trail
## Quick Start
### Option 1: Web Interface (Recommended)
```bash
cd omop
# Install dependencies
pip install -r requirements.txt
pip install -r requirements-api.txt
# Start web interface (API + Frontend)
./start_web.sh
```
Then open http://localhost:3000 in your browser.
See `QUICK_START_WEB.md` for detailed instructions.
### Option 2: Command Line Interface
```bash
# Change into the cloned repository directory
cd omop
# Install dependencies
pip install -r requirements.txt
# Or install in development mode
pip install -e .
```
### Configuration
1. Copy the example environment file:
```bash
cp .env.example .env
```
2. Edit `.env` with your database credentials:
```
DB_HOST=localhost
DB_PORT=5432
DB_NAME=omop_db
DB_USER=your_user
DB_PASSWORD=your_password
```
3. Review and customize `config.yaml` as needed.
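
As an illustration of how the `DB_*` values above fit together, the sketch below assembles a PostgreSQL connection URL from them. The helper name `build_dsn` is hypothetical and is not part of the pipeline's documented API; it also assumes the variables from `.env` have already been loaded into the environment.

```python
import os
from urllib.parse import quote_plus


def build_dsn(env=os.environ):
    """Assemble a PostgreSQL connection URL from DB_* variables (hypothetical helper)."""
    host = env.get("DB_HOST", "localhost")
    port = int(env.get("DB_PORT", "5432"))
    # Required settings: a missing key raises KeyError instead of silently defaulting.
    name = env["DB_NAME"]
    user = env["DB_USER"]
    password = env["DB_PASSWORD"]
    # Percent-encode credentials so characters such as '@' or ':' stay valid in a URL.
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{name}"
```

Passing a plain dictionary instead of `os.environ` makes it easy to check the result without touching real credentials.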
### Create Database Schemas
```bash
# Create all schemas (OMOP, staging, audit)
omop-pipeline schema create --type all
# Or create individually
omop-pipeline schema create --type omop
omop-pipeline schema create --type staging
omop-pipeline schema create --type audit
```
### Load Vocabularies
1. Download vocabularies from [Athena OHDSI](https://athena.ohdsi.org/)
2. Extract the ZIP file to a directory
3. Load vocabularies:
```bash
omop-pipeline vocab load --path /path/to/vocabularies
```
### Run ETL Pipeline
```bash
# Run complete ETL pipeline
omop-pipeline etl run --source staging.raw_patients --target person
# With custom batch size and workers
omop-pipeline etl run --source staging.raw_patients --target person --batch-size 5000 --workers 8
# Run in sequential mode (no parallelization)
omop-pipeline etl run --source staging.raw_patients --target person --sequential
```
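
To make the effect of `--batch-size`, `--workers`, and `--sequential` more concrete, here is a minimal sketch of batched processing with a thread pool. It is not the pipeline's actual implementation: `chunked`, `process_batch`, and `run` are hypothetical names, and `process_batch` only counts rows where a real pipeline would transform and load them.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(rows, batch_size):
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def process_batch(batch):
    """Placeholder for transform + load; returns the number of rows handled."""
    return len(batch)


def run(rows, batch_size=5000, workers=8, sequential=False):
    """Process all batches, either one by one or on a pool of worker threads."""
    batches = list(chunked(rows, batch_size))
    if sequential:
        return [process_batch(b) for b in batches]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in the same order as the input batches.
        return list(pool.map(process_batch, batches))
```

With 12,001 input rows and a batch size of 5,000, this produces three batches, the last one holding the remaining 2,001 rows.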
## Web Interface
The pipeline includes a modern web interface built with FastAPI and React.
### Features
- 📊 **Dashboard**: Real-time statistics and performance metrics
- ⚙️ **ETL Manager**: Launch and monitor ETL pipelines
- 🗄️ **Schema Manager**: Create and validate database schemas
- ✅ **Validation**: Data quality checks and unmapped codes
- 📝 **Logs**: System logs and validation errors
### Quick Start
```bash
./start_web.sh
```
Access the interface at http://localhost:3000
For more details, see `README_WEB_INTERFACE.md` and `WEB_INTERFACE_SUMMARY.md`.
## CLI Commands
### Schema Management
```bash
# Create schemas
omop-pipeline schema create --type [omop|staging|audit|all]
# Validate schema
omop-pipeline schema validate
```
### ETL Operations
```bash
# Run complete ETL
omop-pipeline etl run --source <source_table> --target <target_table>
# Run extraction only
omop-pipeline etl extract --source <source_table>
# Run transformation only
omop-pipeline etl transform --target <target_table>
# Run loading only
omop-pipeline etl load --target <target_table>
```
### Data Validation
```bash
# Validate data quality
omop-pipeline validate
# Validate specific table
omop-pipeline validate --table person
```
### Statistics
```bash
# Show ETL statistics
omop-pipeline stats show
# Show summary
omop-pipeline stats summary
```
### Vocabulary Management
```bash
# Prepare vocabulary loading (shows instructions)
omop-pipeline vocab prepare
# Load vocabularies
omop-pipeline vocab load --path /path/to/vocabularies
```
### Configuration
```bash
# Validate configuration
omop-pipeline config validate
```
### Logs
```bash
# Show recent log entries
omop-pipeline logs show
# Show last 100 lines
omop-pipeline logs show --lines 100
# Filter by log level
omop-pipeline logs show --level ERROR
```
## Architecture
The pipeline consists of the following components:
- **Extractor**: Extracts data from staging tables with batch processing
- **Concept Mapper**: Maps source codes to OMOP concepts with LRU caching
- **Transformer**: Transforms data to OMOP format with validation
- **Validator**: Validates data quality and OMOP compliance
- **Loader**: Loads data into OMOP tables using bulk operations
- **Orchestrator**: Coordinates the complete ETL flow with parallel processing
- **Error Handler**: Manages errors with retry logic and circuit breaker
- **Schema Manager**: Creates and manages database schemas
- **Vocabulary Loader**: Loads OMOP vocabularies from CSV files
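
The way these components hand data to one another can be sketched roughly as follows. This is an illustrative toy, not the project's code: every function name is hypothetical, each stage is reduced to a few lines, and the only real OMOP values used are the standard gender concepts 8507 (male) and 8532 (female) and concept 0 for unmapped codes.

```python
from itertools import islice

GENDER_CONCEPTS = {"M": 8507, "F": 8532}  # standard OMOP gender concept IDs


def extract_batches(rows, batch_size):
    """Extractor stand-in: yield lists of at most batch_size rows."""
    iterator = iter(rows)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def transform(row):
    """Concept Mapper + Transformer stand-in: unknown codes map to concept 0."""
    return {
        "person_id": row["id"],
        "gender_concept_id": GENDER_CONCEPTS.get(row["sex"], 0),
    }


def is_valid(record):
    """Validator stand-in: a person record needs an identifier."""
    return record["person_id"] is not None


def run_pipeline(rows, batch_size=2):
    """Orchestrator stand-in: extract, transform, validate, then 'load' into a list."""
    loaded, rejected = [], []
    for batch in extract_batches(rows, batch_size):
        for record in map(transform, batch):
            (loaded if is_valid(record) else rejected).append(record)
    return loaded, rejected
```

Rejected records are kept separately rather than dropped, which mirrors the idea of routing failures to an audit trail.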
## Configuration
The pipeline is configured via `config.yaml`:
```yaml
database:
  host: localhost
  port: 5432
  database: omop_db
  user: postgres
  password: ${DB_PASSWORD} # From environment variable
etl:
  batch_size: 1000
  num_workers: 4
  concept_cache_size: 10000
  validate_before_load: true
logging:
  level: INFO
  file: logs/omop_pipeline.log
  max_bytes: 10485760
  backup_count: 5
```
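
The `${DB_PASSWORD}` placeholder implies that environment variables are substituted into the configuration at load time. How the pipeline does this is not described here; the sketch below shows one possible approach on an already-parsed configuration dictionary. The function name `expand_env` is an assumption made for illustration.

```python
import os
import re

# Matches placeholders of the form ${NAME}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=os.environ):
    """Recursively replace ${NAME} placeholders in strings inside dicts and lists."""
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    # Numbers, booleans, and None pass through unchanged.
    return value
```

Raising an error for an unset variable is a deliberate choice in this sketch: an empty database password is harder to diagnose than a failed startup.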
## Performance
The pipeline is optimized for high-volume data processing:
- **Parallel Processing**: Multi-threaded execution with configurable workers
- **Batch Operations**: Efficient batch processing with PostgreSQL COPY
- **Caching**: LRU cache for frequently used concept mappings
- **Connection Pooling**: Optimized database connection management
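
As a rough illustration of the caching idea, the sketch below wraps a lookup in `functools.lru_cache`, sized to match the `concept_cache_size: 10000` setting. It is an assumption-laden toy: the lookup table, the concept IDs 1001 and 1002, and the normalization fallback are placeholders, not real vocabulary content or the pipeline's actual fallback strategy. Returning 0 for unmapped codes follows the OMOP convention for "no matching concept".

```python
from functools import lru_cache

LOOKUPS = {"count": 0}  # counts simulated database round trips

# Placeholder mapping table; the IDs below are NOT real OMOP concept IDs.
_TABLE = {("ICD10", "A00"): 1001, ("ICD10", "B00"): 1002}


def _query_vocabulary(vocabulary_id, source_code):
    """Stand-in for a query against the vocabulary tables."""
    LOOKUPS["count"] += 1
    return _TABLE.get((vocabulary_id, source_code))


@lru_cache(maxsize=10000)
def map_to_concept(vocabulary_id, source_code):
    """Map a source code to a concept ID, caching results per (vocabulary, code)."""
    concept_id = _query_vocabulary(vocabulary_id, source_code)
    if concept_id is None:
        # Hypothetical fallback: retry with a trimmed, upper-cased code.
        concept_id = _query_vocabulary(vocabulary_id, source_code.strip().upper())
    # 0 is the OMOP convention for an unmapped source code.
    return concept_id if concept_id is not None else 0
```

Repeated calls with the same arguments are served from the cache, and `map_to_concept.cache_info()` reports hits and misses for tuning the cache size.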
Typical performance on a 16-core, 125GB RAM system:
- **Throughput**: 5,000-10,000 records/second
- **Memory Usage**: ~2-4GB per worker
- **CPU Usage**: Scales linearly with number of workers
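
These figures allow a back-of-the-envelope estimate of run time and memory for a planned load. The sketch below simply applies the ranges quoted above; it assumes the throughput figure is the aggregate rate across all workers, which the numbers above do not state explicitly, and actual results depend on hardware and data.

```python
def estimate_runtime_seconds(n_records, low_rps=5_000, high_rps=10_000):
    """Return (best_case, worst_case) run time in seconds for n_records."""
    return n_records / high_rps, n_records / low_rps


def estimate_memory_gb(workers, low_gb=2, high_gb=4):
    """Return (low, high) total memory in GB, at roughly 2-4 GB per worker."""
    return workers * low_gb, workers * high_gb
```

For example, 10 million records would take roughly 1,000 to 2,000 seconds under these assumptions, and 8 workers would need about 16 to 32 GB of memory.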
## Data Quality
The pipeline includes comprehensive data quality checks:
- **Referential Integrity**: Validates all foreign key relationships
- **Date Consistency**: Ensures start dates <= end dates
- **Concept Validation**: Verifies all concept_ids exist
- **Value Ranges**: Checks numeric values are within acceptable ranges
- **OMOP Compliance**: Validates against OMOP CDM specifications
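
Three of these checks (date consistency, concept validation, and value ranges) can be sketched as a single record-level function. This is a simplified illustration rather than the pipeline's validator: the function name, the field names, the default range, and the concept ID 1001 in the example are all assumptions.

```python
from datetime import date


def validate_record(record, known_concept_ids, value_range=(0.0, 1000.0)):
    """Return a list of problems found; an empty list means the record passed."""
    problems = []

    # Date consistency: start must not be later than end when both are present.
    start, end = record.get("start_date"), record.get("end_date")
    if start is not None and end is not None and start > end:
        problems.append("start_date is after end_date")

    # Concept validation: the concept_id must exist in the loaded vocabulary.
    concept_id = record.get("concept_id")
    if concept_id not in known_concept_ids:
        problems.append(f"concept_id {concept_id} not found in vocabulary")

    # Value range: numeric values must fall inside the accepted interval.
    value = record.get("value_as_number")
    low, high = value_range
    if value is not None and not (low <= value <= high):
        problems.append(f"value_as_number {value} outside [{low}, {high}]")

    return problems


if __name__ == "__main__":
    sample = {
        "start_date": date(2024, 2, 1),
        "end_date": date(2024, 1, 5),
        "concept_id": 1001,  # placeholder ID, not a real concept
        "value_as_number": 37.0,
    }
    print(validate_record(sample, known_concept_ids={1001}))
```

Collecting every problem instead of stopping at the first one makes it possible to log a complete picture of each rejected record.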
## Error Handling
The pipeline implements robust error handling:
- **Error Levels**: INFO, WARNING, ERROR, CRITICAL
- **Retry Logic**: Exponential backoff for transient errors
- **Circuit Breaker**: Prevents cascading failures
- **Checkpoint/Resume**: Resume processing after interruption
- **Audit Trail**: Complete error logging to audit tables
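
The retry and circuit breaker ideas can be illustrated with a short sketch. The class and function names, thresholds, and delays below are assumptions chosen for the example and do not describe the pipeline's actual error handler.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when the breaker refuses a call after too many failures."""


class CircuitBreaker:
    """Stop calling a failing dependency after failure_threshold consecutive errors."""

    def __init__(self, failure_threshold=3):
        self.failure_threshold = failure_threshold
        self.failures = 0

    @property
    def is_open(self):
        return self.failures >= self.failure_threshold

    def call(self, func, *args, **kwargs):
        if self.is_open:
            raise CircuitOpenError("too many consecutive failures")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            raise
        self.failures = 0  # any success resets the counter
        return result


def retry(func, attempts=4, base_delay=0.5,
          retry_on=(ConnectionError, TimeoutError), sleep=time.sleep):
    """Call func, waiting base_delay * 2**attempt between failed attempts."""
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
```

Only errors listed in `retry_on` are retried, so non-transient failures such as invalid data surface immediately. This minimal breaker stays open once tripped; a production version would typically add a cool-down period before allowing calls again.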
## Testing
```bash
# Run all tests
pytest
# Run with coverage
pytest --cov=src --cov-report=html
# Run specific test file
pytest tests/test_transformer.py
```
## Documentation
- [User Guide](docs/user_guide.md) - Detailed usage instructions
- [Architecture](docs/architecture.md) - System architecture and design
- [Transformation Rules](docs/transformation_rules.md) - Data transformation specifications
- [CHANGELOG](CHANGELOG.md) - Version history and changes
## Requirements
- Python 3.12+
- PostgreSQL 16.11+
- 8GB+ RAM (16GB+ recommended for parallel processing)
- OMOP vocabularies from Athena OHDSI
## License
MIT License - see LICENSE file for details
## Support
For issues, questions, or contributions, please open an issue on GitHub.
## Acknowledgments
- OHDSI Community for OMOP CDM specifications
- Athena OHDSI for vocabulary management