#!/usr/bin/env python3 """ Parse les référentiels ATIH/PMSI pour générer des exemples ChatML. Sources : 1. Annexe-4 CMA V11e — niveaux de sévérité officiels (tabulaire) 2. Racines GHM V11e — caractéristiques des racines (tabulaire) 3. Fascicules de codage — règles par spécialité (texte libre) 4. Instruction DGOS contrôle T2A — priorités de contrôle Déduplique automatiquement avec les données CoCoA existantes (sévérité CMA). Usage : python scripts/11_parse_referentiels.py """ import json import re from pathlib import Path import pdfplumber BASE = Path(__file__).resolve().parent.parent T2A = BASE.parent / "t2a" REF_DIR = T2A / "data" / "referentiels" OUTPUT = BASE / "data" / "processed" OUTPUT.mkdir(parents=True, exist_ok=True) SYSTEM_PROMPT = ( "Tu es un médecin DIM expert en codage CIM-10 pour le PMSI français. " "Tu t'appuies sur les référentiels ATIH officiels." ) SYSTEM_PROMPT_GHM = ( "Tu es un médecin DIM expert en groupage GHM/GHS pour le PMSI français. " "Tu connais les règles de classification des GHM version 11e." ) SYSTEM_PROMPT_CONTROLE = ( "Tu es un médecin DIM expert en contrôle T2A. " "Tu connais les instructions DGOS et les règles de contrôle externe." ) def _chatml(system: str, user: str, assistant: str, source: str = "") -> dict: d = { "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user}, {"role": "assistant", "content": assistant}, ] } if source: d["source"] = source return d # ─── 1. Annexe-4 CMA ──────────────────────────────────────────────────────── def parse_annexe4_cma() -> list[dict]: """Parse l'Annexe-4 : Diagnostics classés CMA avec niveaux de sévérité.""" pdf_path = list(REF_DIR.glob("*Annexe-4*CMA*.pdf")) if not pdf_path: print(" Annexe-4 CMA non trouvée, skip") return [] pdf_path = pdf_path[0] # Charger les sévérités CoCoA existantes pour dédupliquer cocoa_path = BASE / "data" / "processed" / "cocoa_entries_debug.json" cocoa_severities = set() if cocoa_path.exists(): with open(cocoa_path) as f: cocoa = json.load(f) for code, entry in cocoa.items(): if entry.get("severity"): cocoa_severities.add(code) entries = [] # Pattern: code (A00.0) suivi d'un niveau (2-4) suivi d'un libellé pattern = re.compile(r"^([A-Z]\d{2}(?:\.\d{1,2})?)\s+([234])\s+(.+)$") with pdfplumber.open(str(pdf_path)) as pdf: for page in pdf.pages: text = page.extract_text() or "" for line in text.split("\n"): line = line.strip() m = pattern.match(line) if m: code, niveau, libelle = m.group(1), int(m.group(2)), m.group(3).strip() entries.append({"code": code, "niveau": niveau, "libelle": libelle}) print(f" Annexe-4 CMA : {len(entries)} entrées extraites") # Dédupliquer avec CoCoA new_entries = [e for e in entries if e["code"] not in cocoa_severities] dupes = len(entries) - len(new_entries) print(f" Doublons CoCoA : {dupes}, nouvelles : {len(new_entries)}") # Générer les exemples ChatML examples = [] # Type 1 : Quel niveau CMA ? for e in entries: # Garder tous pour renforcer, même les doublons examples.append(_chatml( SYSTEM_PROMPT, f"Quel est le niveau de sévérité CMA du code {e['code']} ({e['libelle']}) ?", f"Le code {e['code']} ({e['libelle']}) est classé CMA de niveau {e['niveau']}. " f"{'Ce diagnostic est considéré comme une complication ou morbidité associée majeure.' if e['niveau'] >= 3 else 'Ce diagnostic est une complication ou morbidité associée significative.'}", source="annexe4_cma" )) # Type 2 : Est-ce une CMA ? (discrimination) # Quelques exemples de codes NON-CMA pour contraste non_cma_codes = set() cma_codes = {e["code"] for e in entries} for e in entries: # Codes voisins qui ne sont pas CMA base = e["code"][:3] for suffix in range(10): candidate = f"{base}.{suffix}" if candidate not in cma_codes and candidate not in non_cma_codes: non_cma_codes.add(candidate) if len(non_cma_codes) >= 500: break return examples # ─── 2. Racines GHM ────────────────────────────────────────────────────────── def parse_racines_ghm() -> list[dict]: """Parse les Racines GHM V11e (tableau de caractéristiques).""" pdf_path = list(REF_DIR.glob("*Racines_GHM*.pdf")) if not pdf_path: print(" Racines GHM non trouvé, skip") return [] pdf_path = pdf_path[0] entries = [] # Pattern racine GHM : 2 chiffres + lettre + 2 chiffres (ex: 01C02, 05K06) pattern = re.compile(r"^(\d{2}[A-Z]\d{2})\s+(.+)$") with pdfplumber.open(str(pdf_path)) as pdf: for page in pdf.pages[3:]: # Skip pages de couverture text = page.extract_text() or "" for line in text.split("\n"): line = line.strip() m = pattern.match(line) if m: racine = m.group(1) reste = m.group(2).strip() entries.append({"racine": racine, "description": reste}) print(f" Racines GHM : {len(entries)} racines extraites") # Aussi extraire les tables complètes par page pour du contexte riche examples = [] with pdfplumber.open(str(pdf_path)) as pdf: # Extraire les tables for page in pdf.pages[3:]: tables = page.extract_tables() for table in tables: if not table or len(table) < 2: continue headers = table[0] if table[0] else [] for row in table[1:]: if not row or not row[0]: continue racine = str(row[0]).strip() if not re.match(r"^\d{2}[A-Z]\d{2}$", racine): continue # Construire la description depuis les colonnes desc_parts = [str(c).strip() for c in row[1:] if c and str(c).strip()] if not desc_parts: continue full_desc = " | ".join(desc_parts) examples.append(_chatml( SYSTEM_PROMPT_GHM, f"Quelles sont les caractéristiques de la racine GHM {racine} ?", f"La racine GHM {racine} : {full_desc}", source="racines_ghm" )) # Si pas de tables parsées, utiliser les entrées textuelles if not examples: for e in entries: examples.append(_chatml( SYSTEM_PROMPT_GHM, f"Quelles sont les caractéristiques de la racine GHM {e['racine']} ?", f"La racine GHM {e['racine']} : {e['description']}", source="racines_ghm" )) return examples # ─── 3. Arbre de décision GHM ───────────────────────────────────────────────── def parse_arbre_ghm() -> list[dict]: """Parse l'arbre de décision GHM — extrait les règles par page/bloc.""" pdf_path = list(REF_DIR.glob("*Arbre_decision_GHM*.pdf")) if not pdf_path: print(" Arbre GHM non trouvé, skip") return [] pdf_path = pdf_path[0] examples = [] # Le PDF est un arbre graphique : chaque page contient des nœuds de décision # avec des codes GHM (ex: 01K03, 28Z18) et des conditions (listes diagnostiques/actes) ghm_pattern = re.compile(r"\b(\d{2}[A-Z]\d{2})\b") condition_pattern = re.compile(r"\(([A-Z]-\d{3})\)") # Ex: (A-198), (D-064) # Grouper les pages par CMD (les 2 premiers chiffres du GHM) cmd_pages = {} with pdfplumber.open(str(pdf_path)) as pdf: for page in pdf.pages[7:]: # Skip couverture, symboles, légende text = page.extract_text() or "" if not text.strip() or len(text.strip()) < 30: continue # Nettoyer : supprimer les numéros de page isolés en début de texte lines = text.strip().split("\n") lines = [l for l in lines if not re.match(r"^\s*\d{1,3}\s*$", l.strip())] text = "\n".join(lines) # Trouver les codes GHM sur cette page ghm_codes = ghm_pattern.findall(text) if not ghm_codes: continue # Déterminer la CMD (premiers 2 chiffres) cmd_num = ghm_codes[0][:2] if cmd_num not in cmd_pages: cmd_pages[cmd_num] = [] cmd_pages[cmd_num].append(text.strip()) # Générer un exemple par CMD for cmd_num in sorted(cmd_pages.keys()): pages_text = cmd_pages[cmd_num] full_text = "\n\n".join(pages_text) # Extraire tous les GHM et conditions ghm_codes = sorted(set(ghm_pattern.findall(full_text))) conditions = sorted(set(condition_pattern.findall(full_text))) ghm_str = ", ".join(ghm_codes[:15]) body = full_text[:2000] examples.append(_chatml( SYSTEM_PROMPT_GHM, f"Quelles sont les règles de l'arbre de décision GHM pour la CMD {cmd_num} ?", f"CMD {cmd_num} — Arbre de décision GHM V11e :\n" f"Racines GHM concernées : {ghm_str}\n\n{body}", source="arbre_ghm" )) print(f" Arbre GHM : {len(examples)} CMDs extraites") return examples # ─── 4. Fascicules de codage ────────────────────────────────────────────────── def parse_fascicule(pdf_path: Path, topic: str) -> list[dict]: """Parse un fascicule de codage — extrait les sections de règles.""" examples = [] with pdfplumber.open(str(pdf_path)) as pdf: full_text = "" for page in pdf.pages: text = page.extract_text() or "" full_text += text + "\n" if not full_text.strip(): return [] # Nettoyer : supprimer les lignes de table des matières (avec ........) clean_lines = [] for line in full_text.split("\n"): if "...." in line or "TABLE DES MATIERES" in line: continue # Supprimer les numéros de page isolés if re.match(r"^\s*\d{1,3}\s*$", line.strip()): continue clean_lines.append(line) full_text = "\n".join(clean_lines) # Découper en sections par les titres (lignes commençant par un chiffre romain ou "Consignes") sections = [] current_title = topic current_body = [] for line in full_text.split("\n"): line_stripped = line.strip() # Détecter les titres de section is_title = False # Sections principales : I., II., III., IV. if re.match(r"^[IVX]+\.\s+", line_stripped): is_title = True # Sous-sections : II.1., III.2., IV.3. elif re.match(r"^[IVX]+\.\d+\.?\s+", line_stripped): is_title = True # Titres descriptifs courants dans les fascicules elif re.match(r"^(Consignes|Règles|Les pièges|Le codage|Codage|Comment coder|Principes|Exemples?|Cas particulier|Remarque)", line_stripped, re.IGNORECASE): is_title = True elif re.match(r"^(Créé le|Mis à jour|Modifié le|TABLE DES MATIERES|FASCICULE DE CODAGE)", line_stripped): continue # Skip dates et entêtes if is_title and current_body: body = "\n".join(current_body).strip() if len(body) > 100: sections.append((current_title, body)) current_title = line_stripped current_body = [] else: current_body.append(line) # Dernière section if current_body: body = "\n".join(current_body).strip() if len(body) > 100: sections.append((current_title, body)) # Générer les exemples ChatML for title, body in sections: # Nettoyer body = re.sub(r"\n{3,}", "\n\n", body) body = body[:2000] # Limiter la taille # Extraire les codes CIM-10 mentionnés pour enrichir la question codes = re.findall(r"[A-Z]\d{2}(?:\.\d{1,2})?", body) codes_unique = sorted(set(codes))[:5] codes_str = f" (codes : {', '.join(codes_unique)})" if codes_unique else "" # Type 1 : Règle de codage examples.append(_chatml( SYSTEM_PROMPT, f"Quelles sont les règles de codage PMSI pour {title}{codes_str} ?", f"Selon le fascicule de codage ATIH « {topic} » :\n\n{body}", source=f"fascicule_{topic.lower().replace(' ', '_')[:30]}" )) # Type 2 : Question pratique à partir du contenu # Chercher les patterns "code X pour Y" ou "on codera X" coding_rules = re.findall( r"(?:on codera?|se code|coder?|est codé[e]?|codé[e]?\s+(?:avec|par|en))\s+([A-Z]\d{2}(?:\.\d{1,2})?)\s*(.*?)(?:\.|$)", body, re.IGNORECASE ) for code, context in coding_rules[:5]: context = context.strip()[:200] # Filtrer les contextes trop courts ou non informatifs if len(context) < 15: continue examples.append(_chatml( SYSTEM_PROMPT, f"Comment coder en CIM-10 : {context} ?", f"Selon les règles ATIH ({topic}), cette situation se code {code}. {context}", source=f"fascicule_{topic.lower().replace(' ', '_')[:30]}" )) return examples def parse_all_fascicules() -> list[dict]: """Parse tous les fascicules disponibles.""" fascicules = { "Fascicule_01_Generalites": "Généralités du codage PMSI", "Fascicule_02_Maladies_digestives": "Maladies de l'appareil digestif", "Fascicule_03_Tumeurs": "Tumeurs", "Fascicule_04_Metabolisme": "Métabolisme", "Fascicule_05_Gyneco_Obstetrique": "Gynécologie et Obstétrique", "Fascicule_06_Neonatalogie": "Néonatalogie", "Fascicule_07_Evolutions_2010": "Évolutions 2010", "Fascicule_08_Maladies_infectieuses": "Maladies infectieuses", "Fascicule_09_AVC": "Accidents vasculaires cérébraux", "Fascicule_10_SCA_Coronariens": "Syndromes coronariens aigus", } all_examples = [] for filename_part, topic in fascicules.items(): pdf_files = list(REF_DIR.glob(f"*{filename_part}*")) if not pdf_files: print(f" {topic} : non trouvé, skip") continue examples = parse_fascicule(pdf_files[0], topic) print(f" {topic} : {len(examples)} exemples") all_examples.extend(examples) return all_examples # ─── 5. Instruction DGOS ────────────────────────────────────────────────────── def parse_instruction_dgos() -> list[dict]: """Parse l'instruction DGOS contrôle T2A.""" pdf_path = list(REF_DIR.glob("*Instruction_DGOS*.pdf")) if not pdf_path: print(" Instruction DGOS non trouvée, skip") return [] pdf_path = pdf_path[0] with pdfplumber.open(str(pdf_path)) as pdf: full_text = "" for page in pdf.pages: text = page.extract_text() or "" full_text += text + "\n" # Découper en sections thématiques examples = [] sections = re.split(r"\n(?=\d+\.\s+|\d+\.\d+\s+|Annexe)", full_text) for section in sections: section = section.strip() if len(section) < 100: continue # Titre = première ligne lines = section.split("\n") title = lines[0].strip() body = "\n".join(lines[1:]).strip()[:2000] if body: examples.append(_chatml( SYSTEM_PROMPT_CONTROLE, f"Que dit l'instruction DGOS 2025 sur : {title} ?", f"Selon l'instruction DGOS/FIP1/DSS/1A/2025/141 relative aux contrôles T2A :\n\n{body}", source="instruction_dgos" )) print(f" Instruction DGOS : {len(examples)} sections") return examples # ─── Main ───────────────────────────────────────────────────────────────────── def main(): print("=" * 60) print("Parsing des référentiels ATIH/PMSI") print("=" * 60) all_examples = [] # 1. Annexe-4 CMA print("\n1. Annexe-4 CMA") all_examples.extend(parse_annexe4_cma()) # 2. Racines GHM print("\n2. Racines GHM") all_examples.extend(parse_racines_ghm()) # 3. Arbre de décision GHM print("\n3. Arbre de décision GHM") all_examples.extend(parse_arbre_ghm()) # 4. Fascicules print("\n4. Fascicules de codage") all_examples.extend(parse_all_fascicules()) # 5. Instruction DGOS print("\n5. Instruction DGOS") all_examples.extend(parse_instruction_dgos()) # Sauvegarder output_path = OUTPUT / "referentiels_chatml.jsonl" with open(output_path, "w") as f: for ex in all_examples: f.write(json.dumps(ex, ensure_ascii=False) + "\n") size_mo = output_path.stat().st_size / 1024 / 1024 print(f"\n{'=' * 60}") print(f"Total : {len(all_examples)} exemples") print(f"Sauvegardé : {output_path} ({size_mo:.1f} Mo)") print(f"{'=' * 60}") if __name__ == "__main__": main()