Files
aivanov_database/omop/WORKFLOW_DIAGRAM.md
2026-03-05 01:20:15 +01:00

468 lines
38 KiB
Markdown

# 🔄 Diagrammes de Flux - OMOP Pipeline
## Architecture Globale
```
┌─────────────────────────────────────────────────────────────┐
│ UTILISATEUR │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ INTERFACE WEB (React) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Dashboard │ │ ETL │ │ Schema │ │ Logs │ │
│ │ │ │ Manager │ │ Manager │ │ │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────┬────────────────────────────────────┘
│ HTTP REST
┌─────────────────────────────────────────────────────────────┐
│ API FASTAPI │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ETL │ │ Schema │ │ Stats │ │ Logs │ │
│ │ Router │ │ Router │ │ Router │ │ Router │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────┬────────────────────────────────────┘
│ SQLAlchemy
┌─────────────────────────────────────────────────────────────┐
│ POSTGRESQL │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ OMOP │ │ Staging │ │ Audit │ │
│ │ Schema │ │ Schema │ │ Schema │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux ETL Complet
```
┌─────────────────────────────────────────────────────────────┐
│ DONNÉES SOURCE │
│ (Fichiers, API, Base externe) │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ STAGING SCHEMA │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ raw_patients │ │ raw_visits │ │ raw_drugs │ │
│ │ │ │ │ │ │ │
│ │ statut: │ │ statut: │ │ statut: │ │
│ │ 'pending' │ │ 'pending' │ │ 'pending' │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ EXTRACTION │
│ • Lecture par batch (1000 records) │
│ • Filtrage par statut 'pending' │
│ • Pagination automatique │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ MAPPING │
│ • Recherche dans SOURCE_TO_CONCEPT_MAP │
│ • Fallback sur CONCEPT_SYNONYM │
│ • Cache LRU (10000 concepts) │
│ • Tracking des codes non mappés │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ TRANSFORMATION │
│ • Conversion vers modèles OMOP │
│ • Génération des IDs (sequences PostgreSQL) │
│ • Validation des champs requis │
│ • Parsing des dates │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ VALIDATION │
│ • Vérification intégrité référentielle │
│ • Validation des dates (start <= end) │
│ • Vérification des concepts │
│ • Calcul des métriques de qualité │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ CHARGEMENT │
│ • Bulk insert (PostgreSQL COPY) │
│ • Gestion des transactions │
│ • Mise à jour statut staging ('processed') │
│ • Tracking des statistiques │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ OMOP SCHEMA │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ PERSON │ │ VISIT │ │ CONDITION │ │
│ │ │ │ OCCURRENCE │ │ OCCURRENCE │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux Interface Web
```
┌─────────────────────────────────────────────────────────────┐
│ UTILISATEUR │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ OUVRE http://localhost:3000 │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ DASHBOARD │
│ • Affiche les statistiques │
│ • Requête GET /api/stats/summary │
│ • Refresh automatique (5s) │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ETL MANAGER │
│ • Remplit le formulaire │
│ • Clique "Lancer le pipeline" │
│ • Requête POST /api/etl/run │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ BACKEND API │
│ • Démarre le job ETL │
│ • Retourne job_id │
│ • Exécute en background │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ SUIVI DU JOB │
│ • Requête GET /api/etl/jobs/{job_id} │
│ • Refresh automatique (2s) │
│ • Affiche progression │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ JOB TERMINÉ │
│ • Statut: completed │
│ • Affiche statistiques │
│ • Retour au Dashboard │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux de Données API
```
┌─────────────────────────────────────────────────────────────┐
│ REACT FRONTEND │
│ │
│ useQuery({ │
│ queryKey: ['stats'], │
│ queryFn: () => api.stats.summary() │
│ }) │
└────────────────────────┬────────────────────────────────────┘
│ HTTP GET
┌─────────────────────────────────────────────────────────────┐
│ AXIOS CLIENT │
│ │
│ axios.get('http://localhost:8000/api/stats/summary') │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ FASTAPI ROUTER │
│ │
│ @router.get("/summary") │
│ async def get_summary(): │
│ # Requête SQL │
│ return {"status": "success", "data": ...} │
└────────────────────────┬────────────────────────────────────┘
│ SQLAlchemy
┌─────────────────────────────────────────────────────────────┐
│ POSTGRESQL │
│ │
│ SELECT COUNT(*) FROM omop.person; │
│ SELECT COUNT(*) FROM staging.raw_patients │
│ WHERE statut_traitement = 'pending'; │
└────────────────────────┬────────────────────────────────────┘
│ Résultats
┌─────────────────────────────────────────────────────────────┐
│ REACT FRONTEND │
│ │
│ { │
│ "omop_records": {"person": 100, ...}, │
│ "staging_pending": 662, │
│ "executions_24h": {"total": 5, ...} │
│ } │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux de Validation
```
┌─────────────────────────────────────────────────────────────┐
│ UTILISATEUR CLIQUE "VALIDER" │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ POST /api/validation/run │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ VALIDATOR │
│ ┌──────────────────────────────────────────────┐ │
│ │ 1. Vérification intégrité référentielle │ │
│ │ • person_id existe ? │ │
│ │ • concept_id existe ? │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 2. Validation des dates │ │
│ │ • start_date <= end_date ? │ │
│ │ • dates dans le futur ? │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 3. Validation des valeurs │ │
│ │ • valeurs numériques dans les ranges ? │ │
│ │ • champs requis présents ? │ │
│ └──────────────────────────────────────────────┘ │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ENREGISTREMENT DES ERREURS │
│ │
│ INSERT INTO audit.validation_errors ( │
│ table_name, record_id, error_type, error_message │
│ ) │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ CALCUL DES MÉTRIQUES │
│ │
│ INSERT INTO audit.data_quality_metrics ( │
│ table_name, metric_name, metric_value │
│ ) │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ AFFICHAGE DES RÉSULTATS │
│ │
│ • Nombre d'erreurs │
│ • Codes non mappés │
│ • Métriques de qualité │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux de Création de Schéma
```
┌─────────────────────────────────────────────────────────────┐
│ UTILISATEUR CLIQUE "CRÉER TOUS LES SCHÉMAS" │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ POST /api/schema/create │
│ {"schema_type": "all"} │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ SCHEMA MANAGER │
│ ┌──────────────────────────────────────────────┐ │
│ │ 1. Créer schéma OMOP │ │
│ │ • Lecture de omop_cdm_5.4.sql │ │
│ │ • Exécution des CREATE TABLE │ │
│ │ • Création des indexes │ │
│ │ • Création des foreign keys │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 2. Créer schéma Staging │ │
│ │ • Lecture de staging.sql │ │
│ │ • Exécution des CREATE TABLE │ │
│ │ • Création des indexes │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 3. Créer schéma Audit │ │
│ │ • Lecture de audit.sql │ │
│ │ • Exécution des CREATE TABLE │ │
│ │ • Création des indexes │ │
│ │ • Création des views │ │
│ └──────────────────────────────────────────────┘ │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ VALIDATION DES SCHÉMAS │
│ │
│ SELECT COUNT(*) FROM pg_tables │
│ WHERE schemaname IN ('omop', 'staging', 'audit') │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ AFFICHAGE DU RÉSULTAT │
│ │
│ ✓ Schéma OMOP créé (32 tables) │
│ ✓ Schéma Staging créé (12 tables) │
│ ✓ Schéma Audit créé (9 tables) │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux de Monitoring Temps Réel
```
┌─────────────────────────────────────────────────────────────┐
│ DASHBOARD │
│ (Refresh automatique 5s) │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ TanStack Query │
│ │
│ useQuery({ │
│ queryKey: ['stats'], │
│ queryFn: fetchStats, │
│ refetchInterval: 5000 // 5 secondes │
│ }) │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ GET /api/stats/summary │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ POSTGRESQL │
│ │
│ • Compte des records OMOP │
│ • Compte des records en staging │
│ • Statistiques des exécutions │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ MISE À JOUR DE L'INTERFACE │
│ │
│ • Mise à jour des compteurs │
│ • Mise à jour des graphiques │
│ • Mise à jour des tableaux │
│ • Animation des changements │
└─────────────────────────────────────────────────────────────┘
```
---
## Flux d'Erreur
```
┌─────────────────────────────────────────────────────────────┐
│ ERREUR PENDANT L'ETL │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ERROR HANDLER │
│ ┌──────────────────────────────────────────────┐ │
│ │ 1. Classification de l'erreur │ │
│ │ • INFO, WARNING, ERROR, CRITICAL │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 2. Retry avec exponential backoff │ │
│ │ • Tentative 1: attendre 1s │ │
│ │ • Tentative 2: attendre 2s │ │
│ │ • Tentative 3: attendre 4s │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 3. Circuit breaker │ │
│ │ • Si taux d'erreur > 50% │ │
│ │ • Arrêt du pipeline │ │
│ └──────────────────────────────────────────────┘ │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ LOGGING │
│ │
│ • Log dans fichier (logs/omop_pipeline.log) │
│ • Log dans base (audit.etl_execution) │
│ • Log dans console │
└────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ NOTIFICATION UTILISATEUR │
│ │
│ • Affichage dans l'interface │
│ • Badge rouge "FAILED" │
│ • Message d'erreur détaillé │
└─────────────────────────────────────────────────────────────┘
```
---
## Légende
```
┌─────────┐
│ Étape │ = Processus ou action
└─────────┘
▼ = Flux de données
┌─────────────────────────────────────────────────────────────┐
│ TITRE │
│ • Point 1 │
│ • Point 2 │
└─────────────────────────────────────────────────────────────┘
= Bloc avec détails
```
---
## 🎯 Résumé des Flux
1. **Architecture** : Frontend → API → Database
2. **ETL** : Staging → Extract → Map → Transform → Validate → Load → OMOP
3. **Interface** : User → Dashboard → API → Database → Display
4. **API** : React → Axios → FastAPI → SQLAlchemy → PostgreSQL
5. **Validation** : Trigger → Validator → Checks → Errors → Metrics
6. **Schema** : User → API → SchemaManager → SQL → Database
7. **Monitoring** : Dashboard → Query → API → Database → Update
8. **Erreur** : Error → Handler → Retry → Log → Notify
**Tous les flux sont documentés et fonctionnels ! 🚀**