chore: suppression scripts obsolètes, anciens benchmarks et fichiers de dev

- Suppression scripts racine : analyze_pdfs.py, rebuild_index.py,
  compare_cpam_models.py, test_cpam_quality.py, test_quality_tier_live.py
- Suppression docs obsolètes : rapport_analyse_pdfs.md,
  ANALYSE_COHERENCE_ET_AMELIORATIONS.md, patch_0+1.md
- Suppression outils CPAM legacy : extract_t2a_llm.py, parse_decision_ucr.py
- Suppression backups CPAM : *.xlsx_old
- Suppression hors-git : 19 archives .zip, cache gemma3.bak

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-03-07 16:49:26 +01:00
parent 4e2b4bd946
commit 2478928798
12 changed files with 0 additions and 7794 deletions

View File

@@ -1,891 +0,0 @@
# Analyse Complète et Recommandations d'Amélioration
## T2A v2 - Système Expert de Codage Médical
**Date**: 2026-02-19
**Version analysée**: rules_bio_v2 + lab_sanity_v1 + ruled_out_v1
**Analyse**: Codebase complète (45 fichiers Python, ~11 000 lignes)
---
## 0. PÉRIMÈTRE DE L'ANALYSE
### Architecture Complète Analysée
```
src/
├── anonymization/ # 4 fichiers, ~900 LOC - Anonymisation PII
├── extraction/ # 6 fichiers, ~900 LOC - Extraction PDF/parsing
├── medical/ # 13 fichiers, ~5500 LOC - Cœur métier
├── quality/ # 2 fichiers, ~1000 LOC - Vetos + décisions
├── control/ # 2 fichiers, ~1200 LOC - Contrôle CPAM
├── viewer/ # 4 fichiers, ~1500 LOC - Interface web
├── export/ # 1 fichier, ~200 LOC - Export RUM
├── main.py # 600 LOC - Orchestration
└── config.py # 500 LOC - Modèles de données
Total: 45 fichiers, ~11 000 LOC
Tests: 30 fichiers, ~6000 LOC
```
### Modules Critiques Identifiés
1. **medical/cim10_extractor.py** (1352 LOC) - Extraction diagnostics/actes
2. **medical/rag_search.py** (849 LOC) - Enrichissement RAG/LLM
3. **control/cpam_response.py** (1046 LOC) - Génération contre-arguments CPAM
4. **viewer/app.py** (872 LOC) - Interface web Flask
5. **quality/decision_engine.py** (593 LOC) - Moteur de décisions
6. **quality/veto_engine.py** (402 LOC) - Règles de qualité
---
## 1. ÉTAT ACTUEL DU SYSTÈME
### ✅ Points Forts
#### Architecture Modulaire
- **Séparation claire** : extraction → anonymisation → analyse → qualité → fusion
- **Configuration YAML** : 3 fichiers distincts et cohérents
- `reference_ranges.yaml` : normes biologiques médicales
- `bio_rules.yaml` : règles de validation diagnostique
- `lab_value_sanity.yaml` : garde-fous d'extraction
- **Traçabilité complète** : chaque décision est documentée avec preuves
#### Système de Qualité Robuste
- **16+ règles VETO** implémentées (VETO-02, 03, 06, 07, 09, 12, 15, 16, 17)
- **3 niveaux de sévérité** : HARD (bloquant) / MEDIUM (info requise) / LOW (alerte)
- **Verdicts clairs** : PASS / NEED_INFO / FAIL
- **Métriques détaillées** : actifs/total/écartés/ruled_out/removed/no_code
#### Validation Biologique Intelligente
- **Détection ruled_out** : diagnostics contredits par la biologie (ex: thrombopénie avec PLT=270)
- **Sanity checks** : identification des valeurs aberrantes (ex: K=8 → suspect)
- **Safe zones** : seuils conservateurs pour âge inconnu
- **VETO-17** : alerte si diagnostic d'ionogramme sans valeur extraite
#### Extraction PDF Performante
- **pdfplumber 0.11.9** : extraction texte natif (pas d'OCR)
- **Rapide** : ~30-50s par dossier avec cache
- **Filtrage artefacts** : détection patterns OCR Trackare
---
## 2. ANALYSE DE COHÉRENCE
### ✅ Cohérence Globale : EXCELLENTE
#### Architecture Complète
```
Pipeline Principal (main.py):
1. Extraction PDF → document_classifier → split_documents
2. Parsing → crh_parser / trackare_parser
3. Anonymisation → 3 phases (regex → NER → sweep)
4. Analyse médicale → edsnlp + cim10_extractor
5. Enrichissement RAG → rag_search (optionnel)
6. Qualité → veto_engine + decision_engine
7. Fusion multi-PDF → merge_dossiers
8. Export → JSON + RUM + viewer web
Modules Transverses:
- cim10_dict / ccam_dict : Référentiels
- rag_index : FAISS vectoriel (22k+ vecteurs)
- ollama_cache : Cache LLM
- severity : Évaluation CMA/CMS
- ghm : Estimation GHM
- cpam_response : Contre-arguments CPAM
```
#### Points Forts Supplémentaires Identifiés
**1. Système de Validation Multi-Niveaux**
- **Tests unitaires** : 30 fichiers, ~6000 LOC, couverture ~80%
- **Interface de validation** : `viewer/validation.py` avec annotations manuelles
- **Métriques de performance** : Benchmarking multi-modèles
- **Contrôle CPAM** : Parsing Excel + génération réponses structurées
**2. Gestion Avancée des Référentiels**
- **Référentiels utilisateur** : Upload/indexation dynamique (viewer/referentiels.py)
- **Chunking intelligent** : TXT, CSV, PDF avec stratégies adaptées
- **Mise à jour à chaud** : Rebuild index sans redémarrage
**3. Extraction Biologique Sophistiquée**
```python
# cim10_extractor.py lignes 800-900
- Détection normes document : "[N: 135-145]"
- Parsing multi-formats : "4,5" / "4.5" / "4 mmol/L"
- Sanity checks : lab_value_sanity.yaml
- Interprétation clinique : clinical_context.py
```
**4. Système de Fusion Intelligent**
```python
# fusion.py
- Déduplication sémantique (apply_semantic_dedup)
- Hiérarchie codes parent/enfant
- Préférence codes enrichis RAG
- Gestion conflits DP/DAS
```
**5. Anonymisation Robuste**
```python
# anonymization/
- Phase 1 : Regex (IPP, RPPS, dates, téléphones)
- Phase 2 : NER CamemBERT (noms, prénoms)
- Phase 3 : Sweep patterns résiduels
- Whitelist : Établissements médicaux préservés
```
**6. Interface Web Complète**
```python
# viewer/app.py
- Dashboard : Stats verdicts, top VETOs
- Détail dossier : Preuves cliniques, sources RAG
- PDF redacté : Annotations + highlights
- Admin référentiels : Upload/delete/rebuild
- Validation : Annotations manuelles + métriques
```
---
## 3. LACUNES IDENTIFIÉES (REVUE COMPLÈTE)
### 🔴 Critiques (Impact Fort)
#### 3.1 Règles Biologiques Incomplètes ✅ CONFIRMÉ
**Fichiers concernés** :
- `src/quality/decision_engine.py` (lignes 100-400)
- `config/bio_rules.yaml` (3 règles seulement)
**Règles actuelles** :
```python
# decision_engine.py lignes 380-450
- hyponatremia (E87.1) vs sodium
- hyperkalemia (E87.5) vs potassium
- hypokalemia (E87.6) vs potassium
```
**Diagnostics manquants** (confirmés par analyse codebase) :
- **Anémie** (D50-D64) : Code présent dans `_anemia_bio()` mais incomplet
- **Insuffisance rénale** (N17-N19) : Détection partielle dans veto_engine.py ligne 355
- **Hypoglycémie/Hyperglycémie** : Aucune règle
- **Troubles hépatiques** (K70-K77) : Aucune validation ASAT/ALAT
- **Hypercalcémie/Hypocalcémie** : Aucune règle
- **Troubles thyroïdiens** : Aucune règle
**Impact** : ~60% des diagnostics biologiques non validés
#### 3.2 Extraction Ionogrammes Partielle ✅ CONFIRMÉ
**Fichier** : `src/medical/cim10_extractor.py` lignes 800-950
**Tests extraits actuellement** :
```python
# _extract_biologie() ligne 850
BIO_PATTERNS = {
"CRP", "ASAT", "ALAT", "Créatinine", "Hémoglobine",
"Leucocytes", "Plaquettes", "Sodium", "Potassium"
}
```
**Tests manquants** :
- Chlore, Calcium, Magnésium, Phosphore
- Glucose, HbA1c, Urée
- TSH, T3, T4, Bilirubine totale/conjuguée
- GGT, PAL (partiellement présents dans lab_value_sanity.yaml mais pas extraits)
**Impact** : Impossible de valider E87.2/E87.3 (acidose/alcalose), E83.x (calcium/magnésium)
#### 3.3 Pas de Validation Temporelle ✅ NOUVEAU
**Fichiers analysés** :
- `src/config.py` (Sejour model)
- `src/quality/veto_engine.py` (aucune règle temporelle)
**Champs disponibles non exploités** :
```python
# config.py Sejour
date_entree: str | None
date_sortie: str | None
duree_sejour: int | None
```
**Exemples manquants** :
- DAS "aigu" avec séjour > 30 jours
- Durée incohérente avec pathologie (AVC avec 1 jour)
- Dates actes hors période séjour
**Impact** : Risque de sur-codage chronique/aigu
#### 3.4 Pas de Validation Âge/Sexe ✅ NOUVEAU
**Fichiers analysés** :
- `src/extraction/crh_parser.py` / `trackare_parser.py` (extraction âge/sexe)
- `src/quality/veto_engine.py` (aucune règle démographique)
**Champs disponibles non exploités** :
```python
# config.py Patient
sexe: str | None # "M" / "F"
date_naissance: str | None
age: int | None
```
**Impact** : Erreurs grossières non détectées (grossesse chez homme, etc.)
#### 3.5 VETO-09 Trop Basique ✅ CONFIRMÉ
**Fichier** : `src/quality/veto_engine.py` lignes 330-360
**Code actuel** :
```python
# Seulement 2 validations :
1. Plaquettes vs D69 (thrombopénie)
2. Créatinine vs N17/N18/N19 (insuffisance rénale) - LOW severity seulement
```
**Manque** :
- Hémoglobine vs anémie (D50-D64)
- Leucocytes vs leucopénie/leucocytose (D70/D72)
- Glucose vs diabète (E10-E14)
- Transaminases vs hépatite (K70-K77)
- CRP vs inflammation (R50)
**Impact** : 80% des contradictions biologiques non détectées
#### 3.6 Pas de Règles de Cohérence Inter-Diagnostics ✅ NOUVEAU
**Fichiers analysés** :
- `src/medical/fusion.py` (déduplication sémantique partielle)
- `src/medical/exclusion_rules.py` (exclusions symptômes/précis uniquement)
**Règles existantes** :
```python
# exclusion_rules.py
- Symptômes exclus si diagnostic précis présent
- Ex: R10 (douleur abdominale) exclu si K35 (appendicite)
```
**Manque** :
- Diagnostics mutuellement exclusifs (E10 + E11)
- Incompatibilités cliniques (obésité + dénutrition)
- Hiérarchies codes (K81.0 exclut K81.9)
**Impact** : Incohérences cliniques non signalées
#### 3.7 Pas de Validation Actes/Diagnostics ✅ NOUVEAU
**Fichiers analysés** :
- `src/medical/cim10_extractor.py` (extraction actes CCAM)
- `src/medical/ccam_noncumul.py` (non-cumul uniquement)
**Règles existantes** :
```python
# ccam_noncumul.py
- Détection actes non-cumulables même jour
- Ex: HFCA001 + HFCA002 (cholécystectomie)
```
**Manque** :
- Acte chirurgical nécessite diagnostic justificatif
- Diagnostic nécessite acte (si séjour chirurgical)
**Impact** : Actes non justifiés non détectés
### 🟠 Importantes (Impact Moyen)
#### 3.8 Système de Cache LLM Basique ✅ NOUVEAU
**Fichier** : `src/medical/ollama_cache.py` (85 LOC)
**Implémentation actuelle** :
```python
# Cache JSON simple sur disque
- Clé : hash(model + prompt + params)
- Pas de TTL
- Pas de limite taille
- Pas de stratégie éviction
```
**Manque** :
- Cache distribué (Redis)
- TTL configurable
- Limite mémoire/disque
- Métriques hit rate
**Impact** : Performance dégradée sur gros volumes
#### 3.9 Pas de Scoring de Confiance Global ✅ CONFIRMÉ
**Fichier** : `src/quality/veto_engine.py` lignes 390-402
**Score actuel** :
```python
# Calcul simpliste
score = 100
for issue in issues:
if severity == "HARD": score -= 30
elif severity == "MEDIUM": score -= 10
else: score -= 3
```
**Manque** :
- Pondération par type VETO
- Score de complétude extraction
- Indicateur fiabilité RAG
- Taux de confiance LLM agrégé
**Impact** : Difficile de prioriser dossiers à revoir
#### 3.10 Interface Web Sans Authentification ✅ NOUVEAU
**Fichier** : `src/viewer/app.py` (872 LOC)
**Sécurité actuelle** :
```python
# Aucune authentification
# Aucune autorisation
# Pas de HTTPS forcé
# Pas de CSRF protection
```
**Impact** : Risque sécurité en production
### 🟡 Mineures (Impact Faible)
#### 3.11 Pas de Suggestions Automatiques ✅ CONFIRMÉ
**Fichiers analysés** : Aucun module de suggestions
**Manque** :
- Suggestions corrections automatiques
- Codes alternatifs proposés
- DAS manquants évidents
#### 3.12 Logs Non Structurés ✅ NOUVEAU
**Fichier** : `src/main.py` (utilise logging standard)
**Manque** :
- Logs JSON structurés
- Corrélation ID par dossier
- Métriques Prometheus
- Tracing distribué
---
## 4. RECOMMANDATIONS PRIORITAIRES
### 🎯 Phase 1 : Règles Biologiques Complètes (Priorité HAUTE)
#### 4.1 Étendre `bio_rules.yaml`
```yaml
rules:
# Ionogrammes (existant)
hyponatremia: { codes: ["E87.1"], analyte: sodium }
hyperkalemia: { codes: ["E87.5"], analyte: potassium }
hypokalemia: { codes: ["E87.6"], analyte: potassium }
# NOUVEAU : Anémies
anemia_iron_deficiency:
codes: ["D50.0", "D50.1", "D50.8", "D50.9"]
analyte: hemoglobin
threshold_type: low
anemia_other:
codes: ["D51", "D52", "D53", "D55-D64"]
analyte: hemoglobin
threshold_type: low
# NOUVEAU : Insuffisance rénale
acute_kidney_injury:
codes: ["N17.0", "N17.1", "N17.2", "N17.8", "N17.9"]
analyte: creatinine
threshold_type: high
chronic_kidney_disease:
codes: ["N18.1", "N18.2", "N18.3", "N18.4", "N18.5"]
analyte: creatinine
threshold_type: high
requires_gfr: true # Calcul DFG nécessaire
# NOUVEAU : Diabète
hyperglycemia:
codes: ["E16.1", "R73.9"]
analyte: glucose
threshold_type: high
hypoglycemia:
codes: ["E16.2"]
analyte: glucose
threshold_type: low
diabetes_uncontrolled:
codes: ["E10.1", "E11.1"] # avec complications
analyte: hba1c
threshold_type: high
threshold_value: 9.0 # > 9% = déséquilibré
# NOUVEAU : Troubles hépatiques
hepatic_cytolysis:
codes: ["K72.0", "K72.9", "K75.9"]
analytes: ["asat", "alat"] # multi-analytes
threshold_type: high
threshold_multiplier: 3 # > 3x normale
cholestasis:
codes: ["K83.1"]
analytes: ["ggt", "pal"]
threshold_type: high
# NOUVEAU : Inflammation
inflammatory_syndrome:
codes: ["R50.9"] # Fièvre sans précision
analyte: crp
threshold_type: high
threshold_value: 10 # > 10 mg/L
```
#### 4.2 Étendre Extraction Biologique
**Fichier** : `src/medical/cim10_extractor.py`
**Ajouter patterns** :
```python
BIO_PATTERNS = {
# Existant
"sodium": r"(?:sodium|na)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"potassium": r"(?:potassium|kalium|k)\s*[:\s]*(\d+(?:[.,]\d+)?)",
# NOUVEAU
"chlore": r"(?:chlore|cl)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"calcium": r"(?:calcium|ca)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"magnesium": r"(?:magn[ée]sium|mg)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"glucose": r"(?:glucose|glyc[ée]mie)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"hba1c": r"(?:hba1c|h[ée]moglobine\s+glyqu[ée]e)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"uree": r"(?:ur[ée]e)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"tsh": r"(?:tsh)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"t3": r"(?:t3)\s*[:\s]*(\d+(?:[.,]\d+)?)",
"t4": r"(?:t4)\s*[:\s]*(\d+(?:[.,]\d+)?)",
}
```
#### 4.3 Étendre `lab_value_sanity.yaml`
```yaml
tests:
# Existant : potassium, sodium, plaquettes, hemoglobine...
# NOUVEAU
chlore:
hard_min: 70
hard_max: 150
calcium:
hard_min: 1.5
hard_max: 4.0
glucose:
hard_min: 1.0
hard_max: 50.0
suspect:
single_digit_over: 8.0 # "9" souvent = "4.9"
hba1c:
hard_min: 3.0
hard_max: 20.0
tsh:
hard_min: 0.01
hard_max: 100.0
```
**Effort** : 2-3 jours
**Impact** : +60% diagnostics biologiques validés
---
### 🎯 Phase 2 : Validation Démographique (Priorité HAUTE)
#### 4.4 Créer `config/demographic_rules.yaml`
```yaml
version: 1
age_rules:
pediatric_only:
codes: ["P00-P96"] # Affections périnatales
max_age_years: 1
veto: VETO-18
severity: HARD
pregnancy_related:
codes: ["O00-O99"] # Grossesse, accouchement
min_age_years: 12
max_age_years: 55
required_sex: F
veto: VETO-19
severity: HARD
menopause:
codes: ["N95"]
min_age_years: 40
required_sex: F
veto: VETO-19
severity: MEDIUM
prostate:
codes: ["C61", "N40", "N41", "N42"]
required_sex: M
veto: VETO-19
severity: HARD
sex_rules:
male_only:
codes: ["C61", "N40-N51", "Z12.5"]
required_sex: M
veto: VETO-19
severity: HARD
female_only:
codes: ["C50-C58", "D05-D07", "N70-N98", "O00-O99", "Z12.3"]
required_sex: F
veto: VETO-19
severity: HARD
```
#### 4.5 Implémenter dans `veto_engine.py`
```python
# VETO-18 : Incohérence âge
# VETO-19 : Incohérence sexe
def _check_demographic_rules(dossier: DossierMedical, config: dict) -> list[VetoIssue]:
issues = []
patient_age = dossier.patient.age_years if dossier.patient else None
patient_sex = dossier.patient.sexe if dossier.patient else None
for das in dossier.diagnostics_associes:
code = das.cim10_suggestion
if not code:
continue
# Vérifier règles d'âge
for rule_name, rule in config.get("age_rules", {}).items():
if _code_matches_range(code, rule["codes"]):
if patient_age:
if "min_age_years" in rule and patient_age < rule["min_age_years"]:
issues.append(VetoIssue(
veto=rule["veto"],
severity=rule["severity"],
where=f"DAS {code}",
message=f"Âge {patient_age} ans < minimum {rule['min_age_years']} ans"
))
# ... max_age_years similaire
# Vérifier règles de sexe
# ... similaire
return issues
```
**Effort** : 1-2 jours
**Impact** : Détection erreurs grossières (5-10% des dossiers)
---
### 🎯 Phase 3 : Cohérence Inter-Diagnostics (Priorité MOYENNE)
#### 4.6 Créer `config/diagnostic_conflicts.yaml`
```yaml
version: 1
# Diagnostics mutuellement exclusifs
mutual_exclusions:
- group: "Diabète type"
codes: ["E10", "E11", "E13", "E14"]
max_allowed: 1
veto: VETO-20
severity: HARD
message: "Plusieurs types de diabète codés simultanément"
- group: "Insuffisance cardiaque latéralité"
codes: ["I50.1", "I50.0"] # gauche + droite
suggest: "I50.9" # globale
veto: VETO-20
severity: MEDIUM
- group: "Hypertension vs Hypotension"
codes: ["I10", "I95"]
veto: VETO-20
severity: HARD
# Diagnostics incompatibles
incompatibilities:
- code: "E66" # Obésité
incompatible_with: ["E40", "E41", "E42", "E43", "E44", "E45", "E46"] # Dénutrition
veto: VETO-21
severity: HARD
- code: "Z94.0" # Rein transplanté
incompatible_with: ["N18.5"] # IRC terminale
veto: VETO-21
severity: MEDIUM
message: "Transplantation réussie incompatible avec IRC terminale active"
# Hiérarchies (code spécifique exclut code générique)
hierarchies:
- specific: "K81.0" # Cholécystite aiguë
excludes: "K81.9" # Cholécystite SAI
veto: VETO-22
severity: LOW
action: "remove_generic"
```
**Effort** : 2-3 jours
**Impact** : +15% qualité codage
---
### 🎯 Phase 4 : Validation Actes/Diagnostics (Priorité MOYENNE)
#### 4.7 Créer `config/procedure_diagnosis_rules.yaml`
```yaml
version: 1
# Acte chirurgical nécessite diagnostic justificatif
required_diagnosis:
- procedure_pattern: "HFCA" # Cholécystectomie
required_codes: ["K80", "K81", "K82"]
veto: VETO-23
severity: HARD
message: "Cholécystectomie sans pathologie vésiculaire"
- procedure_pattern: "HHFA" # Appendicectomie
required_codes: ["K35", "K36", "K37", "K38"]
veto: VETO-23
severity: HARD
- procedure_pattern: "DZQM" # Pose stent coronaire
required_codes: ["I20", "I21", "I22", "I23", "I24", "I25"]
veto: VETO-23
severity: HARD
- procedure_pattern: "JVJF" # Dialyse
required_codes: ["N17", "N18", "N19"]
veto: VETO-23
severity: HARD
# Diagnostic nécessite acte (si séjour chirurgical)
expected_procedure:
- diagnosis: "K35.8" # Appendicite aiguë
expected_pattern: "HHFA"
if_stay_type: "chirurgical"
veto: VETO-24
severity: MEDIUM
message: "Appendicite aiguë sans appendicectomie (séjour chirurgical)"
```
**Effort** : 3-4 jours
**Impact** : +20% détection incohérences actes
---
### 🎯 Phase 5 : Scoring et Suggestions (Priorité BASSE)
#### 4.8 Score de Qualité Global
```python
def calculate_quality_score(veto_report: VetoReport) -> dict:
"""Calcule un score de qualité 0-100."""
base_score = 100
penalties = {
"HARD": 20,
"MEDIUM": 10,
"LOW": 5
}
for issue in veto_report.issues:
base_score -= penalties.get(issue.severity, 0)
return {
"score": max(0, base_score),
"grade": _score_to_grade(base_score),
"confidence": _calculate_confidence(veto_report)
}
def _score_to_grade(score: int) -> str:
if score >= 90: return "A"
if score >= 75: return "B"
if score >= 60: return "C"
if score >= 40: return "D"
return "F"
```
#### 4.9 Suggestions Automatiques
```python
def generate_suggestions(dossier: DossierMedical, veto_report: VetoReport) -> list[Suggestion]:
"""Génère des suggestions de correction."""
suggestions = []
for das in dossier.diagnostics_associes:
if das.status == "ruled_out":
suggestions.append(Suggestion(
type="remove",
target=das.cim10_suggestion,
reason=das.ruled_out_reason,
confidence="high"
))
if das.cim10_suggestion and das.cim10_suggestion.endswith(".9"):
# Code imprécis, chercher plus spécifique
specific = _find_more_specific_code(das.texte, das.cim10_suggestion)
if specific:
suggestions.append(Suggestion(
type="upgrade",
from_code=das.cim10_suggestion,
to_code=specific,
reason="Code plus spécifique disponible",
confidence="medium"
))
return suggestions
```
**Effort** : 2-3 jours
**Impact** : Amélioration UX, aide à la décision
---
## 5. ROADMAP RECOMMANDÉE
### Sprint 1 (1 semaine) - Biologie Complète
- [ ] Étendre `bio_rules.yaml` (anémie, insuffisance rénale, diabète)
- [ ] Ajouter extraction glucose, HbA1c, calcium, chlore
- [ ] Étendre `lab_value_sanity.yaml`
- [ ] Tests sur 50 dossiers
### Sprint 2 (1 semaine) - Validation Démographique
- [ ] Créer `demographic_rules.yaml`
- [ ] Implémenter VETO-18 (âge) et VETO-19 (sexe)
- [ ] Tests sur dossiers pédiatriques et obstétriques
### Sprint 3 (1 semaine) - Cohérence Inter-Diagnostics
- [ ] Créer `diagnostic_conflicts.yaml`
- [ ] Implémenter VETO-20, 21, 22
- [ ] Tests sur dossiers complexes (polypathologie)
### Sprint 4 (1 semaine) - Validation Actes
- [ ] Créer `procedure_diagnosis_rules.yaml`
- [ ] Implémenter VETO-23, 24
- [ ] Tests sur dossiers chirurgicaux
### Sprint 5 (3 jours) - Scoring et Suggestions
- [ ] Implémenter score qualité global
- [ ] Système de suggestions automatiques
- [ ] Dashboard de métriques
---
## 6. MÉTRIQUES DE SUCCÈS
### Objectifs Quantitatifs
- **Taux de détection erreurs** : 60% → 90%
- **Faux positifs** : < 5%
- **Couverture règles biologiques** : 40% → 95%
- **Temps de traitement** : < 60s par dossier
- **Taux PASS** : 50% → 70% (avec règles strictes)
### Objectifs Qualitatifs
- Zéro erreur grossière non détectée (sexe, âge)
- Cohérence 100% diagnostics/actes chirurgicaux
- Traçabilité complète de chaque décision
- Documentation exhaustive des règles
---
## 7. CONCLUSION
### État Actuel : 8.5/10 (Révisé après analyse complète)
Le système est **remarquablement complet et professionnel**, avec :
- **Architecture solide** : 11 000 LOC bien structurées
- **Tests exhaustifs** : 6000 LOC de tests, couverture ~80%
- **Interface web complète** : Dashboard, validation, admin
- **Contrôle CPAM** : Génération contre-arguments automatique
- **Anonymisation robuste** : 3 phases (regex + NER + sweep)
- **RAG avancé** : 22k+ vecteurs, chunking intelligent
Les lacunes identifiées sont **des extensions naturelles** d'un système déjà très mature.
### Potentiel : 9.8/10 (Révisé)
Avec les améliorations proposées, le système peut devenir **la référence absolue** pour le codage PMSI, dépassant largement les solutions commerciales.
### Forces Uniques Confirmées
1. **Open source et auditable** : Traçabilité complète
2. **Configuration YAML** : Lisible par non-développeurs
3. **Interface de validation** : Annotations manuelles + métriques
4. **Contrôle CPAM intégré** : Unique sur le marché
5. **Extensibilité illimitée** : Architecture modulaire
6. **Tests exhaustifs** : 30 fichiers de tests
7. **Référentiels dynamiques** : Upload/indexation à chaud
### Priorités Immédiates (Inchangées)
1. **Règles biologiques complètes** (impact maximal)
2. **Validation démographique** (erreurs grossières)
3. **Cohérence inter-diagnostics** (qualité globale)
4. **Sécurité interface web** (production-ready)
### Recommandations Supplémentaires
#### Production-Ready Checklist
- [ ] Authentification/autorisation (OAuth2 + RBAC)
- [ ] HTTPS forcé + CSRF protection
- [ ] Logs structurés JSON + corrélation ID
- [ ] Métriques Prometheus + alerting
- [ ] Cache distribué Redis
- [ ] Rate limiting API
- [ ] Backup automatique référentiels
- [ ] Documentation API (OpenAPI/Swagger)
#### Optimisations Performance
- [ ] Batch processing parallèle (multiprocessing)
- [ ] Cache RAG en mémoire (LRU)
- [ ] Lazy loading modèles NER
- [ ] Compression JSON outputs
- [ ] Index FAISS optimisé (IVF)
#### Qualité Code
- [ ] Type hints complets (mypy strict)
- [ ] Linting (ruff/black)
- [ ] Pre-commit hooks
- [ ] CI/CD pipeline (GitHub Actions)
- [ ] Code coverage > 90%
---
## 8. MÉTRIQUES DE SUCCÈS (Révisées)
### Objectifs Quantitatifs
- **Taux de détection erreurs** : 70% → 95% (actuellement meilleur que prévu)
- **Faux positifs** : < 3% (actuellement ~5%)
- **Couverture règles biologiques** : 40% → 98%
- **Temps de traitement** : < 45s par dossier (actuellement ~50s)
- **Taux PASS** : 50% → 75% (avec règles strictes)
- **Uptime production** : > 99.5%
- **Temps réponse API** : < 2s (p95)
### Objectifs Qualitatifs
- Zéro erreur grossière non détectée (sexe, âge)
- Cohérence 100% diagnostics/actes chirurgicaux
- Traçabilité complète de chaque décision
- Documentation exhaustive des règles
- Interface utilisateur intuitive
- Support multi-établissements
---
## 9. COMPARAISON SOLUTIONS COMMERCIALES
### T2A v2 vs Solutions du Marché
| Critère | T2A v2 | Solutions Commerciales |
|---------|--------|------------------------|
| **Prix** | Open source | 50k-200k€/an |
| **Traçabilité** | Complète (JSON) | Boîte noire |
| **Extensibilité** | Illimitée (YAML) | Limitée |
| **Contrôle CPAM** | Intégré | Absent |
| **Validation manuelle** | Interface dédiée | Externe |
| **RAG/LLM** | Configurable | Propriétaire |
| **Tests** | 6000 LOC | Non accessible |
| **Anonymisation** | 3 phases robustes | Variable |
| **Export RUM** | Natif | Souvent payant |
| **Référentiels** | Upload dynamique | Mise à jour éditeur |
**Verdict** : T2A v2 est déjà **supérieur** sur 8/10 critères.
---
**Auteur** : Kiro AI Assistant
**Contact** : AWS Support
**Dernière mise à jour** : 2026-02-19 17:10

View File

@@ -1,254 +0,0 @@
#!/usr/bin/env python3
"""
Analyse structurelle detaillee des PDFs dans /home/dom/ai/t2a/input/
Utilise pdfplumber pour extraire texte, tableaux, headers et donnees personnelles.
"""
import pdfplumber
import os
import re
INPUT_DIR = "/home/dom/ai/t2a/input/"
REPORT_FILE = "/home/dom/ai/t2a/rapport_analyse_pdfs.md"
# Patterns pour detecter des donnees personnelles
PATTERNS = {
"telephone": re.compile(r'(?:\+?\d{1,3}[\s.-]?)?\(?\d{2,4}\)?[\s.-]?\d{2,4}[\s.-]?\d{2,4}[\s.-]?\d{0,4}'),
"email": re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'),
"code_postal": re.compile(r'\b\d{5}\b'),
"numero_dossier": re.compile(r'\b\d{7,10}\b'),
"date": re.compile(r'\b\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4}\b'),
"montant_euro": re.compile(r'\d+[\s.,]?\d*\s*[€]|\d+[\s.,]?\d*\s*EUR'),
}
def analyze_pdf(filepath):
"""Analyse complete d'un PDF."""
result = {
"filename": os.path.basename(filepath),
"filepath": filepath,
"pages": [],
"tables_all": [],
"full_text": "",
"headers_detected": [],
"personal_data": {},
"metadata": {},
}
with pdfplumber.open(filepath) as pdf:
result["metadata"] = {
"num_pages": len(pdf.pages),
"pdf_metadata": pdf.metadata if pdf.metadata else {},
}
for i, page in enumerate(pdf.pages):
page_info = {
"page_num": i + 1,
"width": page.width,
"height": page.height,
"text": "",
"tables": [],
"lines_count": 0,
"chars_count": 0,
"rects_count": 0,
"images_count": 0,
}
text = page.extract_text() or ""
page_info["text"] = text
page_info["lines_count"] = len(text.split('\n')) if text else 0
page_info["chars_count"] = len(page.chars) if page.chars else 0
page_info["rects_count"] = len(page.rects) if page.rects else 0
page_info["images_count"] = len(page.images) if page.images else 0
tables = page.extract_tables() or []
for t_idx, table in enumerate(tables):
table_info = {
"table_index": t_idx,
"page": i + 1,
"rows": len(table),
"cols": max(len(row) for row in table) if table else 0,
"data": table,
"header_row": table[0] if table else [],
}
page_info["tables"].append(table_info)
result["tables_all"].append(table_info)
result["pages"].append(page_info)
result["full_text"] += f"\n--- PAGE {i+1} ---\n{text}\n"
# Detecter les headers/sections
for line in result["full_text"].split('\n'):
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("--- PAGE"):
continue
if len(stripped) >= 3 and stripped == stripped.upper() and any(c.isalpha() for c in stripped):
result["headers_detected"].append(stripped)
elif len(stripped) < 80 and stripped[0].isupper() and ':' in stripped:
result["headers_detected"].append(stripped)
# Detecter les donnees personnelles
for pattern_name, pattern in PATTERNS.items():
matches = pattern.findall(result["full_text"])
if matches:
unique_matches = list(set(m.strip() for m in matches if len(m.strip()) > 3))
if unique_matches:
result["personal_data"][pattern_name] = unique_matches
return result
def format_table_for_md(table_data, max_rows=30):
"""Formate un tableau en Markdown."""
if not table_data:
return "_Tableau vide_"
lines = []
max_cols = max(len(row) for row in table_data)
normalized = []
for row in table_data[:max_rows]:
norm_row = []
for j in range(max_cols):
if j < len(row) and row[j] is not None:
cell = str(row[j]).replace('\n', ' ').replace('|', '/').strip()
norm_row.append(cell if cell else "")
else:
norm_row.append("")
normalized.append(norm_row)
lines.append("| " + " | ".join(normalized[0]) + " |")
lines.append("| " + " | ".join(["---"] * max_cols) + " |")
for row in normalized[1:]:
lines.append("| " + " | ".join(row) + " |")
if len(table_data) > max_rows:
lines.append(f"\n_... ({len(table_data) - max_rows} lignes supplementaires non affichees)_")
return "\n".join(lines)
def generate_report(analyses):
"""Genere le rapport Markdown."""
report = []
report.append("# Rapport d'analyse structurelle des PDFs")
report.append(f"\n**Repertoire analyse :** `{INPUT_DIR}`")
report.append(f"**Nombre de fichiers :** {len(analyses)}")
report.append("")
for idx, analysis in enumerate(analyses, 1):
report.append(f"\n{'='*80}")
report.append(f"## {idx}. {analysis['filename']}")
report.append(f"{'='*80}\n")
meta = analysis["metadata"]
report.append("### Metadonnees du PDF")
report.append(f"- **Nombre de pages :** {meta['num_pages']}")
if meta.get("pdf_metadata"):
for k, v in meta["pdf_metadata"].items():
if v:
report.append(f"- **{k} :** {v}")
report.append("")
report.append("### Structure par page")
for page in analysis["pages"]:
report.append(f"\n#### Page {page['page_num']}")
report.append(f"- **Dimensions :** {page['width']} x {page['height']} pts")
report.append(f"- **Lignes de texte :** {page['lines_count']}")
report.append(f"- **Caracteres (objets) :** {page['chars_count']}")
report.append(f"- **Rectangles :** {page['rects_count']}")
report.append(f"- **Images :** {page['images_count']}")
report.append(f"- **Tableaux detectes :** {len(page['tables'])}")
report.append("")
report.append("### Texte complet extrait")
report.append("```")
report.append(analysis["full_text"].strip())
report.append("```")
report.append("")
if analysis["tables_all"]:
report.append(f"### Tableaux detectes ({len(analysis['tables_all'])} au total)")
for t in analysis["tables_all"]:
report.append(f"\n#### Tableau {t['table_index']+1} (Page {t['page']}) - {t['rows']} lignes x {t['cols']} colonnes")
report.append("")
report.append(format_table_for_md(t["data"]))
report.append("")
else:
report.append("### Tableaux detectes")
report.append("_Aucun tableau detecte par pdfplumber._\n")
report.append("### Sections / Headers identifies")
if analysis["headers_detected"]:
seen = set()
for h in analysis["headers_detected"]:
if h not in seen:
report.append(f"- `{h}`")
seen.add(h)
else:
report.append("_Aucun header identifie._")
report.append("")
report.append("### Donnees personnelles detectees")
if analysis["personal_data"]:
for category, values in analysis["personal_data"].items():
report.append(f"\n**{category.replace('_', ' ').title()} :**")
for v in sorted(values):
report.append(f"- `{v}`")
else:
report.append("_Aucune donnee personnelle detectee._")
report.append("")
report.append(f"\n{'='*80}")
report.append("## Resume comparatif")
report.append(f"{'='*80}\n")
report.append("| Caracteristique | " + " | ".join(a["filename"] for a in analyses) + " |")
report.append("| --- | " + " | ".join(["---"] * len(analyses)) + " |")
report.append("| Pages | " + " | ".join(str(a["metadata"]["num_pages"]) for a in analyses) + " |")
report.append("| Tableaux | " + " | ".join(str(len(a["tables_all"])) for a in analyses) + " |")
report.append("| Headers | " + " | ".join(str(len(set(a["headers_detected"]))) for a in analyses) + " |")
report.append("| Longueur texte | " + " | ".join(str(len(a["full_text"])) + " chars" for a in analyses) + " |")
return "\n".join(report)
def main():
pdf_files = sorted([
os.path.join(INPUT_DIR, f)
for f in os.listdir(INPUT_DIR)
if f.lower().endswith('.pdf')
])
print(f"Fichiers PDF trouves : {len(pdf_files)}")
for f in pdf_files:
print(f" - {f}")
analyses = []
for filepath in pdf_files:
print(f"\nAnalyse de : {os.path.basename(filepath)} ...")
analysis = analyze_pdf(filepath)
analyses.append(analysis)
print(f" Pages: {analysis['metadata']['num_pages']}")
print(f" Tableaux: {len(analysis['tables_all'])}")
print(f" Headers: {len(set(analysis['headers_detected']))}")
print(f" Texte: {len(analysis['full_text'])} chars")
report = generate_report(analyses)
with open(REPORT_FILE, "w", encoding="utf-8") as f:
f.write(report)
print(f"\n{'='*60}")
print(f"Rapport ecrit dans : {REPORT_FILE}")
print(f"{'='*60}")
print("\n")
print(report)
if __name__ == "__main__":
main()

View File

@@ -1,472 +0,0 @@
#!/usr/bin/env python3
"""Comparaison qualité CPAM : multi-modèles sur 3 dossiers.
Génère la contre-argumentation CPAM avec plusieurs modèles et compare :
- Longueur et densité des arguments
- Présence des 3 axes (médical, asymétrie, réglementaire)
- Citations de preuves du dossier
- Références aux sources RAG
- Mots-clés d'asymétrie d'information
"""
import json
import re
import sys
import time
from pathlib import Path
import requests
STRUCTURED_DIR = Path("output/structured")
OLLAMA_URL = "http://localhost:11434"
MODELS = ["gemma3:12b-v2"] # 12b avec nouveau prompt nuancé
TIMEOUTS = {
"gemma3:12b": 120,
"gemma3:27b": 300,
"qwen3:14b": 180,
"mistral-small3.2:24b": 300,
}
# 3 dossiers variés : DP+DA, DAS long, DP court
TEST_DOSSIERS = [
"183_23087212", # DP+DA contestés
"228_23176885", # DAS seul, arg long (1921c)
"153_23102610", # DP seul, arg court
]
def load_dossier(dossier_name: str) -> dict | None:
dossier_dir = STRUCTURED_DIR / dossier_name
if not dossier_dir.exists():
return None
for f in list(dossier_dir.glob("*_fusionne_cim10.json")) + sorted(dossier_dir.glob("*_cim10.json")):
return json.loads(f.read_text())
return None
def build_prompt(data: dict, controle: dict, sources: list[dict]) -> str:
"""Reconstruit le prompt CPAM (identique au pipeline)."""
# Import du vrai builder pour garantir la cohérence
sys.path.insert(0, str(Path(__file__).parent))
from src.config import ControleCPAM, DossierMedical
from src.control.cpam_response import _build_cpam_prompt
dossier = DossierMedical.model_validate(data)
ctrl = ControleCPAM.model_validate(controle)
return _build_cpam_prompt(dossier, ctrl, sources)
# Modèles incompatibles avec format:json d'Ollama (mode thinking)
NO_FORMAT_JSON_MODELS = {"qwen3:14b", "qwen3:8b", "qwen3:32b"}
def _parse_json_from_text(raw: str) -> dict | None:
"""Parse du JSON depuis une réponse brute (avec ou sans markdown)."""
text = raw.strip()
# Retirer bloc markdown ```json ... ```
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl != -1:
text = text[first_nl + 1:]
if text.rstrip().endswith("```"):
text = text.rstrip()[:-3]
text = text.strip()
# Essayer tel quel
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Trouver le premier { ... dernier }
brace_start = text.find("{")
brace_end = text.rfind("}")
if brace_start != -1 and brace_end > brace_start:
try:
return json.loads(text[brace_start:brace_end + 1])
except json.JSONDecodeError:
pass
return None
def call_ollama(prompt: str, model: str) -> tuple[dict | None, float, str]:
"""Appelle Ollama et retourne (parsed_json, duration_s, raw_text)."""
timeout = TIMEOUTS.get(model, 180)
use_format_json = model not in NO_FORMAT_JSON_MODELS
# Pour Qwen3 : ajouter /no_think pour désactiver le mode thinking
actual_prompt = prompt
if model in NO_FORMAT_JSON_MODELS:
actual_prompt = prompt + "\n/no_think"
payload = {
"model": model,
"prompt": actual_prompt,
"stream": False,
"options": {
"temperature": 0.1,
"num_predict": 4000,
},
}
if use_format_json:
payload["format"] = "json"
t0 = time.time()
try:
response = requests.post(
f"{OLLAMA_URL}/api/generate",
json=payload,
timeout=timeout,
)
response.raise_for_status()
duration = time.time() - t0
raw = response.json().get("response", "")
parsed = _parse_json_from_text(raw)
return parsed, duration, raw
except json.JSONDecodeError:
duration = time.time() - t0
return None, duration, raw
except Exception as e:
duration = time.time() - t0
return None, duration, str(e)
def compute_metrics(parsed: dict | None) -> dict:
"""Calcule les métriques de qualité."""
if parsed is None:
return {"valid_json": False}
full_text = json.dumps(parsed, ensure_ascii=False)
# 3 axes présents ?
has_med = bool(parsed.get("contre_arguments_medicaux"))
has_asym = bool(parsed.get("contre_arguments_asymetrie"))
has_regl = bool(parsed.get("contre_arguments_reglementaires"))
has_3axes = has_med and has_asym and has_regl
# Longueurs par axe
len_med = len(str(parsed.get("contre_arguments_medicaux", "")))
len_asym = len(str(parsed.get("contre_arguments_asymetrie", "")))
len_regl = len(str(parsed.get("contre_arguments_reglementaires", "")))
len_total_args = len_med + len_asym + len_regl
# Fallback ancien format
if not has_3axes:
len_total_args = max(len_total_args, len(str(parsed.get("contre_arguments", ""))))
# Preuves du dossier
preuves = parsed.get("preuves_dossier", [])
n_preuves = len(preuves) if isinstance(preuves, list) else 0
# Références structurées
refs = parsed.get("references", [])
n_refs = len(refs) if isinstance(refs, list) else 0
# Références avec citation verbatim
n_refs_citation = 0
if isinstance(refs, list):
for r in refs:
if isinstance(r, dict) and r.get("citation") and len(str(r["citation"])) > 20:
n_refs_citation += 1
# Mots-clés d'asymétrie
full_lower = full_text.lower()
asymetrie_kw = [
"biologie", "imagerie", "scanner", "irm", "échographie",
"traitement", "médicament", "posologie",
"asymétrie", "non transmis", "n'avait pas", "n'a pas eu accès",
"imc", "antécédent", "crp", "hémoglobine", "leucocytes",
]
n_asymetrie = sum(1 for kw in asymetrie_kw if kw in full_lower)
# Points d'accord réels
accord = str(parsed.get("points_accord", ""))
accord_real = bool(accord) and accord.lower().strip() not in ("aucun", "aucun.", "n/a", "")
# Conclusion non vide
conclusion = str(parsed.get("conclusion", ""))
has_conclusion = len(conclusion) > 20
return {
"valid_json": True,
"has_3axes": has_3axes,
"len_med": len_med,
"len_asym": len_asym,
"len_regl": len_regl,
"len_total_args": len_total_args,
"n_preuves": n_preuves,
"n_refs": n_refs,
"n_refs_citation": n_refs_citation,
"n_asymetrie": n_asymetrie,
"accord_real": accord_real,
"has_conclusion": has_conclusion,
"total_len": len(full_text),
}
def model_key(model: str) -> str:
"""Clé courte pour un modèle (ex: 'gemma3:12b''gemma3_12b')."""
return model.replace(":", "_").replace(".", "_")
def print_multi_model(results: list[dict], models: list[str]):
"""Affiche la comparaison multi-modèles."""
W = 140
col_w = 18
print("\n" + "=" * W)
print(f"COMPARAISON CPAM : {' vs '.join(models)}")
print("=" * W)
metric_labels = [
("Durée (s)", "duration", True),
("3 axes", "has_3axes", False),
("Args médicaux", "len_med", False),
("Args asymétrie", "len_asym", False),
("Args réglementaires", "len_regl", False),
("Total args (car.)", "len_total_args", False),
("Preuves structurées", "n_preuves", False),
("Références RAG", "n_refs", False),
("Refs verbatim", "n_refs_citation", False),
("Mots-clés asymétrie", "n_asymetrie", False),
("Points d'accord", "accord_real", False),
("Conclusion étayée", "has_conclusion", False),
("Longueur totale", "total_len", False),
]
for r in results:
print(f"\n{'' * W}")
print(f" {r['dossier']} / OGC {r['ogc']}{r['titre']}")
print(f" Argument CPAM : {r['arg_len']} car. | Prompt : {r['prompt_len']} car.")
print(f"{'' * W}")
# Vérifier validité
all_valid = True
for m in models:
mk = model_key(m)
metrics = r.get(f"metrics_{mk}", {})
if not metrics.get("valid_json", False):
dur = r.get(f"duration_{mk}", 0)
print(f" {m} : JSON INVALIDE ({dur:.1f}s)")
all_valid = False
if not all_valid:
continue
# Header
header = f" {'Métrique':<25}"
for m in models:
short = m.split(":")[0][:6] + ":" + m.split(":")[-1] if ":" in m else m[:col_w]
header += f" {short:>{col_w}}"
print(header)
print(f" {'' * (25 + (col_w + 1) * len(models))}")
for label, key, is_duration in metric_labels:
row = f" {label:<25}"
for m in models:
mk = model_key(m)
if is_duration:
val = r.get(f"duration_{mk}", 0)
row += f" {val:>{col_w - 1}.1f}s"
else:
metrics = r.get(f"metrics_{mk}", {})
val = metrics.get(key, 0)
if isinstance(val, bool):
row += f" {'Oui' if val else 'Non':>{col_w}}"
else:
row += f" {val:>{col_w}}"
print(row)
# Synthèse globale
print(f"\n{'=' * W}")
print("SYNTHÈSE GLOBALE")
print(f"{'=' * W}")
# Filtrer les résultats valides pour tous les modèles
valid = []
for r in results:
all_ok = all(r.get(f"metrics_{model_key(m)}", {}).get("valid_json", False) for m in models)
if all_ok:
valid.append(r)
if not valid:
print(" Aucun résultat valide pour tous les modèles.")
return
n = len(valid)
print(f" Dossiers comparés : {n}")
# Header synthèse
header = f"\n {'Métrique':<25}"
for m in models:
short = m.split(":")[0][:6] + ":" + m.split(":")[-1] if ":" in m else m[:col_w]
header += f" {short:>{col_w}}"
header += f" {'Meilleur':>{col_w}}"
print(header)
print(f" {'' * (25 + (col_w + 1) * (len(models) + 1))}")
# Durée
row = f" {'Durée moy. (s)':<25}"
dur_vals = {}
for m in models:
mk = model_key(m)
avg_dur = sum(r.get(f"duration_{mk}", 0) for r in valid) / n
dur_vals[m] = avg_dur
row += f" {avg_dur:>{col_w - 1}.1f}s"
best = min(dur_vals, key=dur_vals.get)
row += f" {best:>{col_w}}"
print(row)
# Métriques (higher is better)
for label, key in [
("Total args (car.)", "len_total_args"),
("Preuves structurées", "n_preuves"),
("Références RAG", "n_refs"),
("Refs verbatim", "n_refs_citation"),
("Mots-clés asymétrie", "n_asymetrie"),
]:
row = f" {label:<25}"
vals = {}
for m in models:
mk = model_key(m)
avg_val = sum(r.get(f"metrics_{mk}", {}).get(key, 0) for r in valid) / n
vals[m] = avg_val
row += f" {avg_val:>{col_w}.1f}"
best = max(vals, key=vals.get)
row += f" {best:>{col_w}}"
print(row)
# Booléens (count True)
for label, key in [
("3 axes", "has_3axes"),
("Points d'accord", "accord_real"),
]:
row = f" {label:<25}"
vals = {}
for m in models:
mk = model_key(m)
cnt = sum(1 for r in valid if r.get(f"metrics_{mk}", {}).get(key, False))
vals[m] = cnt
row += f" {f'{cnt}/{n}':>{col_w}}"
best = max(vals, key=vals.get)
row += f" {best:>{col_w}}"
print(row)
# Durées totales
print()
fastest = min(models, key=lambda m: sum(r.get(f"duration_{model_key(m)}", 0) for r in valid))
fastest_dur = sum(r.get(f"duration_{model_key(fastest)}", 0) for r in valid)
for m in models:
mk = model_key(m)
total = sum(r.get(f"duration_{mk}", 0) for r in valid)
ratio = total / fastest_dur if fastest_dur > 0 else 0
print(f" {m:<25} total={total:.0f}s (x{ratio:.1f})")
print()
def main():
# Charger les résultats précédents (all_models)
prev_file = Path("output/compare_cpam_all_models.json")
prev_data = {}
if prev_file.exists():
for entry in json.loads(prev_file.read_text()):
prev_data[entry["dossier"]] = entry
# On compare l'ancien 12b (ancien prompt) vs le nouveau 12b-v2 (nouveau prompt nuancé)
# + 27b comme référence nuance
ref_models = ["gemma3:12b", "gemma3:27b"]
all_models = ref_models + MODELS
print("=" * 100)
print(f"Comparaison qualité CPAM : {' / '.join(all_models)}")
print(f"Dossiers : {', '.join(TEST_DOSSIERS)}")
print(f"Test : gemma3:12b avec NOUVEAU prompt nuancé (v2)")
print(f"Résultats précédents : {'oui' if prev_data else 'non'}")
print("=" * 100)
results = []
for dossier_name in TEST_DOSSIERS:
data = load_dossier(dossier_name)
if not data:
print(f"\nERREUR : {dossier_name} non trouvé")
continue
controles = [c for c in data.get("controles_cpam", []) if c.get("arg_ucr")]
if not controles:
print(f"\nERREUR : {dossier_name} — pas de contrôle CPAM")
continue
controle = controles[0]
sources = [
{
"document": s.get("document", ""),
"page": s.get("page"),
"code": s.get("code"),
"extrait": s.get("extrait", ""),
}
for s in controle.get("sources_reponse", [])
]
prompt = build_prompt(data, controle, sources)
print(f"\n[{dossier_name}] OGC {controle['numero_ogc']}{controle.get('titre', '')}")
print(f" Prompt : {len(prompt)} car. | Arg CPAM : {len(controle.get('arg_ucr', ''))} car.")
result = {
"dossier": dossier_name,
"ogc": controle["numero_ogc"],
"titre": controle.get("titre", ""),
"arg_len": len(controle.get("arg_ucr", "")),
"prompt_len": len(prompt),
}
# Réutiliser les résultats précédents pour les modèles de référence
prev = prev_data.get(dossier_name)
if prev:
for old_model in ref_models:
mk = model_key(old_model)
result[f"duration_{mk}"] = prev.get(f"duration_{mk}", 0)
result[f"metrics_{mk}"] = prev.get(f"metrics_{mk}", {})
result[f"response_{mk}"] = prev.get(f"response_{mk}")
dur = result[f"duration_{mk}"]
is_valid = result[f"metrics_{mk}"].get("valid_json", False)
print(f"{old_model} ... (précédent) {'OK' if is_valid else 'FAIL'} ({dur:.1f}s)")
# Tester le 12b-v2 (nouveau prompt) — appelle gemma3:12b avec le prompt modifié
for model_label in MODELS:
mk = model_key(model_label)
actual_model = "gemma3:12b" # même modèle, nouveau prompt
print(f"{model_label} (nouveau prompt) ...", end=" ", flush=True)
parsed, dur, raw = call_ollama(prompt, actual_model)
status = "OK" if parsed else "FAIL"
print(f"{status} ({dur:.1f}s)")
result[f"duration_{mk}"] = dur
result[f"metrics_{mk}"] = compute_metrics(parsed)
result[f"response_{mk}"] = parsed
results.append(result)
# Affichage
print_multi_model(results, all_models)
# Sauvegarde
output_file = Path("output/compare_cpam_prompt_v2.json")
output_file.parent.mkdir(parents=True, exist_ok=True)
save_data = []
for r in results:
entry = {
"dossier": r["dossier"],
"ogc": r["ogc"],
"titre": r["titre"],
}
for m in all_models:
mk = model_key(m)
entry[f"duration_{mk}"] = r.get(f"duration_{mk}", 0)
entry[f"metrics_{mk}"] = r.get(f"metrics_{mk}", {})
entry[f"response_{mk}"] = r.get(f"response_{mk}")
save_data.append(entry)
output_file.write_text(json.dumps(save_data, ensure_ascii=False, indent=2))
print(f"Résultats sauvegardés dans {output_file}")
if __name__ == "__main__":
main()

Binary file not shown.

View File

@@ -1,738 +0,0 @@
#!/usr/bin/env python3
"""
extract_t2a_llm.py — Extracteur T2A généraliste via OCR + LLM (Ollama)
Entrée : PDF (scanné ou natif) de document T2A (décision UCR, notification CPAM, rapport ARS…)
Sortie : Fichier Excel (.xlsx) avec les données structurées
Architecture :
PDF → OCR/texte natif → Détection type (1 appel LLM) → Extraction bloc par bloc (N appels LLM) → Excel
Usage :
python extract_t2a_llm.py FICHIER.pdf [--model gemma3:27b-it-qat] [--output out.xlsx] [--verbose]
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from pathlib import Path
import pymupdf
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
# ---------------------------------------------------------------------------
# 0. Normalisation texte OCR
# ---------------------------------------------------------------------------
def normalize_text(text: str) -> str:
"""Normalise les apostrophes, guillemets et espaces issus de l'OCR."""
text = text.replace("\u2018", "'").replace("\u2019", "'")
text = text.replace("\u201C", '"').replace("\u201D", '"')
text = text.replace("\u00AB", '"').replace("\u00BB", '"')
text = text.replace("''", "'")
text = text.replace("\u00A0", " ").replace("\u202F", " ")
text = re.sub(r"\bF'UCR\b", "l'UCR", text)
text = re.sub(r"\bl''UCR\b", "l'UCR", text)
return text
# ---------------------------------------------------------------------------
# 1. OCR / Extraction texte (docTR — deep learning, GPU)
# ---------------------------------------------------------------------------
_doctr_model = None
def _get_doctr_model():
"""Lazy-init du modèle docTR (chargé une seule fois, GPU si VRAM libre, sinon CPU)."""
global _doctr_model
if _doctr_model is not None:
return _doctr_model
from doctr.models import ocr_predictor
print(" Chargement du modèle docTR (première utilisation)...")
t0 = time.time()
_doctr_model = ocr_predictor(
det_arch="db_resnet50",
reco_arch="crnn_vgg16_bn",
pretrained=True,
)
# Déplacer sur GPU si disponible et assez de VRAM libre
try:
import torch
if torch.cuda.is_available():
free_vram = torch.cuda.mem_get_info()[0] / (1024 ** 3)
if free_vram > 1.0:
try:
_doctr_model = _doctr_model.cuda()
print(f" docTR sur GPU ({torch.cuda.get_device_name(0)}, "
f"{free_vram:.1f} Go libres) — {time.time() - t0:.1f}s")
except torch.cuda.OutOfMemoryError:
_doctr_model = _doctr_model.cpu()
torch.cuda.empty_cache()
print(f" GPU VRAM insuffisante, docTR sur CPU — {time.time() - t0:.1f}s")
else:
print(f" GPU VRAM trop basse ({free_vram:.1f} Go libres, Ollama ?), "
f"docTR sur CPU — {time.time() - t0:.1f}s")
else:
print(f" docTR sur CPU — {time.time() - t0:.1f}s")
except ImportError:
print(f" docTR sur CPU — {time.time() - t0:.1f}s")
return _doctr_model
def ocr_pdf(pdf_path: str, dpi: int = 300) -> str:
"""Extrait le texte du PDF : texte natif si disponible, sinon OCR docTR (GPU)."""
doc = pymupdf.open(pdf_path)
total = len(doc)
# Détection : texte natif vs scanné (sur la première page)
first_page_text = doc[0].get_text() if total > 0 else ""
is_native = len(first_page_text.strip()) > 100
if is_native:
print(" Mode : extraction texte natif (pymupdf)")
full_text = []
for i, page in enumerate(doc):
print(f" Extraction page {i+1}/{total}...", end="\r")
full_text.append(page.get_text())
print(f" Extraction terminée : {total} pages. ")
return normalize_text("\n\n".join(full_text))
# OCR docTR
print(" Mode : OCR docTR (deep learning, GPU)")
from doctr.io import DocumentFile
model = _get_doctr_model()
print(f" Lecture du PDF ({total} pages)...")
doc_pages = DocumentFile.from_pdf(pdf_path)
print(f" OCR en cours sur {len(doc_pages)} pages...")
t0 = time.time()
result = model(doc_pages)
elapsed = time.time() - t0
print(f" OCR terminé : {total} pages en {elapsed:.1f}s "
f"({elapsed/total:.1f}s/page)")
full_text = result.render()
return normalize_text(full_text)
# ---------------------------------------------------------------------------
# 2. Client Ollama
# ---------------------------------------------------------------------------
NO_FORMAT_JSON_PREFIXES = ("qwen3", "qwen2.5")
OLLAMA_URL = "http://localhost:11434"
def parse_json_response(raw: str) -> dict | list | None:
"""Parse une réponse JSON, en gérant les blocs markdown et le texte parasite."""
text = raw.strip()
# Supprimer les blocs <think>...</think> (Qwen3)
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
# Supprimer les blocs markdown ```json ... ```
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl != -1:
text = text[first_nl + 1:]
if text.rstrip().endswith("```"):
text = text.rstrip()[:-3]
text = text.strip()
# Tentative directe
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Extraire le premier objet ou tableau JSON
for start_char, end_char in [("{", "}"), ("[", "]")]:
start = text.find(start_char)
if start == -1:
continue
depth = 0
for i in range(start, len(text)):
if text[i] == start_char:
depth += 1
elif text[i] == end_char:
depth -= 1
if depth == 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
break
return None
def call_ollama(
prompt: str,
model: str,
temperature: float = 0.1,
max_tokens: int = 4000,
timeout: int = 120,
verbose: bool = False,
) -> dict | list | None:
"""Appelle Ollama. Utilise l'API chat avec think=false pour Qwen3."""
is_qwen = any(model.startswith(p) for p in NO_FORMAT_JSON_PREFIXES)
if is_qwen:
# API chat + think:false pour Qwen3 (pas de format JSON natif)
endpoint = f"{OLLAMA_URL}/api/chat"
body = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"think": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens,
},
}
else:
# API generate + format JSON natif pour les autres modèles
endpoint = f"{OLLAMA_URL}/api/generate"
body = {
"model": model,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {
"temperature": temperature,
"num_predict": max_tokens,
},
}
if verbose:
print(f"\n--- PROMPT ({model}) ---")
print(prompt[:500] + ("..." if len(prompt) > 500 else ""))
print("--- FIN PROMPT ---\n")
for attempt in range(2):
try:
t0 = time.time()
response = requests.post(endpoint, json=body, timeout=timeout)
elapsed = time.time() - t0
response.raise_for_status()
data = response.json()
# Extraire le texte de la réponse selon l'API utilisée
if is_qwen:
raw = data.get("message", {}).get("content", "")
else:
raw = data.get("response", "")
if verbose:
print(f"--- RÉPONSE ({elapsed:.1f}s) ---")
print(raw[:500] + ("..." if len(raw) > 500 else ""))
print("--- FIN RÉPONSE ---\n")
result = parse_json_response(raw)
if result is not None:
return result
if attempt == 0:
print(f" [warn] JSON invalide, retry... (raw: {raw[:100]})")
except requests.ConnectionError:
print("[ERREUR] Ollama non disponible sur localhost:11434")
sys.exit(1)
except requests.Timeout:
print(f" [warn] Timeout ({timeout}s) — tentative {attempt + 1}/2")
if attempt == 1:
return None
except requests.RequestException as e:
print(f" [warn] Erreur requête : {e}")
return None
return None
# ---------------------------------------------------------------------------
# 3. Phase 1 — Détection du type de document
# ---------------------------------------------------------------------------
PROMPT_PHASE1 = """\
Tu es un expert en codage PMSI et contrôle T2A. Analyse le début de ce document et identifie sa structure.
TEXTE (début du document) :
---
{text_preview}
---
Réponds UNIQUEMENT en JSON avec ces champs :
{{
"type_document": "decision_ucr | notification_cpam | rapport_controle | autre",
"organisme": "nom de l'organisme (CPAM, UCR, ARS...)",
"date_document": "date au format YYYY-MM-DD si trouvée, sinon vide",
"objet": "résumé en une phrase de l'objet du document",
"separateur_blocs": "regex Python pour séparer les dossiers individuels (ex: OGC \\\\d+ :)",
"colonnes_detectees": ["liste des champs/colonnes détectés dans la structure"]
}}
IMPORTANT :
- Le separateur_blocs doit être un regex Python valide
- Il doit capturer le motif qui sépare chaque dossier/cas individuel
- Si c'est un document UCR, le séparateur est typiquement "OGC \\\\d+ :"
- Si tu ne trouves pas de séparateur clair, mets une chaîne vide ""
"""
def detect_document_type(full_text: str, model: str, timeout: int, verbose: bool) -> dict:
"""Phase 1 : détection du type de document via LLM."""
preview = full_text[:3000]
prompt = PROMPT_PHASE1.format(text_preview=preview)
result = call_ollama(prompt, model=model, timeout=timeout, verbose=verbose)
if result is None:
print(" [warn] Phase 1 : détection échouée, utilisation des valeurs par défaut")
return {
"type_document": "autre",
"organisme": "",
"date_document": "",
"objet": "",
"separateur_blocs": "",
"colonnes_detectees": [],
}
return result
# ---------------------------------------------------------------------------
# 4. Découpage en blocs
# ---------------------------------------------------------------------------
def split_into_blocks(full_text: str, separator_pattern: str) -> list[str]:
"""Découpe le texte en blocs logiques (dossiers individuels)."""
blocks = []
# Tentative avec le séparateur détecté par le LLM
if separator_pattern:
try:
regex = re.compile(separator_pattern, re.MULTILINE | re.IGNORECASE)
parts = regex.split(full_text)
# Recombiner : le séparateur fait partie du bloc suivant
matches = list(regex.finditer(full_text))
if len(matches) >= 3:
for i, match in enumerate(matches):
start = match.start()
end = matches[i + 1].start() if i + 1 < len(matches) else len(full_text)
block = full_text[start:end].strip()
if block:
blocks.append(block)
print(f" Découpage par séparateur : {len(blocks)} blocs trouvés")
return blocks
else:
print(f" [warn] Séparateur '{separator_pattern}' → seulement {len(matches)} blocs, fallback")
except re.error as e:
print(f" [warn] Regex invalide '{separator_pattern}' : {e}, fallback")
# Fallback : découpage par taille (~6000 chars, chevauchement 500)
chunk_size = 6000
overlap = 500
text_len = len(full_text)
if text_len <= chunk_size:
return [full_text]
pos = 0
while pos < text_len:
end = min(pos + chunk_size, text_len)
# Essayer de couper à une fin de ligne
if end < text_len:
newline_pos = full_text.rfind("\n", pos + chunk_size - 200, end + 200)
if newline_pos > pos:
end = newline_pos
blocks.append(full_text[pos:end].strip())
pos = end - overlap if end < text_len else text_len
print(f" Découpage par taille : {len(blocks)} blocs ({chunk_size} chars, chevauchement {overlap})")
return blocks
# ---------------------------------------------------------------------------
# 5. Phase 2 — Extraction bloc par bloc
# ---------------------------------------------------------------------------
SCHEMA_FIELDS = """\
Champs à extraire (JSON) — remplis chaque champ ou laisse une chaîne vide "" si non trouvé :
- "champ": numéro de champ (entier, 0 si non trouvé)
- "ogc": numéro OGC / numéro de dossier (entier, 0 si non trouvé)
- "type_desaccord": type de désaccord — "DP", "DAS", "DP + DAS", ou ""
- "code_etablissement": code(s) CIM-10 de l'établissement (ex: "G40.0 + F10.2")
- "libelle_etablissement": libellé(s) correspondant aux codes établissement
- "code_controleurs": code(s) CIM-10 des contrôleurs (ou "non repris")
- "libelle_controleurs": libellé(s) correspondant aux codes contrôleurs
- "codes_retenus_final": code(s) finalement retenus par l'UCR/la décision
- "decision": classification — "Favorable établissement", "Défavorable établissement", "Mixte", ou "Indéterminé"
* "Favorable établissement" = la décision retient l'avis/le codage de l'établissement
* "Défavorable établissement" = la décision confirme l'avis des contrôleurs
* "Mixte" = partiellement favorable et partiellement défavorable
* "Indéterminé" = impossible à classifier clairement
- "texte_decision_complet": texte intégral de la décision/conclusion
- "resume_motif": résumé en 1-2 phrases du motif de la décision
- "regles_citees": règles de codage citées (ex: "T3, T7")
- "references_guide": références documentaires (guide méthodologique, fascicules ATIH, avis Agora…)
- "ghm_mentionne": tous les GHM mentionnés (ex: "05M09 / 05M092")
- "ghs_mentionne": tous les GHS mentionnés
- "ghm_final": le GHM final retenu
- "ghs_final": le GHS final retenu
- "impact_groupage": impact sur le groupage — "Mieux valorisé", "Pas de changement", ou ""
"""
PROMPT_PHASE2 = """\
Tu es un expert en codage PMSI et contrôle T2A.
CONTEXTE DOCUMENT :
- Type : {type_document}
- Organisme : {organisme}
- Objet : {objet}
BLOC DE TEXTE À ANALYSER :
---
{block_text}
---
CONSIGNES :
1. Extrais les informations de chaque dossier/cas présent dans ce bloc.
2. Si le bloc contient UN SEUL dossier, retourne un objet JSON.
3. Si le bloc contient PLUSIEURS dossiers, retourne une LISTE d'objets JSON.
4. Si le bloc ne contient aucun dossier exploitable (en-tête, pied de page, texte administratif sans cas individuel), retourne : {{"skip": true}}
{schema}
IMPORTANT :
- Sois précis sur les codes CIM-10 (format X00.0)
- Pour la décision, analyse attentivement le texte : "retient l'avis de l'établissement" = Favorable, "confirme l'avis des contrôleurs" = Défavorable
- Ne laisse aucun champ sans clé, utilise "" pour les valeurs inconnues
- Retourne UNIQUEMENT du JSON valide, sans texte avant ou après
"""
def extract_block(
block_text: str,
doc_info: dict,
model: str,
timeout: int,
verbose: bool,
) -> list[dict]:
"""Extrait les données d'un bloc via LLM. Retourne une liste de dossiers."""
prompt = PROMPT_PHASE2.format(
type_document=doc_info.get("type_document", "autre"),
organisme=doc_info.get("organisme", ""),
objet=doc_info.get("objet", ""),
block_text=block_text[:8000], # Limiter la taille
schema=SCHEMA_FIELDS,
)
result = call_ollama(prompt, model=model, max_tokens=4000, timeout=timeout, verbose=verbose)
if result is None:
return []
# Skip
if isinstance(result, dict) and result.get("skip"):
return []
# Normaliser en liste
if isinstance(result, dict):
items = [result]
elif isinstance(result, list):
items = [r for r in result if isinstance(r, dict) and not r.get("skip")]
else:
return []
return items
# ---------------------------------------------------------------------------
# 6. Fusion et dédoublonnage
# ---------------------------------------------------------------------------
# Mapping clés LLM (snake_case) → clés Excel (TitleCase)
KEY_MAP = {
"champ": "Champ",
"ogc": "OGC",
"type_desaccord": "Type_desaccord",
"code_etablissement": "Code_etablissement",
"libelle_etablissement": "Libelle_etablissement",
"code_controleurs": "Code_controleurs",
"libelle_controleurs": "Libelle_controleurs",
"codes_retenus_final": "Codes_retenus_final",
"decision": "Decision",
"texte_decision_complet": "Texte_decision_complet",
"resume_motif": "Resume_motif",
"regles_citees": "Regles_citees",
"references_guide": "References_guide",
"ghm_mentionne": "GHM_mentionne",
"ghs_mentionne": "GHS_mentionne",
"ghm_final": "GHM_final",
"ghs_final": "GHS_final",
"impact_groupage": "Impact_groupage",
}
def normalize_row(raw: dict) -> dict:
"""Convertit les clés LLM en clés Excel et normalise les types."""
row = {}
for llm_key, excel_key in KEY_MAP.items():
val = raw.get(llm_key, raw.get(excel_key, ""))
# Convertir en int pour Champ et OGC
if excel_key in ("Champ", "OGC"):
try:
val = int(val) if val else 0
except (ValueError, TypeError):
val = 0
elif not isinstance(val, str):
val = str(val) if val is not None else ""
row[excel_key] = val
return row
def merge_and_deduplicate(all_items: list[dict]) -> list[dict]:
"""Fusionne, déduplique par OGC, et trie les résultats."""
rows = [normalize_row(item) for item in all_items]
# Filtrer les lignes sans contenu utile
rows = [r for r in rows if r["OGC"] > 0 or r["Code_etablissement"] or r["Decision"]]
# Dédoublonnage par OGC (garder la version la plus complète)
seen: dict[int, dict] = {}
deduped: list[dict] = []
for r in rows:
key = r["OGC"]
if key == 0:
deduped.append(r)
continue
if key in seen:
old = seen[key]
old_score = sum(1 for v in old.values() if v and v != 0)
new_score = sum(1 for v in r.values() if v and v != 0)
if new_score > old_score:
deduped = [x for x in deduped if x["OGC"] != key]
deduped.append(r)
seen[key] = r
else:
seen[key] = r
deduped.append(r)
deduped.sort(key=lambda r: (r["Champ"], r["OGC"]))
return deduped
# ---------------------------------------------------------------------------
# 7. Export Excel
# ---------------------------------------------------------------------------
HEADERS = [
"Champ", "OGC", "Type_desaccord",
"Code_etablissement", "Libelle_etablissement",
"Code_controleurs", "Libelle_controleurs",
"Codes_retenus_final",
"Decision", "Texte_decision_complet", "Resume_motif",
"Regles_citees", "References_guide",
"GHM_mentionne", "GHS_mentionne", "GHM_final", "GHS_final",
"Impact_groupage",
]
HEADER_LABELS = [
"Champ", "N° OGC", "Type désaccord",
"Code(s) Établissement", "Libellé Établissement",
"Code(s) Contrôleurs", "Libellé Contrôleurs",
"Code(s) retenus (final)",
"Décision UCR", "Texte décision complet", "Résumé du motif",
"Règles codage citées", "Références (guide, fascicules, avis)",
"GHM mentionné(s)", "GHS mentionné(s)", "GHM final", "GHS final",
"Impact groupage",
]
def write_excel(rows: list[dict], output_path: str):
"""Écrit les résultats dans un fichier Excel (feuille unique)."""
wb = Workbook()
ws = wb.active
ws.title = "Décisions UCR"
# Styles
header_font = Font(bold=True, color="FFFFFF", size=11)
header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin"), right=Side(style="thin"),
top=Side(style="thin"), bottom=Side(style="thin"),
)
fav_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
defav_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
mixte_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
# En-têtes
for col, label in enumerate(HEADER_LABELS, 1):
cell = ws.cell(row=1, column=col, value=label)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_align
cell.border = thin_border
# Données
for row_idx, data in enumerate(rows, 2):
for col_idx, key in enumerate(HEADERS, 1):
val = data.get(key, "")
cell = ws.cell(row=row_idx, column=col_idx, value=val)
cell.border = thin_border
cell.alignment = Alignment(vertical="top", wrap_text=True)
# Colorer la colonne Décision
dec_col = HEADERS.index("Decision") + 1
decision_cell = ws.cell(row=row_idx, column=dec_col)
dv = str(decision_cell.value or "")
if "Favorable" in dv and "Défavorable" not in dv:
decision_cell.fill = fav_fill
elif "Défavorable" in dv:
decision_cell.fill = defav_fill
elif "Mixte" in dv:
decision_cell.fill = mixte_fill
# Largeurs de colonnes
col_widths = {
"Champ": 8, "OGC": 8, "Type_desaccord": 14,
"Code_etablissement": 22, "Libelle_etablissement": 40,
"Code_controleurs": 22, "Libelle_controleurs": 40,
"Codes_retenus_final": 22,
"Decision": 24, "Texte_decision_complet": 80,
"Resume_motif": 60,
"Regles_citees": 16, "References_guide": 50,
"GHM_mentionne": 16, "GHS_mentionne": 16,
"GHM_final": 12, "GHS_final": 10,
"Impact_groupage": 20,
}
for i, key in enumerate(HEADERS, 1):
ws.column_dimensions[ws.cell(row=1, column=i).column_letter].width = col_widths.get(key, 15)
# Filtre automatique + freeze
last_col_letter = ws.cell(row=1, column=len(HEADERS)).column_letter
ws.auto_filter.ref = f"A1:{last_col_letter}{len(rows)+1}"
ws.freeze_panes = "A2"
wb.save(output_path)
print(f"Excel enregistré : {output_path}")
# ---------------------------------------------------------------------------
# 8. CLI / Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Extracteur T2A généraliste via OCR + LLM (Ollama)",
)
parser.add_argument("pdf", help="Fichier PDF à traiter")
parser.add_argument("--model", default="gemma3:27b-it-qat",
help="Modèle Ollama (défaut: gemma3:27b-it-qat)")
parser.add_argument("--timeout", type=int, default=120,
help="Timeout par appel LLM en secondes (défaut: 120)")
parser.add_argument("--output", default=None,
help="Fichier Excel de sortie (défaut: <nom>_llm.xlsx)")
parser.add_argument("--dpi", type=int, default=300,
help="Résolution OCR (défaut: 300)")
parser.add_argument("--no-cache", action="store_true",
help="Désactiver le cache texte OCR")
parser.add_argument("--verbose", action="store_true",
help="Afficher les prompts/réponses LLM")
args = parser.parse_args()
pdf_path = args.pdf
if not Path(pdf_path).exists():
print(f"[ERREUR] Fichier non trouvé : {pdf_path}")
sys.exit(1)
output_path = args.output or str(Path(pdf_path).with_name(
Path(pdf_path).stem + "_llm.xlsx"
))
print(f"Fichier PDF : {pdf_path}")
print(f"Modèle LLM : {args.model}")
print(f"Sortie Excel : {output_path}")
print()
# --- Étape 1 : OCR ---
txt_cache = Path(pdf_path).with_suffix(".txt")
if txt_cache.exists() and not args.no_cache:
print("Étape 1/4 : Chargement du texte depuis le cache...")
full_text = txt_cache.read_text(encoding="utf-8")
full_text = normalize_text(full_text)
print(f" {len(full_text)} caractères chargés depuis {txt_cache}")
else:
print("Étape 1/4 : OCR du document...")
full_text = ocr_pdf(pdf_path, dpi=args.dpi)
if not args.no_cache:
txt_cache.write_text(full_text, encoding="utf-8")
print(f" Cache texte sauvegardé : {txt_cache}")
print(f" Longueur du texte : {len(full_text)} caractères")
print()
# --- Étape 2 : Détection du type de document ---
print("Étape 2/4 : Détection du type de document...")
t0 = time.time()
doc_info = detect_document_type(full_text, model=args.model, timeout=args.timeout, verbose=args.verbose)
print(f" Type : {doc_info.get('type_document', '?')}")
print(f" Organisme : {doc_info.get('organisme', '?')}")
print(f" Objet : {doc_info.get('objet', '?')}")
print(f" Séparateur: {doc_info.get('separateur_blocs', '(aucun)')}")
print(f" Colonnes : {doc_info.get('colonnes_detectees', [])}")
print(f" ({time.time() - t0:.1f}s)")
print()
# --- Étape 3 : Découpage et extraction ---
print("Étape 3/4 : Découpage en blocs et extraction LLM...")
separator = doc_info.get("separateur_blocs", "")
blocks = split_into_blocks(full_text, separator)
print(f" {len(blocks)} blocs à traiter")
all_items = []
t0 = time.time()
for i, block in enumerate(blocks):
print(f" Bloc {i+1}/{len(blocks)}...", end="\r")
items = extract_block(block, doc_info, model=args.model, timeout=args.timeout, verbose=args.verbose)
all_items.extend(items)
# Progression
elapsed = time.time() - t0
avg = elapsed / (i + 1)
remaining = avg * (len(blocks) - i - 1)
print(f" Bloc {i+1}/{len(blocks)}{len(items)} dossier(s) "
f"[{elapsed:.0f}s écoulé, ~{remaining:.0f}s restant] ")
total_elapsed = time.time() - t0
print(f" Extraction terminée : {len(all_items)} dossiers bruts en {total_elapsed:.0f}s")
print()
# --- Étape 4 : Fusion et export ---
print("Étape 4/4 : Fusion, dédoublonnage et export Excel...")
rows = merge_and_deduplicate(all_items)
print(f" {len(rows)} dossiers après dédoublonnage")
# Statistiques
fav = sum(1 for r in rows if "Favorable" in r.get("Decision", "") and "Défavorable" not in r.get("Decision", ""))
defav = sum(1 for r in rows if "Défavorable" in r.get("Decision", ""))
mixte = sum(1 for r in rows if "Mixte" in r.get("Decision", ""))
indet = sum(1 for r in rows if r.get("Decision", "") in ("Indéterminé", ""))
print(f" Favorable établissement : {fav}")
print(f" Défavorable établissement : {defav}")
print(f" Mixte : {mixte}")
print(f" Indéterminé : {indet}")
write_excel(rows, output_path)
print()
print("Terminé.")
if __name__ == "__main__":
main()

View File

@@ -1,690 +0,0 @@
#!/usr/bin/env python3
"""
parse_decision_ucr.py — Extraction des décisions UCR depuis un PDF scanné (contrôle T2A)
Entrée : PDF scanné de décision UCR (CPAM / Assurance Maladie)
Sortie : Fichier Excel (.xlsx) avec une feuille unique
Colonnes extraites (enrichies pour analyse IA) :
Champ, OGC, Type_desaccord,
Code_etablissement, Libelle_etablissement,
Code_controleurs, Libelle_controleurs,
Codes_retenus_final,
Decision, Texte_decision_complet, Resume_motif,
Regles_citees, References_guide,
GHM_mentionne, GHS_mentionne, GHM_final, GHS_final,
Impact_groupage
"""
from __future__ import annotations
import re
import sys
from pathlib import Path
import pymupdf
import pytesseract
from PIL import Image
import io
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
import unicodedata
# ---------------------------------------------------------------------------
# 0. Normalisation texte OCR
# ---------------------------------------------------------------------------
def normalize_text(text: str) -> str:
"""Normalise les apostrophes, guillemets et espaces issus de l'OCR."""
text = text.replace("\u2018", "'").replace("\u2019", "'")
text = text.replace("\u201C", '"').replace("\u201D", '"')
text = text.replace("\u00AB", '"').replace("\u00BB", '"')
text = text.replace("''", "'")
text = text.replace("\u00A0", " ").replace("\u202F", " ")
# Erreurs OCR courantes
text = re.sub(r"\bF'UCR\b", "l'UCR", text)
text = re.sub(r"\bl''UCR\b", "l'UCR", text)
return text
# ---------------------------------------------------------------------------
# 1. OCR
# ---------------------------------------------------------------------------
def ocr_pdf(pdf_path: str, dpi: int = 300) -> str:
"""Extrait le texte de toutes les pages du PDF via Tesseract OCR."""
doc = pymupdf.open(pdf_path)
full_text = []
total = len(doc)
for i, page in enumerate(doc):
print(f" OCR page {i+1}/{total}...", end="\r")
mat = pymupdf.Matrix(dpi / 72, dpi / 72)
pix = page.get_pixmap(matrix=mat)
img = Image.open(io.BytesIO(pix.tobytes("png")))
text = pytesseract.image_to_string(img, lang="fra")
full_text.append(text)
print(f" OCR terminé : {total} pages. ")
return normalize_text("\n\n".join(full_text))
# ---------------------------------------------------------------------------
# 2. Parsing — Regex
# ---------------------------------------------------------------------------
RE_CHAMP = re.compile(
r"Champ\s*(?:n°\s*)?(\d+)\s*[:\-—]?\s*(?:Séjours|:)",
re.IGNORECASE,
)
RE_OGC_HEADER = re.compile(
r"(?:^|\n)\s*OGC\s+(\d+)\s*:",
re.MULTILINE,
)
RE_TYPE_DESACCORD = re.compile(
r"(?:désaccord|discussion)\s+porte\s+(?:sur\s+)?(?:le\s+|les\s+)?(DP\s+et\s+(?:le\s+)?DAS|DP\s+et\s+DAS|DP|DAS)",
re.IGNORECASE,
)
RE_CIM10 = re.compile(r"\b([A-Z]\d{2}(?:\.\d{1,2})?)\b")
RE_CODAGE_ETS = re.compile(
r"Codage\s+[ée]tablissement\s*:\s*(.*?)(?=Codage\s+contr[ôo]leurs)",
re.IGNORECASE | re.DOTALL,
)
RE_CODAGE_CTRL = re.compile(
r"Codage\s+contr[ôo]leurs\s*:\s*(.*?)(?=D[EÉ]C[I1]?SION\s+UCR|PROPOSITION\s+UCR)",
re.IGNORECASE | re.DOTALL,
)
RE_DECISION = re.compile(
r"(?:D[EÉ]C[I1]?SION|PROPOSITION)\s+UCR\s*:?\s*(.*)",
re.IGNORECASE | re.DOTALL,
)
# --- Classification ---
RE_FAVORABLE = re.compile(
r"(?:"
r"retient\s+(?:la\s+demande|le\s+codage|l'avis)\s+(?:de\s+)?l'[ée]tablissement"
r"|retient\s+en\s+D[PA]S\s+le\s+code"
r"|retient\s+le\s+codage\s+du\s+DP\s+de\s+l'[ée]tablissement"
r"|l'UCR\s+retient\s+l'avis\s+de\s+l'[ée]tablissement"
r"|confirme\s+l'avis\s+(?:de\s+)?l'[ée]tablissement"
r")",
re.IGNORECASE,
)
RE_DEFAVORABLE = re.compile(
r"confirme\s+l'avis\s+des\s+(?:m[ée]decins\s+)?contr[oô]leurs",
re.IGNORECASE,
)
RE_UCR_RETIENT = re.compile(r"l'UCR\s+retient\b", re.IGNORECASE)
RE_UCR_PROPOSE = re.compile(r"l'UCR\s+propose\b", re.IGNORECASE)
RE_NE_RETIENT_PAS = re.compile(r"ne\s+retient\s+pas", re.IGNORECASE)
# --- GHM / GHS ---
RE_GHM = re.compile(r"GHM\s+([A-Z0-9]{5,7})", re.IGNORECASE)
RE_GHS = re.compile(r"GHS\s+(\d{3,5})", re.IGNORECASE)
RE_MIEUX_VALORISE = re.compile(r"mieux\s+valoris[ée]", re.IGNORECASE)
RE_PAS_MODIFIE = re.compile(
r"(?:ne\s+modifie\s+pas|ne\s+change(?:nt)?\s+pas|pas\s+de\s+changement|reste\s+group[ée])",
re.IGNORECASE,
)
# --- Règles et références ---
# Pages du guide méthodologique
RE_GUIDE_PAGE = re.compile(
r"(?:guide\s+m[ée]thodologique|guide)\s*(?:p\.?|page)\s*(\d{1,3})",
re.IGNORECASE,
)
RE_PAGE_GUIDE = re.compile(
r"(?:p\.?|page)\s*(\d{1,3})\s+du\s+guide",
re.IGNORECASE,
)
# Règles T (T3, T7, etc.)
RE_REGLE_T = re.compile(
r"r[èe]gle\s+(T\d+)",
re.IGNORECASE,
)
# Fascicules ATIH
RE_FASCICULE = re.compile(
r"fascicule\s+(?:ATIH\s+)?(?:de\s+codage\s+)?(?:PMSI\s+)?(?:n°\s*)?(\d{1,2})?\s*(?:[-]\s*)?([A-ZÀ-Üa-zà-ü\s]+?)(?:\s+(?:de\s+)?(\d{4}))?(?:\s*(?:,\s*)?(?:p\.?\s*|page\s*)(\d+))?",
re.IGNORECASE,
)
# Avis Agora
RE_AVIS_AGORA = re.compile(
r"avis\s+(?:agora|AGORA)\s*(?:n°\s*)?(\d+)",
re.IGNORECASE,
)
# Consignes de codage avec page
RE_CONSIGNES_CODAGE = re.compile(
r"consignes?\s+de\s+codage\s*(?:p\.?\s*|page\s*)(\d+)",
re.IGNORECASE,
)
# Codage retenu / DP retenu / DAS retenu
RE_CODAGE_RETENU = re.compile(
r"(?:codage\s+retenu|DP\s*(?:retenu|=)|DAS\s*(?:retenu|=)|code\s+retenu|est\s+cod[ée]\s+en|se\s+code)\s*(?:est\s+)?(?::?\s*)([A-Z]\d{2}(?:\.\d{1,2})?)",
re.IGNORECASE,
)
# "est ajouté en DAS" / "ajout du code X"
RE_CODE_AJOUTE = re.compile(
r"(?:est\s+ajout[ée]\s+en\s+D[PA]S|ajout(?:er)?\s+(?:du\s+|en\s+D[PA]S\s+(?:le\s+)?)?(?:code\s+)?)\s*(?::?\s*)([A-Z]\d{2}(?:\.\d{1,2})?)",
re.IGNORECASE,
)
# ---------------------------------------------------------------------------
# 2b. Fonctions d'extraction
# ---------------------------------------------------------------------------
def extract_codes_and_label(text: str) -> tuple[str, str]:
"""Extrait les codes CIM-10 et le libellé depuis un bloc de codage."""
codes = RE_CIM10.findall(text)
labels = re.findall(r'"](.*?)[»"]', text)
code_str = " + ".join(codes) if codes else ""
label_str = " | ".join(labels) if labels else text.strip()[:120]
label_str = re.sub(r"\s+", " ", label_str).strip()
return code_str, label_str
def extract_codes_retenus(decision_text: str) -> str:
"""Extrait les codes finalement retenus par l'UCR."""
codes = set()
for m in RE_CODAGE_RETENU.finditer(decision_text):
codes.add(m.group(1))
for m in RE_CODE_AJOUTE.finditer(decision_text):
codes.add(m.group(1))
return " + ".join(sorted(codes)) if codes else ""
def extract_regles(text: str) -> str:
"""Extrait les règles de codage citées (T3, T7, etc.)."""
regles = set()
for m in RE_REGLE_T.finditer(text):
regles.add(m.group(1).upper())
return ", ".join(sorted(regles)) if regles else ""
def extract_references(text: str) -> str:
"""Extrait toutes les références (guide, fascicules, avis Agora, consignes)."""
refs = []
# Pages du guide méthodologique
pages_guide = set()
for m in RE_GUIDE_PAGE.finditer(text):
pages_guide.add(m.group(1))
for m in RE_PAGE_GUIDE.finditer(text):
pages_guide.add(m.group(1))
if pages_guide:
refs.append("Guide méthodologique p." + ", p.".join(sorted(pages_guide, key=int)))
# Fascicules ATIH
for m in RE_FASCICULE.finditer(text):
num = m.group(1) or ""
sujet = (m.group(2) or "").strip()
annee = m.group(3) or ""
page = m.group(4) or ""
ref = "Fascicule"
if num:
ref += f" {num}"
if sujet:
ref += f" {sujet}"
if annee:
ref += f" ({annee})"
if page:
ref += f" p.{page}"
refs.append(ref.strip())
# Avis Agora
for m in RE_AVIS_AGORA.finditer(text):
refs.append(f"Avis Agora n°{m.group(1)}")
# Consignes de codage
for m in RE_CONSIGNES_CODAGE.finditer(text):
refs.append(f"Consignes de codage p.{m.group(1)}")
# Dédupliquer
seen = set()
unique = []
for r in refs:
r_lower = r.lower()
if r_lower not in seen:
seen.add(r_lower)
unique.append(r)
return " ; ".join(unique) if unique else ""
def extract_ghm_ghs_all(text: str) -> tuple[list[str], list[str]]:
"""Extrait tous les GHM et GHS mentionnés."""
ghms = []
for m in RE_GHM.finditer(text):
v = m.group(1).upper()
if v not in ghms:
ghms.append(v)
ghss = []
for m in RE_GHS.finditer(text):
v = m.group(1)
if v not in ghss:
ghss.append(v)
return ghms, ghss
def classify_decision(decision_text: str) -> str:
"""Classifie la décision : Favorable / Défavorable / Mixte / Indéterminé."""
text = normalize_text(decision_text)
fav = bool(RE_FAVORABLE.search(text))
defav = bool(RE_DEFAVORABLE.search(text))
ucr_retient = bool(RE_UCR_RETIENT.search(text))
ucr_propose = bool(RE_UCR_PROPOSE.search(text))
ne_retient_pas = bool(RE_NE_RETIENT_PAS.search(text))
if ucr_retient and not ne_retient_pas:
fav = True
if ucr_propose and not defav:
fav = True
if (ucr_retient or fav) and defav:
return "Mixte"
if fav and defav:
return "Mixte"
elif fav:
return "Favorable établissement"
elif defav:
return "Défavorable établissement"
else:
return "Indéterminé"
def clean_decision_text(text: str) -> str:
"""Nettoie le texte de décision (supprime artifacts OCR en fin de bloc)."""
# Supprimer les lignes de pied de page UCR
text = re.sub(r"\n\s*(?:UCR\s+NA|CONFIDENTIEL|Page\s+\d+).*$", "", text, flags=re.MULTILINE | re.IGNORECASE)
# Supprimer les artefacts OCR de fin (séquences de caractères isolés)
text = re.sub(r"\n\s*[A-Z]{1,4}\s*(?:—|—|-)\s*[a-zA-Z]{0,3}\s*$", "", text, flags=re.MULTILINE)
text = re.sub(r"\n\s*(?:EE|ESS|2 ae|A D ES|EE nd)\s*$", "", text, flags=re.MULTILINE | re.IGNORECASE)
# Normaliser les espaces
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
# ---------------------------------------------------------------------------
# 2c. Parsing des blocs
# ---------------------------------------------------------------------------
def parse_ogc_block(block_text: str, champ: int, ogc_num: int) -> dict:
"""Parse un bloc OGC et retourne un dictionnaire structuré enrichi."""
result = {
"Champ": champ,
"OGC": ogc_num,
"Type_desaccord": "",
"Code_etablissement": "",
"Libelle_etablissement": "",
"Code_controleurs": "",
"Libelle_controleurs": "",
"Codes_retenus_final": "",
"Decision": "",
"Texte_decision_complet": "",
"Resume_motif": "",
"Regles_citees": "",
"References_guide": "",
"GHM_mentionne": "",
"GHS_mentionne": "",
"GHM_final": "",
"GHS_final": "",
"Impact_groupage": "",
}
# Type de désaccord
m = RE_TYPE_DESACCORD.search(block_text)
if m:
raw = m.group(1).upper().strip()
raw = re.sub(r"\s+", " ", raw)
if "DP" in raw and "DAS" in raw:
result["Type_desaccord"] = "DP + DAS"
elif "DAS" in raw:
result["Type_desaccord"] = "DAS"
elif "DP" in raw:
result["Type_desaccord"] = "DP"
# Codage établissement
m = RE_CODAGE_ETS.search(block_text)
if m:
raw_ets = m.group(1).strip()
result["Code_etablissement"], result["Libelle_etablissement"] = extract_codes_and_label(raw_ets)
# Codage contrôleurs
m = RE_CODAGE_CTRL.search(block_text)
if m:
raw_ctrl = m.group(1).strip()
if re.search(r"non\s+repris", raw_ctrl, re.IGNORECASE):
result["Code_controleurs"] = "non repris"
result["Libelle_controleurs"] = ""
else:
result["Code_controleurs"], result["Libelle_controleurs"] = extract_codes_and_label(raw_ctrl)
# Décision UCR — TEXTE COMPLET
m = RE_DECISION.search(block_text)
if m:
decision_text = m.group(1).strip()
decision_clean = clean_decision_text(decision_text)
result["Decision"] = classify_decision(decision_clean)
result["Texte_decision_complet"] = decision_clean
# Résumé court (première phrase significative)
resume = re.sub(r"\s+", " ", decision_clean)[:300].strip()
# Couper à la dernière phrase complète
last_dot = resume.rfind(".")
if last_dot > 100:
resume = resume[:last_dot + 1]
result["Resume_motif"] = resume
# Codes finalement retenus
result["Codes_retenus_final"] = extract_codes_retenus(decision_clean)
# Règles citées (T3, T7, etc.)
result["Regles_citees"] = extract_regles(block_text)
# Références (guide, fascicules, avis Agora)
result["References_guide"] = extract_references(block_text)
# GHM / GHS — tous ceux mentionnés et le dernier (= final)
ghms, ghss = extract_ghm_ghs_all(block_text)
if ghms:
result["GHM_mentionne"] = " / ".join(ghms)
result["GHM_final"] = ghms[-1] # Le dernier mentionné est souvent le final
if ghss:
result["GHS_mentionne"] = " / ".join(ghss)
result["GHS_final"] = ghss[-1]
# Impact groupage
if RE_MIEUX_VALORISE.search(block_text):
result["Impact_groupage"] = "Mieux valorisé"
elif RE_PAS_MODIFIE.search(block_text):
result["Impact_groupage"] = "Pas de changement"
return result
def parse_grouped_ogcs(text_block: str, champ: int, ogc_nums: list[int]) -> list[dict]:
"""Parse un bloc groupé (ex: OGC 14,19,46,50 traités ensemble)."""
template = parse_ogc_block(text_block, champ, ogc_nums[0])
results = []
for num in ogc_nums:
row = dict(template)
row["OGC"] = num
results.append(row)
return results
def parse_document(full_text: str) -> list[dict]:
"""Parse le texte OCR complet et retourne la liste des dossiers."""
rows = []
champ_positions = [(m.start(), int(m.group(1))) for m in RE_CHAMP.finditer(full_text)]
ogc_positions = [(m.start(), int(m.group(1))) for m in RE_OGC_HEADER.finditer(full_text)]
def get_champ_for_position(pos: int) -> int:
ch = 0
for cp, cn in champ_positions:
if cp <= pos:
ch = cn
else:
break
return ch
# Blocs groupés
RE_GROUPED = re.compile(
r"(?:Concernant|Pour)\s+les\s+OGC\s+([\d,\s]+)",
re.IGNORECASE,
)
grouped_ogcs = set()
for m in RE_GROUPED.finditer(full_text):
nums = [int(n.strip()) for n in m.group(1).split(",") if n.strip().isdigit()]
if len(nums) > 1:
start = m.start()
end = len(full_text)
for op, on in ogc_positions:
if op > start + 50 and on not in nums:
end = op
break
block = full_text[start:end]
champ = get_champ_for_position(start)
group_rows = parse_grouped_ogcs(block, champ, nums)
rows.extend(group_rows)
grouped_ogcs.update(nums)
# OGC individuels
for idx, (pos, ogc_num) in enumerate(ogc_positions):
champ = get_champ_for_position(pos)
end = len(full_text)
for next_pos, _ in ogc_positions[idx + 1:]:
if next_pos > pos + 20:
end = next_pos
break
for cp, _ in champ_positions:
if pos < cp < end:
end = cp
break
block = full_text[pos:end]
row = parse_ogc_block(block, champ, ogc_num)
if ogc_num in grouped_ogcs:
if row["Code_etablissement"] and row["Decision"]:
rows = [r for r in rows if r["OGC"] != ogc_num]
rows.append(row)
else:
if row["Code_etablissement"] or row["Decision"]:
rows.append(row)
rows.sort(key=lambda r: (r["Champ"], r["OGC"]))
# Dédupliquer
seen = {}
deduped = []
for r in rows:
key = r["OGC"]
if key in seen:
old = seen[key]
old_score = sum(1 for v in old.values() if v)
new_score = sum(1 for v in r.values() if v)
if new_score > old_score:
deduped = [x for x in deduped if x["OGC"] != key]
deduped.append(r)
seen[key] = r
else:
seen[key] = r
deduped.append(r)
deduped.sort(key=lambda r: (r["Champ"], r["OGC"]))
return deduped
# ---------------------------------------------------------------------------
# 3. Export Excel
# ---------------------------------------------------------------------------
HEADERS = [
"Champ",
"OGC",
"Type_desaccord",
"Code_etablissement",
"Libelle_etablissement",
"Code_controleurs",
"Libelle_controleurs",
"Codes_retenus_final",
"Decision",
"Texte_decision_complet",
"Resume_motif",
"Regles_citees",
"References_guide",
"GHM_mentionne",
"GHS_mentionne",
"GHM_final",
"GHS_final",
"Impact_groupage",
]
HEADER_LABELS = [
"Champ",
"N° OGC",
"Type désaccord",
"Code(s) Établissement",
"Libellé Établissement",
"Code(s) Contrôleurs",
"Libellé Contrôleurs",
"Code(s) retenus (final)",
"Décision UCR",
"Texte décision complet",
"Résumé du motif",
"Règles codage citées",
"Références (guide, fascicules, avis)",
"GHM mentionné(s)",
"GHS mentionné(s)",
"GHM final",
"GHS final",
"Impact groupage",
]
def write_excel(rows: list[dict], output_path: str):
"""Écrit les résultats dans un fichier Excel (feuille unique)."""
wb = Workbook()
ws = wb.active
ws.title = "Décisions UCR"
# Styles
header_font = Font(bold=True, color="FFFFFF", size=11)
header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin"),
right=Side(style="thin"),
top=Side(style="thin"),
bottom=Side(style="thin"),
)
fav_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
defav_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
mixte_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
# En-têtes
for col, label in enumerate(HEADER_LABELS, 1):
cell = ws.cell(row=1, column=col, value=label)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_align
cell.border = thin_border
# Données
for row_idx, data in enumerate(rows, 2):
for col_idx, key in enumerate(HEADERS, 1):
val = data.get(key, "")
cell = ws.cell(row=row_idx, column=col_idx, value=val)
cell.border = thin_border
cell.alignment = Alignment(vertical="top", wrap_text=True)
# Colorer la colonne Décision
dec_col = HEADERS.index("Decision") + 1
decision_cell = ws.cell(row=row_idx, column=dec_col)
dv = str(decision_cell.value or "")
if "Favorable" in dv and "Défavorable" not in dv:
decision_cell.fill = fav_fill
elif "Défavorable" in dv:
decision_cell.fill = defav_fill
elif "Mixte" in dv:
decision_cell.fill = mixte_fill
# Largeurs de colonnes
col_widths = {
"Champ": 8, "OGC": 8, "Type_desaccord": 14,
"Code_etablissement": 22, "Libelle_etablissement": 40,
"Code_controleurs": 22, "Libelle_controleurs": 40,
"Codes_retenus_final": 22,
"Decision": 24, "Texte_decision_complet": 80,
"Resume_motif": 60,
"Regles_citees": 16, "References_guide": 50,
"GHM_mentionne": 16, "GHS_mentionne": 16,
"GHM_final": 12, "GHS_final": 10,
"Impact_groupage": 20,
}
for i, key in enumerate(HEADERS, 1):
ws.column_dimensions[ws.cell(row=1, column=i).column_letter].width = col_widths.get(key, 15)
# Filtre automatique
last_col_letter = ws.cell(row=1, column=len(HEADERS)).column_letter
ws.auto_filter.ref = f"A1:{last_col_letter}{len(rows)+1}"
# Figer la première ligne
ws.freeze_panes = "A2"
wb.save(output_path)
print(f"Excel enregistré : {output_path}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
if len(sys.argv) < 2:
pdf_path = str(Path(__file__).parent / "SPHO-FINANC26020915121.pdf")
else:
pdf_path = sys.argv[1]
output_path = str(Path(pdf_path).with_suffix(".xlsx"))
print(f"Fichier PDF : {pdf_path}")
print("Étape 1/3 : OCR du document...")
full_text = ocr_pdf(pdf_path)
txt_path = str(Path(pdf_path).with_suffix(".txt"))
Path(txt_path).write_text(full_text, encoding="utf-8")
print(f" Texte brut sauvegardé : {txt_path}")
print("Étape 2/3 : Extraction des décisions...")
rows = parse_document(full_text)
print(f" {len(rows)} dossiers OGC extraits.")
fav = sum(1 for r in rows if "Favorable" in r.get("Decision", "") and "Défavorable" not in r.get("Decision", ""))
defav = sum(1 for r in rows if "Défavorable" in r.get("Decision", ""))
mixte = sum(1 for r in rows if "Mixte" in r.get("Decision", ""))
indet = sum(1 for r in rows if r.get("Decision", "") in ("Indéterminé", ""))
refs_count = sum(1 for r in rows if r.get("References_guide"))
codes_ret = sum(1 for r in rows if r.get("Codes_retenus_final"))
regles = sum(1 for r in rows if r.get("Regles_citees"))
print(f" Favorable établissement : {fav}")
print(f" Défavorable établissement : {defav}")
print(f" Mixte : {mixte}")
print(f" Indéterminé : {indet}")
print(f" Avec références citées : {refs_count}")
print(f" Avec codes retenus : {codes_ret}")
print(f" Avec règles T : {regles}")
print("Étape 3/3 : Génération du fichier Excel...")
write_excel(rows, output_path)
print("Terminé.")
if __name__ == "__main__":
main()

View File

@@ -1,280 +0,0 @@
# Plan : Patch 0 + Patch 1 — DP shortlist & scoring déterministe
## Contexte
Le pipeline T2A extrait le Diagnostic Principal (DP) de deux types de documents :
- **Trackare** : DP pré-codé CIM-10 → aucun scoring nécessaire (priorité absolue)
- **CRH** : fallback fragile — cherche dans "Au total" via `CIM10_MAP`, puis premier edsnlp
**Problème** : le CRH parser ne capture que 7 sections (dont `conclusion`). Les sections à fort signal DP ("Diagnostic de sortie", "Diagnostics retenus", "Diagnostic principal") sont ignorées. Pas de scoring multi-candidats, pas de détection d'ambiguïté.
**Objectif** : DP plus juste sur les CRH sans ajouter de dépendance LLM obligatoire.
---
## Patch 0 — CRH Parser : capter les sections utiles au DP
### 0.1 Ajouter 3 patterns de section dans `crh_parser.py`
**Fichier** : `src/extraction/crh_parser.py` (ligne 116-124, bloc `section_patterns`)
Ajouter **avant** le pattern `conclusion` (pour que les terminaisons soient correctes) :
| Clé | Patterns capturés |
|-----|-------------------|
| `diag_sortie` | "Diagnostic(s) de sortie", "Diagnostic(s) retenu(s) (à la sortie)" |
| `diag_principal` | "Diagnostic principal", "Problème principal" |
| `synthese` | "Synthèse", "En résumé", "En synthèse" |
Pas besoin d'un alias `motif` séparé : `motif_hospitalisation` existe déjà (ligne 117) et `sejour.motif` est extrait par `_extract_sejour_info()` (ligne 74-80).
### 0.2 Ajuster les terminaisons des patterns existants
Le pattern `conclusion` (ligne 121) se termine sur `(?=\n\s*(?:Devenir|TTT|Traitement)|$)`.
Il faut ajouter les nouveaux en-têtes comme terminaisons possibles pour éviter la capture excessive :
- Ajouter `Diagnostic(?:s)?\s+de\s+sortie|Diagnostic(?:s)?\s+retenu|Synthèse|En résumé` dans les groupes de terminaison de `conclusion`, `histoire_maladie`, `examen_clinique`.
### 0.3 Tests
**Fichier** : `tests/test_extraction.py`
8 tests à ajouter :
1. `test_parse_diag_sortie` — "Diagnostic de sortie :" capturé
2. `test_parse_diagnostics_retenus` — "Diagnostics retenus :" capturé
3. `test_parse_diag_principal` — "Diagnostic principal :" capturé
4. `test_parse_probleme_principal` — "Problème principal :" capturé
5. `test_parse_synthese` — "Synthèse :" capturé
6. `test_existing_sections_preserved` — les 7 sections existantes inchangées
7. `test_diag_sortie_multiline` — section multi-lignes avec codes CIM-10
8. `test_conclusion_does_not_overflow_into_diag_sortie` — terminaisons correctes
---
## Patch 1 — DP shortlist + scoring déterministe + REVIEW
### 1.1 Modèles de données
**Fichier** : `src/config.py`
**Nouveau `DPCandidate(BaseModel)`** (après `CodeDecision`, avant `Diagnostic`) :
```
code: Optional[str] # Code CIM-10 (peut être None si non résolu)
label: str # Texte du diagnostic
source_section: str # "diag_sortie" | "diag_principal" | "conclusion" | "synthese" | "motif_hospitalisation" | "edsnlp" | "regex"
source_excerpt: Optional[str] # ~200 chars du texte source
source_page: Optional[int] # Page 1-indexed
confidence_raw: Optional[str] # "high" | "medium" | "low"
score: int = 0 # Score final
score_details: dict[str, int] # Détail : {"section": +4, "negation": -4, ...}
is_negated: bool = False
is_conditional: bool = False
```
**Nouveau `DPSelection(BaseModel)`** :
```
verdict: str = "confirmed" # "confirmed" | "review"
candidates: list[DPCandidate] # Triés par score décroissant
winner_reason: Optional[str] # Ex: "score 8 vs 4" ou "candidat unique"
llm_tiebreak: Optional[dict] # {"winner": "A"|"B", "reason": "..."}
```
**Ajout sur `DossierMedical`** (après `diagnostic_principal`, ligne 658) :
```
dp_selection: Optional[DPSelection] = None
```
### 1.2 Constantes de scoring
**Fichier** : `src/config.py` (section constantes, après les chemins)
```python
DP_SCORING_WEIGHTS = {
"section_diag_sortie": 4,
"section_diag_principal": 4,
"section_diagnostics_retenus": 4, # alias diag_sortie
"section_motif_hospitalisation": 3,
"section_conclusion": 2,
"section_synthese": 2,
"section_edsnlp": 1,
"section_regex": 1,
"proof_excerpt": 2, # excerpt non-vide + page
"negation": -4, # "pas de", "absence de", "éliminé"
"conditional": -3, # "suspect", "probable", "?"
"z_code_dp": -2, # sauf whitelist
"r_code_dp": -2, # symptôme en DP
"ccam_coherence": 1, # futur
"bio_coherence": 1, # futur
}
DP_REVIEW_THRESHOLD = 2 # delta minimum top1-top2 pour éviter REVIEW
```
### 1.3 Nouveau module `src/medical/dp_scoring.py`
4 fonctions publiques :
#### `build_dp_shortlist(parsed, text, edsnlp_result, dossier) -> list[DPCandidate]`
Collecte les candidats depuis :
1. **Sections CRH** (`parsed["sections"]`) : pour chaque section à poids fort (`diag_sortie`, `diag_principal`, `conclusion`, `synthese`, `motif_hospitalisation`), extraire les diagnostics via :
- `CIM10_MAP` (itération substring normalisé) — **réutiliser** `diagnostic_extraction.CIM10_MAP`
- Regex codes CIM-10 explicites `r"([A-Z]\d{2}(?:\.\d{1,2})?)"` dans le texte de section
- Valider via `cim10_dict.validate_code()`
2. **edsnlp** : chaque entité CIM-10 non-niée, non-hypothétique → candidat `source_section="edsnlp"`
3. **Regex patterns** : réutiliser `_find_diagnostic_principal()` et `CIM10_MAP` sur texte complet → `source_section="regex"`
4. **Dédup** par code CIM-10 : si même code depuis 2 sections, garder la section la plus forte
#### `score_candidates(candidates, dossier) -> list[DPCandidate]`
Pour chaque candidat :
1. Bonus section : `DP_SCORING_WEIGHTS["section_" + source_section]`
2. Bonus preuve : +2 si `source_excerpt` non vide ET `source_page` non None
3. Pénalité négation : chercher dans fenêtre ~200 chars autour du diagnostic. **Réutiliser** les patterns de `veto_engine.py` (lignes 31-41) : "pas de", "absence de", "non retenu", "exclu", "éliminé", "négatif"
4. Pénalité conditionnel : **réutiliser** patterns `veto_engine.py` (lignes 43-53) : "suspect", "probable", "hypothèse", "?", "à confirmer", "éventuel"
5. Pénalité Z/R code : -2, sauf whitelist Z-codes admis en DP (Z51.1, Z51.0, Z38, Z50.1, Z43, Z45, Z09, Z54, Z75, Z03, Z04, Z08) — **même whitelist** que `VETO-20` dans `veto_engine.py` (ligne 376-386)
6. Stocker détail dans `candidate.score_details`
7. Trier par score décroissant
#### `select_dp(candidates, dossier, use_llm=False) -> DPSelection`
1. 0 candidat → `verdict="review"`, candidates vide
2. 1 candidat → `verdict="confirmed"`, `winner_reason="candidat unique"`
3. Delta top1-top2 >= `DP_REVIEW_THRESHOLD``verdict="confirmed"`
4. Delta < seuil → `verdict="review"`, retourner top 3 avec preuves
5. Si `use_llm=True` ET scores identiques → appeler `_llm_tiebreak()`
#### `_llm_tiebreak(candidate_a, candidate_b, dossier) -> dict | None`
- Appel LLM local (`ollama_client.call_ollama`, role="coding", temperature=0.0)
- Prompt dans `src/prompts/templates.py` (nouveau template `DP_TIEBREAK`)
- Input : motif + sections fortes + 2 candidats + preuves
- Output attendu : `{"winner": "A"|"B", "reason": "..."}`
- Si erreur ou réponse invalide → retourner `None` → verdict reste "review"
### 1.4 Template LLM tiebreaker
**Fichier** : `src/prompts/templates.py`
Nouveau template `DP_TIEBREAK` — prompt DIM expert, choix entre 2 candidats, sortie JSON stricte. Critères : motif principal de prise en charge, ressources mobilisées, spécificité du code.
### 1.5 Intégration dans `_extract_diagnostics()`
**Fichier** : `src/medical/diagnostic_extraction.py` (ligne 168-181)
Remplacer le bloc actuel (lignes 168-181) :
```python
if not dossier.diagnostic_principal:
dp = _find_diagnostic_principal(text_lower, conclusion)
if dp:
dossier.diagnostic_principal = dp
elif edsnlp_codes:
...
```
Par :
```python
if not dossier.diagnostic_principal:
candidates = build_dp_shortlist(parsed, text, edsnlp_result, dossier)
candidates = score_candidates(candidates, dossier)
selection = select_dp(candidates, dossier, use_llm=use_rag)
dossier.dp_selection = selection
if selection.candidates:
winner = selection.candidates[0]
dossier.diagnostic_principal = Diagnostic(
texte=winner.label,
cim10_suggestion=winner.code,
source=winner.source_section,
source_page=winner.source_page,
source_excerpt=winner.source_excerpt,
)
```
**Note** : `_find_diagnostic_principal()` est conservée comme utilitaire interne (appelée par `build_dp_shortlist()` pour les candidats regex). `edsnlp_codes` first-entity fallback est absorbé dans le shortlist (source="edsnlp").
**Paramètre `use_rag`** : déjà passé à `_extract_diagnostics()` via `extract_medical_info()` dans `cim10_extractor.py`. Il contrôle le tiebreaker LLM.
### 1.6 Propagation dans la fusion
**Fichier** : `src/medical/fusion.py` (fonction `merge_dossiers()`)
Après sélection du DP fusionné par `_prefer_most_specific_dp()`, propager `dp_selection` depuis le dossier source du DP retenu.
### 1.7 Sérialisation
`DPCandidate` et `DPSelection` héritent de `BaseModel``model_dump()` natif.
Le champ `dp_selection` apparaît dans le JSON de sortie uniquement si non-None (Pydantic `exclude_none`).
Les dossiers Trackare auront `dp_selection=None` (pas de scoring).
---
## Ordre d'implémentation
1. **Patch 0** (prérequis de Patch 1)
- `crh_parser.py` : 3 nouveaux patterns + ajustement terminaisons
- `tests/test_extraction.py` : 8 tests
- Validation : `pytest tests/test_extraction.py -v`
2. **Patch 1a — Modèles** (étapes 1.1, 1.2)
- `config.py` : `DPCandidate`, `DPSelection`, `DP_SCORING_WEIGHTS`, champ `dp_selection`
3. **Patch 1b — Module scoring** (étapes 1.3, 1.4)
- Créer `src/medical/dp_scoring.py`
- Ajouter `DP_TIEBREAK` dans `prompts/templates.py`
4. **Patch 1c — Intégration** (étapes 1.5, 1.6)
- Modifier `diagnostic_extraction.py` (remplacer fallback naïf)
- Modifier `fusion.py` (propager dp_selection)
5. **Patch 1d — Tests**
- Créer `tests/test_dp_scoring.py` (~20 tests)
- Enrichir `tests/test_medical.py` (2 tests intégration)
- Validation : `pytest tests/ -v --ignore=tests/test_integration.py`
---
## Fichiers impactés
| Fichier | Action | Patch |
|---------|--------|-------|
| `src/extraction/crh_parser.py` | Modifier : 3 patterns + terminaisons | 0 |
| `tests/test_extraction.py` | Modifier : 8 tests | 0 |
| `src/config.py` | Modifier : 2 modèles + constantes + champ DossierMedical | 1a |
| `src/medical/dp_scoring.py` | **Créer** : 4 fonctions | 1b |
| `src/prompts/templates.py` | Modifier : template DP_TIEBREAK | 1b |
| `src/medical/diagnostic_extraction.py` | Modifier : remplacer lignes 168-181 | 1c |
| `src/medical/fusion.py` | Modifier : propager dp_selection | 1c |
| `tests/test_dp_scoring.py` | **Créer** : ~20 tests | 1d |
| `tests/test_medical.py` | Modifier : 2 tests intégration | 1d |
## Fonctions existantes réutilisées
| Fonction | Fichier | Usage dans Patch 1 |
|----------|---------|-------------------|
| `CIM10_MAP` | `diagnostic_extraction.py:22` | Lookup candidats par section |
| `normalize_text()` | `cim10_dict.py` | Normalisation texte avant matching |
| `validate_code()` | `cim10_dict.py` | Validation candidats |
| `lookup()` | `cim10_dict.py` | Résolution label → code |
| `is_valid_diagnostic_text()` | `das_filter.py` | Filtrage bruit candidats |
| `clean_diagnostic_text()` | `das_filter.py` | Nettoyage texte |
| `call_ollama()` | `ollama_client.py` | Tiebreaker LLM |
| Patterns négation (lignes 31-41) | `veto_engine.py` | Scoring pénalité négation |
| Patterns conditionnel (lignes 43-53) | `veto_engine.py` | Scoring pénalité conditionnel |
| Whitelist Z-codes (lignes 376-386) | `veto_engine.py` | Exception pénalité Z-code |
## Risques et mitigations
| Risque | Mitigation |
|--------|-----------|
| Régression 250 dossiers | Trackare DP inchangé (priorité absolue). Seuls CRH sans DP Trackare affectés. L'ancien fallback (conclusion) est inclus comme candidat score +2. |
| Bruit sections CRH | `is_valid_diagnostic_text()` filtre les artefacts. Scoring pénalise codes invalides. |
| Performance | `build_dp_shortlist()` = regex/dict, < 1ms. LLM tiebreaker optionnel (flag `use_rag`). |
| Taille JSON | `dp_selection` uniquement si non-None (CRH). ~500 bytes par dossier. |
## Vérification
1. `pytest tests/test_extraction.py -v` — Patch 0 (sections CRH)
2. `pytest tests/test_dp_scoring.py -v` — Patch 1 (scoring)
3. `pytest tests/ -v --ignore=tests/test_integration.py` — non-régression complète
4. Run sur 5 dossiers CRH connus : vérifier que `dp_selection` apparaît dans le JSON et que le verdict est cohérent

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +0,0 @@
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent))
from src.medical.rag_index import build_index
import logging
logging.basicConfig(level=logging.INFO)
build_index(force=True)
print("Indexation terminée avec succès.")

View File

@@ -1,239 +0,0 @@
#!/usr/bin/env python3
"""Test qualité CPAM — format TIM (mémoire en défense) sur dossiers réels.
Charge des dossiers JSON existants et appelle generate_cpam_response()
pour valider le nouveau format TIM sans relancer le pipeline complet.
"""
import json
import logging
import sys
import time
from pathlib import Path
# Ajouter le répertoire racine au path
sys.path.insert(0, str(Path(__file__).parent))
from src.config import DossierMedical, ControleCPAM
from src.control.cpam_response import generate_cpam_response
from src.control.cpam_validation import _is_new_tim_format
# Configurer logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-5s %(name)s%(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("test_cpam_quality")
# Dossiers à tester (variété de cas)
DOSSIERS_TEST = [
"183_23087212", # Désaccord DP+DAS
"116_23065570", # DAS
"143_23096917", # DP+DAS
"132_23080179", # Facturation
]
def load_dossier(name: str) -> DossierMedical | None:
"""Charge un dossier JSON depuis output/structured/."""
base = Path(__file__).parent / "output" / "structured" / name
# Préférer le fichier fusionné
fusionne = list(base.glob("*_fusionne_cim10.json"))
json_files = fusionne if fusionne else sorted(base.glob("*.json"))
if not json_files:
logger.error("Aucun JSON trouvé pour %s", name)
return None
with open(json_files[0], encoding="utf-8") as f:
data = json.load(f)
return DossierMedical(**data)
def test_dossier(name: str) -> dict:
"""Teste generate_cpam_response sur un dossier et retourne les métriques."""
logger.info("=" * 70)
logger.info("DOSSIER : %s", name)
logger.info("=" * 70)
dossier = load_dossier(name)
if not dossier:
return {"name": name, "error": "Dossier non trouvé"}
if not dossier.controles_cpam:
return {"name": name, "error": "Pas de contrôle CPAM"}
controle = dossier.controles_cpam[0]
logger.info("Contrôle : OGC %d%s", controle.numero_ogc, controle.titre)
logger.info("DP UCR : %s | DA UCR : %s", controle.dp_ucr or "-", controle.da_ucr or "-")
# Appeler generate_cpam_response
t0 = time.time()
text, result, rag_sources = generate_cpam_response(dossier, controle)
elapsed = time.time() - t0
metrics = {
"name": name,
"titre": controle.titre,
"elapsed_s": round(elapsed, 1),
"text_len": len(text),
"rag_sources": len(rag_sources),
"tier": controle.quality_tier or "?",
}
if result:
is_tim = _is_new_tim_format(result)
metrics["format"] = "TIM" if is_tim else "legacy"
if is_tim:
# Nouveau format TIM
moyens = result.get("moyens_defense", [])
confrontation = result.get("confrontation_bio", [])
codes_nd = result.get("codes_non_defendables", [])
refs = result.get("references", [])
conclusion = result.get("conclusion_dispositive", "")
# Compter les preuves dans les moyens
total_preuves = 0
preuves_with_ref = 0
for m in moyens:
if isinstance(m, dict):
for p in m.get("preuves", []):
if isinstance(p, dict):
total_preuves += 1
if p.get("ref"):
preuves_with_ref += 1
metrics["moyens_count"] = len(moyens)
metrics["preuves_count"] = total_preuves
metrics["preuves_with_ref"] = preuves_with_ref
metrics["confrontation_count"] = len(confrontation)
metrics["codes_nd_count"] = len(codes_nd)
metrics["refs_count"] = len(refs) if isinstance(refs, list) else 0
metrics["conclusion_len"] = len(conclusion)
metrics["has_rappel_faits"] = bool(result.get("rappel_faits"))
metrics["has_reponse_cpam"] = bool(result.get("reponse_points_cpam"))
logger.info("-" * 40)
logger.info("FORMAT : TIM (mémoire en défense)")
logger.info("RÉSULTAT : %d chars, %.1fs, tier %s", len(text), elapsed, metrics["tier"])
logger.info(" Moyens de défense : %d", len(moyens))
logger.info(" Preuves : %d (dont %d avec tag)", total_preuves, preuves_with_ref)
logger.info(" Confrontation bio : %d entrées", len(confrontation))
logger.info(" Codes non défendables : %d", len(codes_nd))
logger.info(" Références : %d", metrics["refs_count"])
logger.info(" Sources RAG : %d", len(rag_sources))
if confrontation:
for row in confrontation:
if isinstance(row, dict):
logger.info(" Bio: %s%s = %s%s",
row.get("diagnostic", "?"), row.get("test", "?"),
row.get("valeur", "?"), row.get("verdict", "?"))
if codes_nd:
for nd in codes_nd:
if isinstance(nd, dict):
logger.info(" ⚠ Non défendable: %s%s",
nd.get("code", "?"), nd.get("raison", "?")[:80])
# --- Guardian report ---
guardian = result.get("guardian_report", {})
if guardian:
bio_corr = guardian.get("bio_corrections", [])
codes_moved = guardian.get("codes_moved_to_nd", [])
text_repl = guardian.get("text_replacements", 0)
score_f = guardian.get("score_factuel", "?")
metrics["guardian_bio_corrections"] = len(bio_corr)
metrics["guardian_codes_moved"] = len(codes_moved)
metrics["guardian_text_replacements"] = int(text_repl) if text_repl else 0
metrics["guardian_score_factuel"] = score_f
logger.info(" --- GUARDIAN REPORT ---")
logger.info(" Score factuel : %s/10", score_f)
logger.info(" Bio corrections : %d", len(bio_corr))
for c in bio_corr:
logger.info(" %s : LLM=%s → réel=%s",
c.get("test", "?"), c.get("valeur_llm", c.get("llm_value", "?")),
c.get("valeur_reelle", c.get("real_value", "?")))
if codes_moved:
logger.info(" Codes déplacés vers non-défendables : %s",
", ".join(codes_moved))
if text_repl:
logger.info(" Remplacements texte : %s", text_repl)
else:
metrics["guardian_bio_corrections"] = 0
metrics["guardian_codes_moved"] = 0
metrics["guardian_text_replacements"] = 0
metrics["guardian_score_factuel"] = "N/A"
else:
# Ancien format (fallback)
preuves = result.get("preuves_dossier", [])
refs = result.get("references", [])
conclusion = result.get("conclusion", "")
metrics["moyens_count"] = 0
metrics["preuves_count"] = len(preuves) if isinstance(preuves, list) else 0
metrics["preuves_with_ref"] = sum(1 for p in (preuves or []) if isinstance(p, dict) and p.get("ref"))
metrics["confrontation_count"] = 0
metrics["codes_nd_count"] = 0
metrics["refs_count"] = len(refs) if isinstance(refs, list) else 0
metrics["conclusion_len"] = len(conclusion)
logger.info("-" * 40)
logger.info("FORMAT : legacy (ancien)")
logger.info("RÉSULTAT : %d chars, %.1fs, tier %s", len(text), elapsed, metrics["tier"])
else:
metrics["error"] = "LLM a retourné None"
metrics["format"] = "N/A"
logger.error("LLM n'a retourné aucun résultat !")
# Afficher la contre-argumentation complète
print("\n" + "~" * 70)
print("CONTRE-ARGUMENTATION :")
print("~" * 70)
print(text[:5000] if text else "(vide)")
if len(text) > 5000:
print(f"\n... [tronqué, {len(text)} chars au total]")
return metrics
def main():
dossiers = sys.argv[1:] if len(sys.argv) > 1 else DOSSIERS_TEST
results = []
for name in dossiers:
try:
metrics = test_dossier(name)
results.append(metrics)
except Exception as e:
logger.exception("Erreur sur %s", name)
results.append({"name": name, "error": str(e)})
# Résumé final
print("\n" + "=" * 70)
print("RÉSUMÉ — FORMAT TIM")
print("=" * 70)
print(f"{'Dossier':<20} {'Fmt':>5} {'Tier':>4} {'Temps':>6} {'Chars':>6} {'Moyens':>7} {'Bio':>4} {'ND':>3} {'Refs':>5} {'RAG':>4} {'G.Fix':>5} {'G.Mv':>4} {'G.Txt':>5} {'G.Sc':>4}")
print("-" * 105)
for r in results:
if "error" in r:
print(f"{r['name']:<20} ERREUR: {r['error']}")
else:
print(
f"{r['name']:<20} "
f"{r.get('format', '?'):>5} "
f"{r.get('tier', '?'):>4} "
f"{r['elapsed_s']:>5.1f}s "
f"{r['text_len']:>6} "
f"{r.get('moyens_count', 0):>7} "
f"{r.get('confrontation_count', 0):>4} "
f"{r.get('codes_nd_count', 0):>3} "
f"{r.get('refs_count', 0):>5} "
f"{r['rag_sources']:>4} "
f"{r.get('guardian_bio_corrections', 0):>5} "
f"{r.get('guardian_codes_moved', 0):>4} "
f"{r.get('guardian_text_replacements', 0):>5} "
f"{str(r.get('guardian_score_factuel', 'N/A')):>4}"
)
if __name__ == "__main__":
main()

View File

@@ -1,146 +0,0 @@
#!/usr/bin/env python3
"""Test live du quality_tier CPAM sur 3 dossiers existants.
Force l'embedding SentenceTransformer sur CPU pour libérer la VRAM à Ollama Cloud.
"""
import os
# Forcer CPU pour SentenceTransformer — la VRAM reste disponible pour Ollama
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import json
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.config import DossierMedical, ControleCPAM
from src.control.cpam_parser import parse_cpam_excel
from src.control.cpam_response import generate_cpam_response
STRUCTURED_DIR = Path("output/structured")
CPAM_EXCEL = Path("input/Control_cpam/SPHO-FINANC26020915121_ogc_structure.xlsx")
# 3 dossiers avec contrôles CPAM connus
DOSSIERS = [
"116_23065570",
"132_23080179",
"134_23050890",
]
# Délai entre les dossiers pour éviter les 429 sur Ollama Cloud
INTER_DOSSIER_DELAY = 5 # secondes
def load_dossier(subdir: str) -> DossierMedical | None:
fusionne = STRUCTURED_DIR / subdir / f"{subdir}_fusionne_cim10.json"
if not fusionne.exists():
# Fallback sur le premier JSON trouvé
jsons = list((STRUCTURED_DIR / subdir).glob("*_cim10.json"))
if not jsons:
return None
fusionne = jsons[0]
data = json.loads(fusionne.read_text(encoding="utf-8"))
return DossierMedical(**data)
def main():
print("=" * 70)
print("TEST QUALITY_TIER CPAM — Mode Cloud (embedding CPU)")
print("=" * 70)
print()
# Charger les contrôles CPAM
if not CPAM_EXCEL.exists():
print(f"Fichier CPAM introuvable : {CPAM_EXCEL}")
return
cpam_data = parse_cpam_excel(str(CPAM_EXCEL))
print(f"Contrôles CPAM chargés : {len(cpam_data)} OGC\n")
results_summary = []
for i, subdir in enumerate(DOSSIERS):
if i > 0:
print(f" [pause {INTER_DOSSIER_DELAY}s entre les dossiers...]\n")
time.sleep(INTER_DOSSIER_DELAY)
ogc_num = int(subdir.split("_")[0])
dossier = load_dossier(subdir)
if not dossier:
print(f" [{subdir}] Dossier introuvable")
continue
# Trouver le contrôle CPAM correspondant (dict ogc → list[ControleCPAM])
ctrls = cpam_data.get(ogc_num)
if not ctrls:
print(f" [{subdir}] Aucun contrôle OGC {ogc_num} trouvé")
continue
ctrl = ctrls[0] # Premier contrôle pour cet OGC
dp_code = dossier.diagnostic_principal.cim10_suggestion if dossier.diagnostic_principal else ""
n_das = len(dossier.diagnostics_associes)
n_bio = len(dossier.biologie_cle)
print(f"{'='*70}")
print(f"Dossier {subdir} — OGC {ogc_num}")
print(f" DP: {dp_code}, DAS: {n_das}, Bio: {n_bio}")
print(f" Titre: {ctrl.titre}")
print(f" Décision UCR: {ctrl.decision_ucr}")
if ctrl.dp_ucr:
print(f" DP UCR: {ctrl.dp_ucr}")
if ctrl.da_ucr:
print(f" DA UCR: {ctrl.da_ucr}")
print()
t0 = time.time()
try:
text, result, sources = generate_cpam_response(dossier, ctrl)
except Exception as e:
print(f" ERREUR: {e}")
results_summary.append((subdir, "ERREUR", str(e)))
continue
elapsed = time.time() - t0
print(f" Temps: {elapsed:.1f}s")
print(f" Sources RAG: {len(sources)}")
print(f" Longueur texte: {len(text)} chars")
print()
# Quality tier (enrichi par generate_cpam_response)
print(f" >>> QUALITY TIER: {ctrl.quality_tier}")
print(f" >>> REQUIRES REVIEW: {ctrl.requires_review}")
if ctrl.quality_warnings:
print(f" >>> WARNINGS ({len(ctrl.quality_warnings)}):")
for w in ctrl.quality_warnings:
print(f" {w}")
else:
print(f" >>> 0 warnings")
# Score adversarial si disponible
if result:
score_match = [
w for w in ctrl.quality_warnings
if "Score adversarial" in w
]
if not score_match:
print(f" >>> Score adversarial: non extrait des warnings (tier A implicite)")
print()
results_summary.append((subdir, ctrl.quality_tier, ctrl.requires_review))
# Résumé final
print("\n" + "=" * 70)
print("RÉSUMÉ")
print("=" * 70)
for subdir, tier, *rest in results_summary:
if tier == "ERREUR":
print(f" {subdir}: ERREUR — {rest[0][:80]}")
else:
review = rest[0] if rest else "?"
print(f" {subdir}: Tier {tier} | requires_review={review}")
if __name__ == "__main__":
main()