#!/usr/bin/env python3
|
|
"""
|
|
Generate Sample Data for OMOP Pipeline Testing
|
|
|
|
This script generates fictional healthcare data and loads it into staging tables.
|
|
It creates realistic but completely fake patient, visit, condition, and drug data.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
import random
|
|
from faker import Faker
|
|
from sqlalchemy import text
|
|
import psycopg2
|
|
|
|
# Database configuration
#
# Every setting can be overridden with an OMOP_DB_* environment variable so
# that real credentials never have to be edited into this file. The fallback
# values are the original local-development defaults, so running the script
# with no environment set behaves exactly as before.
DB_CONFIG = {
    'host': os.environ.get('OMOP_DB_HOST', 'localhost'),
    # Environment values are strings; the port is kept as an int as before.
    'port': int(os.environ.get('OMOP_DB_PORT', '5432')),
    'database': os.environ.get('OMOP_DB_NAME', 'omop_cdm'),
    'user': os.environ.get('OMOP_DB_USER', 'dom'),
    'password': os.environ.get('OMOP_DB_PASSWORD', 'loli'),
}
|
|
|
|
# Shared Faker instance used by the generators below (French locale).
fake = Faker(locale='fr_FR')

# Fixed seeds for both Faker and the stdlib RNG so repeated runs draw the
# same sequence of values.
Faker.seed(42)
random.seed(42)
|
|
|
|
# Sample medical codes
#
# Each table is a list of (code, French label) pairs that the generators
# pick from at random.

# Diagnoses, coded in ICD-10.
ICD10_CODES = [
    ('E11.9', 'Diabète de type 2 sans complication'),
    ('I10', 'Hypertension essentielle'),
    ('J45.9', 'Asthme non précisé'),
    ('M79.3', 'Panniculite non précisée'),
    ('K21.9', 'Reflux gastro-oesophagien sans oesophagite'),
]

# Medications, coded in ATC.
ATC_CODES = [
    ('A10BA02', 'Metformine'),
    ('C09AA02', 'Enalapril'),
    ('R03AC02', 'Salbutamol'),
    ('A02BC01', 'Oméprazole'),
    ('N02BE01', 'Paracétamol'),
]

# Visit categories: (source value, label).
VISIT_TYPES = [
    ('consultation', 'Consultation externe'),
    ('urgence', 'Urgence'),
    ('hospitalisation', 'Hospitalisation'),
]
|
|
|
|
|
|
def generate_patients(num_patients=100):
    """Build fictional patient records for the staging layer.

    Args:
        num_patients: Number of patient records to create.

    Returns:
        A list of dicts, one per patient, carrying sequential identifiers
        ``PAT00001``, ``PAT00002``, ... and a ``'pending'`` processing status.
    """
    return [
        {
            'source_patient_id': f'PAT{seq:05d}',
            # Adults only: the birth date corresponds to an age of 18 to 90.
            'date_naissance': fake.date_of_birth(minimum_age=18, maximum_age=90),
            'sexe': random.choice(['M', 'F']),
            'code_postal': fake.postcode(),
            'source_fichier': 'sample_data_generation',
            'statut_traitement': 'pending',
        }
        for seq in range(1, num_patients + 1)
    ]
|
|
|
|
|
|
def generate_visits(patients, visits_per_patient=3, reference_date=None):
    """Generate fake visit data for every patient.

    Each patient receives between 1 and ``visits_per_patient`` visits. A
    visit starts 1 to 730 days before ``reference_date`` and lasts 1-14 days
    for a hospitalisation, 0-1 day for an emergency visit and 0 days
    otherwise.

    Args:
        patients: Patient dicts; only ``source_patient_id`` is read.
        visits_per_patient: Upper bound on the number of visits drawn for
            each patient.
        reference_date: ``datetime`` that visit start dates are counted back
            from. Defaults to the current time, captured once per call so
            that all visits share the same reference; pass a fixed value to
            get fully reproducible output together with the seeded RNG.

    Returns:
        A list of visit dicts with sequential ``VISnnnnnn`` identifiers,
        ``datetime`` start/end values and a ``'pending'`` processing status.
    """
    if reference_date is None:
        reference_date = datetime.now()

    visits = []
    visit_id = 1

    for patient in patients:
        num_visits = random.randint(1, visits_per_patient)

        for _ in range(num_visits):
            # Only the type is stored; the human-readable label is unused.
            visit_type, _label = random.choice(VISIT_TYPES)

            # Visit start falls within the two years before the reference.
            days_ago = random.randint(1, 730)
            visit_start = reference_date - timedelta(days=days_ago)

            # Visit duration in days depends on the visit type.
            if visit_type == 'hospitalisation':
                duration = random.randint(1, 14)
            elif visit_type == 'urgence':
                duration = random.randint(0, 1)
            else:
                duration = 0

            visits.append({
                'source_visit_id': f'VIS{visit_id:06d}',
                'source_patient_id': patient['source_patient_id'],
                'type_visite': visit_type,
                'date_debut': visit_start,
                'date_fin': visit_start + timedelta(days=duration),
                'source_fichier': 'sample_data_generation',
                'statut_traitement': 'pending',
            })
            visit_id += 1

    return visits
|
|
|
|
|
|
def generate_conditions(visits):
    """Create fictional diagnosis records attached to the given visits.

    Roughly 70% of visits receive one or two ICD-10 diagnoses, each dated on
    the visit's start day.

    Args:
        visits: Visit dicts providing ``source_patient_id``,
            ``source_visit_id`` and a ``datetime`` under ``date_debut``.

    Returns:
        A list of condition dicts with sequential ``CONDnnnnnn`` identifiers.
    """
    conditions = []

    for visit in visits:
        # Skip the ~30% of visits that carry no diagnosis.
        if not random.random() < 0.7:
            continue

        diagnosis_count = random.randint(1, 2)
        for _ in range(diagnosis_count):
            icd_code, _label = random.choice(ICD10_CODES)
            conditions.append({
                # Identifiers are 1-based and follow insertion order.
                'source_condition_id': f'COND{len(conditions) + 1:06d}',
                'source_patient_id': visit['source_patient_id'],
                'source_visit_id': visit['source_visit_id'],
                'code_diagnostic': icd_code,
                'systeme_codage': 'ICD10',
                'date_diagnostic': visit['date_debut'].date(),
                'source_fichier': 'sample_data_generation',
                'statut_traitement': 'pending',
            })

    return conditions
|
|
|
|
|
|
def generate_drugs(visits):
    """Create fictional drug prescription records attached to the given visits.

    Roughly 60% of visits receive one to three ATC-coded prescriptions. Each
    prescription starts on the visit's start day and runs for 7 to 90 days.

    Args:
        visits: Visit dicts providing ``source_patient_id``,
            ``source_visit_id`` and a ``datetime`` under ``date_debut``.

    Returns:
        A list of prescription dicts with sequential ``DRUGnnnnnn``
        identifiers.
    """
    prescriptions = []

    for visit in visits:
        # Skip the ~40% of visits that carry no prescription.
        if not random.random() < 0.6:
            continue

        prescription_count = random.randint(1, 3)
        for _ in range(prescription_count):
            atc_code, _label = random.choice(ATC_CODES)

            starts_at = visit['date_debut']
            treatment_days = random.randint(7, 90)
            ends_at = starts_at + timedelta(days=treatment_days)

            prescriptions.append({
                # Identifiers are 1-based and follow insertion order.
                'source_drug_id': f'DRUG{len(prescriptions) + 1:06d}',
                'source_patient_id': visit['source_patient_id'],
                'source_visit_id': visit['source_visit_id'],
                'code_medicament': atc_code,
                'systeme_codage': 'ATC',
                'date_debut': starts_at.date(),
                'date_fin': ends_at.date(),
                'quantite': random.randint(1, 3),
                'duree_traitement': treatment_days,
                'source_fichier': 'sample_data_generation',
                'statut_traitement': 'pending',
            })

    return prescriptions
|
|
|
|
|
|
def load_data_to_staging(patients, visits, conditions, drugs):
    """Load generated data into the staging tables in one transaction.

    Rows are inserted into ``staging.raw_patients``, ``staging.raw_visits``,
    ``staging.raw_conditions`` and ``staging.raw_drugs``, in that order, and
    committed together. On any failure the transaction is rolled back, the
    error is printed and the exception is re-raised.

    Args:
        patients: Patient dicts as produced by ``generate_patients``.
        visits: Visit dicts as produced by ``generate_visits``.
        conditions: Condition dicts as produced by ``generate_conditions``.
        drugs: Prescription dicts as produced by ``generate_drugs``.

    Raises:
        Exception: Whatever the database driver raised while inserting or
            committing, after the rollback.
    """
    # (label for the progress message, rows, INSERT statement).
    # Named placeholders let each record dict be passed as-is; keys that are
    # not referenced by a statement (e.g. 'duree_traitement' on drug records,
    # which has no column in the insert) are simply ignored.
    batches = (
        ("patients", patients, """
            INSERT INTO staging.raw_patients
                (source_patient_id, date_naissance, sexe, code_postal,
                 source_fichier, statut_traitement)
            VALUES
                (%(source_patient_id)s, %(date_naissance)s, %(sexe)s,
                 %(code_postal)s, %(source_fichier)s, %(statut_traitement)s)
        """),
        ("visits", visits, """
            INSERT INTO staging.raw_visits
                (source_visit_id, source_patient_id, type_visite,
                 date_debut, date_fin, source_fichier, statut_traitement)
            VALUES
                (%(source_visit_id)s, %(source_patient_id)s, %(type_visite)s,
                 %(date_debut)s, %(date_fin)s, %(source_fichier)s,
                 %(statut_traitement)s)
        """),
        ("conditions", conditions, """
            INSERT INTO staging.raw_conditions
                (source_condition_id, source_patient_id, source_visit_id,
                 code_diagnostic, systeme_codage, date_diagnostic,
                 source_fichier, statut_traitement)
            VALUES
                (%(source_condition_id)s, %(source_patient_id)s,
                 %(source_visit_id)s, %(code_diagnostic)s, %(systeme_codage)s,
                 %(date_diagnostic)s, %(source_fichier)s,
                 %(statut_traitement)s)
        """),
        ("drug prescriptions", drugs, """
            INSERT INTO staging.raw_drugs
                (source_drug_id, source_patient_id, source_visit_id,
                 code_medicament, systeme_codage, date_debut, date_fin,
                 quantite, source_fichier, statut_traitement)
            VALUES
                (%(source_drug_id)s, %(source_patient_id)s,
                 %(source_visit_id)s, %(code_medicament)s, %(systeme_codage)s,
                 %(date_debut)s, %(date_fin)s, %(quantite)s,
                 %(source_fichier)s, %(statut_traitement)s)
        """),
    )

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # The cursor is opened inside the try block so the connection is
        # still closed if cursor creation itself fails.
        with conn.cursor() as cursor:
            for label, rows, statement in batches:
                print(f"Loading {len(rows)} {label}...")
                cursor.executemany(statement, rows)
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Error loading data: {str(e)}")
        raise
    finally:
        conn.close()

    # Reporting happens only after a successful commit and outside the try
    # block, so a failure while printing is never reported as a load error.
    print("✓ All sample data loaded successfully!")

    # Print summary
    print("\n" + "="*60)
    print("SAMPLE DATA GENERATION SUMMARY")
    print("="*60)
    print(f"Patients: {len(patients)}")
    print(f"Visits: {len(visits)}")
    print(f"Conditions: {len(conditions)}")
    print(f"Drug prescriptions: {len(drugs)}")
    print("="*60)
    print("\nData loaded into staging tables with status 'pending'")
    print("Ready for ETL processing!")
    print("="*60)
|
|
|
|
|
|
def main():
    """Generate the sample dataset and load it into the staging tables.

    Creates 100 patients with up to 3 visits each, derives conditions and
    drug prescriptions from those visits, prints the record counts, loads
    everything through ``load_data_to_staging`` and prints the next steps.
    """
    print("Generating sample healthcare data...")
    print("="*60)

    # Configuration
    num_patients = 100
    visits_per_patient = 3

    # Generate data
    print(f"Generating {num_patients} patients...")
    patients = generate_patients(num_patients)

    # generate_visits draws between 1 and visits_per_patient visits for each
    # patient, so the setting is an upper bound, not an average.
    print(f"Generating visits (up to {visits_per_patient} per patient)...")
    visits = generate_visits(patients, visits_per_patient)

    print("Generating conditions/diagnoses...")
    conditions = generate_conditions(visits)

    print("Generating drug prescriptions...")
    drugs = generate_drugs(visits)

    print("\nData generation complete!")
    print(f" - {len(patients)} patients")
    print(f" - {len(visits)} visits")
    print(f" - {len(conditions)} conditions")
    print(f" - {len(drugs)} drug prescriptions")

    # Load data
    print("\nConnecting to database and loading data...")
    load_data_to_staging(patients, visits, conditions, drugs)

    print("\n✓ Sample data generation complete!")
    print("\nNext steps:")
    print(" 1. Run ETL pipeline: omop-pipeline etl run --source staging.raw_patients --target person")
    print(" 2. Check results: omop-pipeline stats show")
|
|
|
|
|
|
# Run the generator only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|