OMOP CDM 5.4 Data Pipeline

A comprehensive ETL pipeline for transforming healthcare data to OMOP Common Data Model (CDM) version 5.4 format.

Overview

This pipeline provides a complete solution for:

  • Extracting data from staging tables
  • Mapping source codes to OMOP standard concepts
  • Transforming data to OMOP CDM 5.4 format
  • Validating data quality and OMOP compliance
  • Loading data into OMOP tables with parallel processing

Features

  • Complete OMOP CDM 5.4 Support: All clinical, vocabulary, and metadata tables
  • Automated Concept Mapping: LRU-cached mapping with fallback strategies
  • Parallel Processing: Multi-threaded ETL with configurable workers
  • Data Quality Validation: Comprehensive validation rules and OMOP compliance checks
  • Error Handling: Retry logic, circuit breaker, and checkpoint/resume functionality
  • Web Interface: Modern React dashboard for managing ETL pipelines (NEW!)
  • REST API: FastAPI backend with complete API documentation
  • CLI Interface: User-friendly command-line interface for all operations
  • Vocabulary Management: Tools for loading and managing OMOP vocabularies
  • Comprehensive Logging: Detailed logging with audit trail

Quick Start

Option 1: Web Interface

cd omop

# Install dependencies
pip install -r requirements.txt
pip install -r requirements-api.txt

# Start web interface (API + Frontend)
./start_web.sh

Then open http://localhost:3000 in your browser.

See QUICK_START_WEB.md for detailed instructions.

Option 2: Command Line Interface

# Navigate to the project directory
cd omop

# Install dependencies
pip install -r requirements.txt

# Or install in development mode
pip install -e .

Configuration

  1. Copy the example environment file:
cp .env.example .env
  2. Edit .env with your database credentials:
DB_HOST=localhost
DB_PORT=5432
DB_NAME=omop_db
DB_USER=your_user
DB_PASSWORD=your_password
  3. Review and customize config.yaml as needed.

Create Database Schemas

# Create all schemas (OMOP, staging, audit)
omop-pipeline schema create --type all

# Or create individually
omop-pipeline schema create --type omop
omop-pipeline schema create --type staging
omop-pipeline schema create --type audit

Load Vocabularies

  1. Download vocabularies from Athena OHDSI
  2. Extract the ZIP file to a directory
  3. Load vocabularies:
omop-pipeline vocab load --path /path/to/vocabularies
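
Under the hood, the vocabulary loader ingests the Athena CSV files (CONCEPT.csv, CONCEPT_RELATIONSHIP.csv, and so on) into the matching OMOP tables. The sketch below shows the general idea using psycopg2 and PostgreSQL COPY; the connection details, COPY options, and table/file pairing are illustrative assumptions, not the pipeline's actual implementation.

# Illustrative sketch: bulk-load one Athena vocabulary file with PostgreSQL COPY.
# Connection details and COPY options are assumptions, not the pipeline's code.
import psycopg2

def load_vocabulary_file(conn, table: str, path: str) -> None:
    """Stream one vocabulary file into its OMOP table using COPY."""
    copy_sql = (
        f"COPY {table} FROM STDIN WITH "
        "(FORMAT csv, HEADER true, DELIMITER E'\\t', QUOTE E'\\b')"
    )
    with conn.cursor() as cur, open(path, "r", encoding="utf-8") as f:
        # Athena exports are tab-delimited despite the .csv extension; a
        # backspace QUOTE character effectively disables CSV quoting.
        cur.copy_expert(copy_sql, f)
    conn.commit()

if __name__ == "__main__":
    conn = psycopg2.connect(host="localhost", dbname="omop_db", user="postgres")
    load_vocabulary_file(conn, "concept", "/path/to/vocabularies/CONCEPT.csv")
    conn.close()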

Run ETL Pipeline

# Run complete ETL pipeline
omop-pipeline etl run --source staging.raw_patients --target person

# With custom batch size and workers
omop-pipeline etl run --source staging.raw_patients --target person --batch-size 5000 --workers 8

# Run in sequential mode (no parallelization)
omop-pipeline etl run --source staging.raw_patients --target person --sequential

Web Interface

The pipeline includes a modern web interface built with FastAPI and React.

Features

  • 📊 Dashboard: Real-time statistics and performance metrics
  • ⚙️ ETL Manager: Launch and monitor ETL pipelines
  • 🗄️ Schema Manager: Create and validate database schemas
  • Validation: Data quality checks and unmapped codes
  • 📝 Logs: System logs and validation errors

Quick Start

./start_web.sh

Access the interface at http://localhost:3000

For more details, see README_WEB_INTERFACE.md and WEB_INTERFACE_SUMMARY.md.
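
To give a feel for the FastAPI backend, here is a minimal, hypothetical endpoint for launching an ETL run in the background. The route, request fields, and run_pipeline helper are illustrative assumptions and do not reflect the project's actual API; see the API documentation served by the backend for the real routes.

# Hypothetical sketch of a FastAPI endpoint that starts an ETL run.
# Route, request model, and run_pipeline() are illustrative only.
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

app = FastAPI(title="OMOP Pipeline API (sketch)")

class ETLRunRequest(BaseModel):
    source: str            # e.g. "staging.raw_patients"
    target: str            # e.g. "person"
    batch_size: int = 1000
    workers: int = 4

def run_pipeline(req: ETLRunRequest) -> None:
    """Placeholder for the real orchestrator call."""
    print(f"Running ETL {req.source} -> {req.target}")

@app.post("/etl/run")
def start_etl(req: ETLRunRequest, background_tasks: BackgroundTasks):
    # Run the ETL in the background so the HTTP request returns immediately.
    background_tasks.add_task(run_pipeline, req)
    return {"status": "started", "source": req.source, "target": req.target}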

CLI Commands

Schema Management

# Create schemas
omop-pipeline schema create --type [omop|staging|audit|all]

# Validate schema
omop-pipeline schema validate

ETL Operations

# Run complete ETL
omop-pipeline etl run --source <table> --target <table>

# Run extraction only
omop-pipeline etl extract --source <table>

# Run transformation only
omop-pipeline etl transform --target <table>

# Run loading only
omop-pipeline etl load --target <table>

Data Validation

# Validate data quality
omop-pipeline validate

# Validate specific table
omop-pipeline validate --table person

Statistics

# Show ETL statistics
omop-pipeline stats show

# Show summary
omop-pipeline stats summary

Vocabulary Management

# Prepare vocabulary loading (shows instructions)
omop-pipeline vocab prepare

# Load vocabularies
omop-pipeline vocab load --path /path/to/vocabularies

Configuration

# Validate configuration
omop-pipeline config validate

Logs

# Show recent log entries
omop-pipeline logs show

# Show last 100 lines
omop-pipeline logs show --lines 100

# Filter by log level
omop-pipeline logs show --level ERROR

Architecture

The pipeline consists of the following components (a simplified orchestration sketch follows the list):

  • Extractor: Extracts data from staging tables with batch processing
  • Concept Mapper: Maps source codes to OMOP concepts with LRU caching
  • Transformer: Transforms data to OMOP format with validation
  • Validator: Validates data quality and OMOP compliance
  • Loader: Loads data into OMOP tables using bulk operations
  • Orchestrator: Coordinates the complete ETL flow with parallel processing
  • Error Handler: Manages errors with retry logic and circuit breaker
  • Schema Manager: Creates and manages database schemas
  • Vocabulary Loader: Loads OMOP vocabularies from CSV files
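
As a rough mental model of how these components fit together, the orchestrator pulls batches from the extractor and runs map, transform, validate, and load steps across a pool of workers. The sketch below is a simplified, hypothetical version of that flow; the component interfaces are assumptions, not the project's actual classes.

# Simplified, hypothetical orchestration flow; the real components and their
# interfaces may differ from this sketch.
from concurrent.futures import ThreadPoolExecutor

def process_batch(batch, mapper, transformer, validator, loader):
    """Map, transform, validate, and load one batch of staging rows."""
    mapped = [mapper.map(row) for row in batch]
    transformed = [transformer.transform(row) for row in mapped]
    errors = validator.validate(transformed)
    if errors:
        raise ValueError(f"{len(errors)} validation errors in batch")
    loader.load(transformed)

def run_pipeline(extractor, mapper, transformer, validator, loader, workers=4):
    """Extract batches and process them in parallel worker threads."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(process_batch, batch, mapper, transformer, validator, loader)
            for batch in extractor.batches()
        ]
        for future in futures:
            future.result()  # re-raise any worker exception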

Configuration

The pipeline is configured via config.yaml:

database:
  host: localhost
  port: 5432
  database: omop_db
  user: postgres
  password: ${DB_PASSWORD}  # From environment variable

etl:
  batch_size: 1000
  num_workers: 4
  concept_cache_size: 10000
  validate_before_load: true

logging:
  level: INFO
  file: logs/omop_pipeline.log
  max_bytes: 10485760
  backup_count: 5
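
The ${DB_PASSWORD} placeholder is resolved from the environment when the configuration is read. A minimal sketch of that substitution, assuming PyYAML, is shown below; the pipeline's actual loader may work differently.

# Minimal sketch of loading config.yaml with ${VAR} substitution from the
# environment; the pipeline's real loader may differ.
import os
import re
import yaml

_VAR = re.compile(r"\$\{([^}]+)\}")

def _expand(value):
    """Recursively replace ${VAR} placeholders with environment values."""
    if isinstance(value, str):
        return _VAR.sub(lambda m: os.environ.get(m.group(1), ""), value)
    if isinstance(value, dict):
        return {k: _expand(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_expand(v) for v in value]
    return value

def load_config(path: str = "config.yaml") -> dict:
    with open(path, "r", encoding="utf-8") as f:
        return _expand(yaml.safe_load(f))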

Performance

The pipeline is optimized for high-volume data processing:

  • Parallel Processing: Multi-threaded execution with configurable workers
  • Batch Operations: Efficient batch processing with PostgreSQL COPY
  • Caching: LRU cache for frequently used concept mappings (see the sketch at the end of this section)
  • Connection Pooling: Optimized database connection management

Typical performance on a 16-core, 125GB RAM system:

  • Throughput: 5,000-10,000 records/second
  • Memory Usage: ~2-4GB per worker
  • CPU Usage: Scales linearly with number of workers
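
The concept-mapping cache can be pictured as an LRU-cached lookup against the vocabulary tables. The sketch below assumes a psycopg2 connection and the standard 'Maps to' relationship query; the real mapper's query, fallback strategies, and cache wiring may differ.

# Illustrative LRU-cached concept lookup; the query and fallbacks used by the
# pipeline's real mapper may differ.
from functools import lru_cache

import psycopg2

conn = psycopg2.connect(host="localhost", dbname="omop_db", user="postgres")

@lru_cache(maxsize=10_000)  # mirrors concept_cache_size in config.yaml
def map_to_standard_concept(source_code: str, vocabulary_id: str) -> int:
    """Return the standard concept_id for a source code, or 0 if unmapped."""
    sql = """
        SELECT cr.concept_id_2
        FROM concept c
        JOIN concept_relationship cr
          ON cr.concept_id_1 = c.concept_id
         AND cr.relationship_id = 'Maps to'
        WHERE c.concept_code = %s AND c.vocabulary_id = %s
        LIMIT 1
    """
    with conn.cursor() as cur:
        cur.execute(sql, (source_code, vocabulary_id))
        row = cur.fetchone()
    return row[0] if row else 0  # 0 is the OMOP convention for "no matching concept"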

Data Quality

The pipeline includes comprehensive data quality checks:

  • Referential Integrity: Validates all foreign key relationships (see the example check below)
  • Date Consistency: Ensures start dates <= end dates
  • Concept Validation: Verifies all concept_ids exist
  • Value Ranges: Checks numeric values are within acceptable ranges
  • OMOP Compliance: Validates against OMOP CDM specifications
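
As an example of the referential-integrity checks, the sketch below looks for condition_occurrence rows whose person_id has no matching person record. The query and reporting are illustrative; the actual validator covers many more rules and tables.

# Illustrative referential-integrity check: condition_occurrence rows that
# reference a person_id missing from person. The real validator does more.
import psycopg2

ORPHAN_CONDITIONS_SQL = """
    SELECT co.condition_occurrence_id
    FROM condition_occurrence co
    LEFT JOIN person p ON p.person_id = co.person_id
    WHERE p.person_id IS NULL
"""

def find_orphan_conditions(conn) -> list[int]:
    with conn.cursor() as cur:
        cur.execute(ORPHAN_CONDITIONS_SQL)
        return [row[0] for row in cur.fetchall()]

if __name__ == "__main__":
    conn = psycopg2.connect(host="localhost", dbname="omop_db", user="postgres")
    orphans = find_orphan_conditions(conn)
    print(f"{len(orphans)} condition_occurrence rows reference a missing person")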

Error Handling

The pipeline implements robust error handling:

  • Error Levels: INFO, WARNING, ERROR, CRITICAL
  • Retry Logic: Exponential backoff for transient errors (see the sketch below)
  • Circuit Breaker: Prevents cascading failures
  • Checkpoint/Resume: Resume processing after interruption
  • Audit Trail: Complete error logging to audit tables
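
To make the retry behavior concrete, here is a small sketch of exponential backoff around a transient operation. The attempt count, delays, and exception types are illustrative assumptions, not the pipeline's actual settings.

# Illustrative retry decorator with exponential backoff for transient errors.
# Attempt count, delays, and exception types are assumptions.
import time
from functools import wraps

def retry(max_attempts: int = 5, base_delay: float = 1.0, exceptions=(Exception,)):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # give up after the final attempt
                    time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
        return wrapper
    return decorator

@retry(max_attempts=3)
def load_batch(rows):
    ...  # e.g. a bulk insert that may hit a transient connection error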

Testing

# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run specific test file
pytest tests/test_transformer.py

Documentation

  • QUICK_START_WEB.md: Detailed setup instructions for the web interface
  • README_WEB_INTERFACE.md: Web interface reference
  • WEB_INTERFACE_SUMMARY.md: Web interface summary

Requirements

  • Python 3.12+
  • PostgreSQL 16.11+
  • 8GB+ RAM (16GB+ recommended for parallel processing)
  • OMOP vocabularies from Athena OHDSI

License

MIT License - see LICENSE file for details

Support

For issues, questions, or contributions, please open an issue on GitHub.

Acknowledgments

  • OHDSI Community for OMOP CDM specifications
  • Athena OHDSI for vocabulary management