#!/usr/bin/env python3
"""
Phase 1E — Parse the CoCoA 2025 (1113 pages) to extract ChatML examples.

The CoCoA (Codage Complet Annoté) is the reference handbook of DIM physicians.
It contains detailed entries per ICD-10 (CIM-10) code with:
- P/R/A indicators (Principal / Related / Associated diagnosis)
- Severity levels (2, 3, 4)
- Detailed clinical descriptions
- Synonyms
- "Comprend" (includes) / "À l'exclusion de" (excludes)
- AGORA notes (ATIH FAQ)
- CoCoA annotations (practical DIM advice)

Pages processed: 85-1080 (detailed entries, chapters 1-22)
Output: data/processed/cocoa_chatml.jsonl
"""

import json
import re
import random
from pathlib import Path

random.seed(42)

BASE = Path(__file__).resolve().parent.parent
RAW = BASE / "data" / "raw"
OUT = BASE / "data" / "processed"
OUT.mkdir(parents=True, exist_ok=True)

SYSTEM_MSG = "Tu es un médecin DIM expert en codage CIM-10 pour le PMSI français. Tu t'appuies sur le CoCoA (Codage Complet Annoté) pour tes décisions de codage."

# Page range of the detailed entries (0-indexed, end exclusive)
PAGE_START = 84  # page 85
PAGE_END = 1080  # page 1080

# Regex patterns
RE_CIM10_CODE = re.compile(
    r'^([A-Z]\d{2}(?:\.\d{1,2})?)\s*([†*]?)\s+(.*)'
)
RE_CATEGORY_CODE = re.compile(
    r'^([A-Z]\d{2})\s+(.*)'
)
RE_SUBCODE = re.compile(
    r'^([A-Z]\d{2}\.\d{1,2})\s*([†*]?)\s*(.*)'
)
RE_PRA_LINE = re.compile(r'^P\s*R\s*A')
RE_SEVERITY = re.compile(r'^(\d)\s*$')
RE_CHAPTER_HEADER = re.compile(r'^CHAPITRE\s+([IVX]+)\s*:?\s*(.*)')
RE_SECTION_HEADER = re.compile(r'^([A-Z][a-zéèêëàâîïôùûüç].+)\s*\(([A-Z]\d{2}[-–][A-Z]\d{2})\)')
RE_EXCLUSION = re.compile(r"^À l['\u2019]exclusion de\s+(.*)", re.IGNORECASE)
RE_COMPREND = re.compile(r'^Comprend\s+(.*)', re.IGNORECASE)
RE_AGORA = re.compile(r'\(AGORA\s*[-–]\s*#?\s*(\d+).*?\)')
RE_FOOTER = re.compile(r'^2025\s*[-–]')
RE_NOTE_BRACKET = re.compile(r'^\[voir en début')

# Private helper patterns (hoisted so they are compiled once).
_RE_PRA_PREFIX = re.compile(r'^P\s*R\s*A\s*')
# "AN" / "AN T" markers that may follow "P R A".  The "T" is only consumed
# when it is a standalone token, so the leading letter of a chapter-T code
# ("AN T81.0 ...") is preserved.
_RE_AN_T_MARKER = re.compile(r'^AN\s*(?:T\b\s*)?')
# Section keywords standing alone on their line (items follow on next lines).
_RE_EXCLUSION_BARE = re.compile(r"^À l['\u2019]exclusion de\s*:?$", re.IGNORECASE)
_RE_COMPREND_BARE = re.compile(r'^Comprend\s*:?$', re.IGNORECASE)
_RE_EXCLUSION_PREFIX = re.compile(r"^À l['\u2019]exclusion de\s*", re.IGNORECASE)
_RE_CODE_START = re.compile(r'^[A-Z]\d')
_RE_CODE_REF = re.compile(r'\([A-Z]\d{2}')


def extract_text_from_pdf():
    """Extract the text of every detailed-entry page of the CoCoA PDF.

    Reads ``RAW / "cocoa_2025.pdf"`` with pdfplumber (imported lazily so the
    rest of the module works without it) and returns a list of
    ``(page_number, text)`` tuples, where ``page_number`` is 1-based and
    ``text`` is ``""`` when pdfplumber extracts nothing from the page.
    """
    import pdfplumber

    pdf_path = RAW / "cocoa_2025.pdf"
    print(f"Ouverture de {pdf_path}...")

    pages_text = []
    with pdfplumber.open(pdf_path) as pdf:
        total = min(PAGE_END, len(pdf.pages))
        for i in range(PAGE_START, total):
            text = pdf.pages[i].extract_text() or ""
            pages_text.append((i + 1, text))  # (page_number, text)
            if (i - PAGE_START) % 100 == 0:
                print(f" Extraction page {i+1}/{total}...")

    print(f" {len(pages_text)} pages extraites")
    return pages_text


def parse_entries(pages_text):
    """Parse CIM-10 entries out of the extracted page text.

    Args:
        pages_text: iterable of ``(page_number, text)`` tuples.

    Returns:
        dict mapping a code (e.g. ``"A00"`` or ``"A00.1"``) to its entry dict
        (see ``_new_entry`` for the keys).  When the same code is met several
        times, the last occurrence wins.

    Each stripped line is classified by the first rule that applies, in this
    order: blank/footer, bracket note, chapter header, "P R A" indicator line
    (optionally carrying a code), lone severity digit, sub-code, category
    code, section header, "Comprend", "À l'exclusion de", AGORA reference,
    CoCoA/Aunis annotation, continuation of an exclusion or inclusion list,
    and finally free text (long lines are clinical text, short ones synonyms).
    """
    entries = {}  # code -> entry dict
    current_chapter = ""
    current_code = None
    current_entry = None
    collecting_exclusion = False
    collecting_comprend = False

    def open_entry(code, desc, dagger_star, page, is_category):
        # Flush the entry being built and start a fresh one for `code`.
        nonlocal current_code, current_entry
        nonlocal collecting_exclusion, collecting_comprend
        _save_entry(entries, current_code, current_entry)
        current_code = code
        current_entry = _new_entry(code, desc, dagger_star, current_chapter,
                                   page, is_category=is_category)
        collecting_exclusion = False
        collecting_comprend = False

    for page_num, page_text in pages_text:
        for raw_line in page_text.split('\n'):
            line = raw_line.strip()

            # Blank lines and page footers end any list being collected.
            if not line or RE_FOOTER.match(line):
                collecting_exclusion = False
                collecting_comprend = False
                continue

            # "[voir en début ..." cross-reference notes are ignored.
            if RE_NOTE_BRACKET.match(line):
                continue

            # Chapter header
            m = RE_CHAPTER_HEADER.match(line)
            if m:
                current_chapter = m.group(2).strip()
                collecting_exclusion = False
                collecting_comprend = False
                continue

            # "P R A" indicator line, possibly followed by special markers
            # and a code on the same line.  Always consumed.
            if RE_PRA_LINE.match(line):
                rest = _RE_PRA_PREFIX.sub('', line).strip()
                rest = _RE_AN_T_MARKER.sub('', rest).strip()
                m_sub = RE_SUBCODE.match(rest)
                m_cat = RE_CATEGORY_CODE.match(rest)
                if m_sub:
                    open_entry(m_sub.group(1), m_sub.group(3).strip(),
                               m_sub.group(2), page_num, False)
                elif m_cat:
                    open_entry(m_cat.group(1), m_cat.group(2).strip(),
                               "", page_num, True)
                continue

            # Severity number on its own line
            m = RE_SEVERITY.match(line)
            if m and current_entry:
                current_entry["severity"] = int(m.group(1))
                continue

            # Sub-code entry (e.g. "A00.1 † ...")
            m = RE_SUBCODE.match(line)
            if m:
                open_entry(m.group(1), m.group(3).strip(), m.group(2),
                           page_num, False)
                continue

            # Category code (3 characters, no dot).  The title check avoids
            # false positives on lines that merely start with a code.
            m = RE_CATEGORY_CODE.match(line)
            if m:
                desc = m.group(2).strip()
                if len(desc) > 3 and not desc[0].isdigit():
                    open_entry(m.group(1), desc, "", page_num, True)
                    continue

            # Section header (e.g. "Autres maladies bactériennes (A30-A49)")
            if RE_SECTION_HEADER.match(line):
                collecting_exclusion = False
                collecting_comprend = False
                continue

            # "Comprend" — alone on its line, or followed by the first item
            m = RE_COMPREND.match(line)
            if m or _RE_COMPREND_BARE.match(line):
                if current_entry and m and not _RE_COMPREND_BARE.match(line):
                    current_entry["comprend"].append(m.group(1).strip())
                collecting_comprend = True
                collecting_exclusion = False
                continue

            # "À l'exclusion de" — alone on its line, or followed by an item
            m = RE_EXCLUSION.match(line)
            if m or _RE_EXCLUSION_BARE.match(line):
                if current_entry and m and not _RE_EXCLUSION_BARE.match(line):
                    current_entry["exclusions"].append(m.group(1).strip())
                collecting_exclusion = True
                collecting_comprend = False
                continue

            # AGORA reference: keep the ids and the full line as a note
            agora_matches = RE_AGORA.findall(line)
            if agora_matches and current_entry:
                current_entry["agora_refs"].extend(agora_matches)
                current_entry["cocoa_notes"].append(line)
                continue

            # CoCoA/Aunis annotations
            if current_entry and ("Aunis" in line or "CoCoA" in line):
                current_entry["cocoa_notes"].append(line)
                continue

            # Continuation of an exclusion list: bullets, lowercase starts,
            # or anything carrying a "(X00" code reference.
            if collecting_exclusion and current_entry:
                if (line.startswith(('•', '-'))
                        or line[0].islower()
                        or _RE_CODE_REF.search(line)):
                    current_entry["exclusions"].append(line)
                    continue
                collecting_exclusion = False

            # Continuation of an inclusion list: anything not starting
            # like a code.
            if collecting_comprend and current_entry:
                if not _RE_CODE_START.match(line):
                    current_entry["comprend"].append(line)
                    continue
                collecting_comprend = False

            # Free text after a code: long lines are clinical description,
            # short ones are treated as synonyms.
            if current_entry and not _RE_CODE_START.match(line):
                if len(line) > 60:
                    current_entry["clinical_text"].append(line)
                elif not line.startswith('P '):
                    current_entry["synonyms"].append(line)

    # Save last entry
    _save_entry(entries, current_code, current_entry)
    return entries


def _new_entry(code, description, dagger_star, chapter, page, is_category=False):
    """Return a fresh entry dict for `code` with empty collections."""
    return {
        "code": code,
        "description": description,
        "dagger_star": dagger_star,
        "chapter": chapter,
        "page": page,
        "is_category": is_category,
        "severity": None,
        "synonyms": [],
        "comprend": [],
        "exclusions": [],
        "clinical_text": [],
        "agora_refs": [],
        "cocoa_notes": [],
    }


def _save_entry(entries, code, entry):
    """Clean `entry` in place and store it in `entries` under `code`.

    Does nothing when `code` or `entry` is falsy or the description is empty.
    Cleaning strips and drops empty items, de-duplicates synonyms and notes
    (order preserved), removes layout noise from the synonyms, and moves
    "À l'exclusion de ..." lines found among synonyms or inclusions into the
    exclusions list.
    """
    if not (code and entry and entry["description"]):
        return

    # Strip and drop empties (synonyms also need more than 2 characters).
    synonyms = [s.strip() for s in entry["synonyms"] if len(s.strip()) > 2]
    entry["comprend"] = [c.strip() for c in entry["comprend"] if c.strip()]
    entry["exclusions"] = [x.strip() for x in entry["exclusions"] if x.strip()]
    entry["clinical_text"] = [t.strip() for t in entry["clinical_text"] if t.strip()]
    notes = [n.strip() for n in entry["cocoa_notes"] if n.strip()]

    # Deduplicate while keeping first-seen order.
    synonyms = list(dict.fromkeys(synonyms))
    entry["cocoa_notes"] = list(dict.fromkeys(notes))

    # Filter noise out of the synonyms and move misclassified exclusions.
    kept_synonyms = []
    for syn in synonyms:
        # Severity digits, P R A markers, footers, stray indicator letters.
        if RE_SEVERITY.match(syn) or RE_PRA_LINE.match(syn) or RE_FOOTER.match(syn):
            continue
        if syn in ("P R A", "P", "R", "A", "AN", "T"):
            continue
        prefix = _RE_EXCLUSION_PREFIX.match(syn)
        if prefix:
            excl_text = syn[prefix.end():].strip()
            if excl_text:
                entry["exclusions"].append(excl_text)
            continue
        kept_synonyms.append(syn)
    entry["synonyms"] = kept_synonyms

    # Same relocation for exclusions that ended up among the inclusions.
    kept_comprend = []
    for item in entry["comprend"]:
        prefix = _RE_EXCLUSION_PREFIX.match(item)
        if prefix:
            excl_text = item[prefix.end():].strip()
            if excl_text:
                entry["exclusions"].append(excl_text)
        else:
            kept_comprend.append(item)
    entry["comprend"] = kept_comprend

    entries[code] = entry


def make_chatml(system, user, assistant):
    """Build one ChatML example (system / user / assistant messages)."""
    return {
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
            {"role": "assistant", "content": assistant},
        ]
    }


def generate_description_pairs(entries):
    """Type 1: enriched CoCoA description of a code.

    One example per entry whose description has at least 3 characters.  The
    answer lists, when present: chapter, up to 8 synonyms, up to 5 inclusions,
    up to 5 exclusions, severity and dagger/asterisk role, capped at 2000
    characters.  The question is drawn at random from three templates.
    """
    pairs = []
    for code, e in entries.items():
        desc = e["description"]
        if not desc or len(desc) < 3:
            continue

        answer_parts = [f"{code} — {desc}"]
        if e["chapter"]:
            answer_parts.append(f"Chapitre : {e['chapter']}")
        if e["synonyms"]:
            syns = [s for s in e["synonyms"][:8] if len(s) > 2]
            if syns:
                answer_parts.append(f"Synonymes : {' ; '.join(syns)}")
        if e["comprend"]:
            answer_parts.append(f"Comprend : {' '.join(e['comprend'][:5])}")
        if e["exclusions"]:
            excls = e["exclusions"][:5]
            answer_parts.append(f"À l'exclusion de : {' ; '.join(excls)}")
        if e["severity"]:
            answer_parts.append(f"Niveau de sévérité CMA : {e['severity']}")
        if e["dagger_star"]:
            marker = "étiologique (†)" if e["dagger_star"] == "†" else "manifestation (*)"
            answer_parts.append(f"Convention dague/astérisque : code {marker}")

        answer = "\n".join(answer_parts)[:2000]

        templates = [
            f"Décris le code CIM-10 {code} selon le CoCoA.",
            f"Que dit le CoCoA sur le code {code} ?",
            f"Quelles sont les caractéristiques du code {code} d'après le CoCoA ?",
        ]
        pairs.append(make_chatml(SYSTEM_MSG, random.choice(templates), answer))
    return pairs


def generate_clinical_pairs(entries):
    """Type 2: detailed clinical description → code + reasoning.

    One example per entry with at least 50 characters of clinical text.  The
    question quotes the first 300 characters; the answer is a JSON object with
    the analysis (first 500 characters), code, description, confidence,
    justification and, when available, up to 3 exclusions to check.
    """
    pairs = []
    for code, e in entries.items():
        if not e["clinical_text"]:
            continue
        clinical = " ".join(e["clinical_text"])
        if len(clinical) < 50:
            continue
        desc = e["description"]

        # Structured reasoning returned as the assistant answer
        reasoning = {
            "analyse_clinique": clinical[:500],
            "code": code,
            "description": desc,
            "confidence": "high",
            "justification": f"La description clinique du CoCoA correspond au code {code} ({desc}).",
        }
        if e["exclusions"]:
            reasoning["exclusions_a_verifier"] = " ; ".join(e["exclusions"][:3])
        answer = json.dumps(reasoning, ensure_ascii=False)

        # Question built from the (truncated) clinical text
        clinical_short = clinical[:300]
        if len(clinical) > 300:
            clinical_short += "..."
        question = f"Un patient présente le tableau clinique suivant :\n{clinical_short}\n\nQuel code CIM-10 correspond à cette présentation ?"
        if len(question) > 1500:
            continue

        pairs.append(make_chatml(SYSTEM_MSG, question, answer))
    return pairs


def generate_synonym_pairs(entries):
    """Type 3: synonym → CIM-10 code.

    One example per synonym of 4 to 200 characters.  Leading bullets or
    brackets and trailing closing brackets are stripped first; synonyms
    shorter than 4 characters after stripping are skipped.
    """
    pairs = []
    for code, e in entries.items():
        if not e["synonyms"]:
            continue
        desc = e["description"]
        for syn in e["synonyms"]:
            if len(syn) < 4 or len(syn) > 200:
                continue
            # Strip list/bracket decoration left over from the layout
            if syn.startswith(("•", "[", "(")):
                syn = syn.lstrip("•[( ").rstrip("])").strip()
            if not syn or len(syn) < 4:
                continue

            answer = json.dumps({
                "code": code,
                "confidence": "high",
                "justification": f"« {syn} » est un synonyme de {code} ({desc}) selon le CoCoA.",
            }, ensure_ascii=False)

            templates = [
                f"Quel est le code CIM-10 pour : {syn} ?",
                f"Code CIM-10 correspondant à « {syn} » ?",
            ]
            pairs.append(make_chatml(SYSTEM_MSG, random.choice(templates), answer))
    return pairs


def generate_exclusion_pairs(entries):
    """Type 4: what a code excludes (coding pitfalls).

    One example per entry whose first 8 exclusions, joined with " ; ", are at
    least 10 characters long.  The answer is capped at 1500 characters.
    """
    pairs = []
    for code, e in entries.items():
        if not e["exclusions"]:
            continue
        desc = e["description"]
        excls = " ; ".join(e["exclusions"][:8])
        if len(excls) < 10:
            continue

        answer = f"Le code {code} ({desc}) exclut :\n{excls}\n\nAttention : ces situations doivent être codées avec les codes de renvoi indiqués entre parenthèses."
        answer = answer[:1500]

        templates = [
            f"Quelles sont les exclusions du code CIM-10 {code} ({desc}) ?",
            f"Que ne faut-il PAS coder en {code} ?",
        ]
        pairs.append(make_chatml(SYSTEM_MSG, random.choice(templates), answer))
    return pairs


def generate_severity_pairs(entries):
    """Type 5: CMA severity level of a code.

    One example per entry with a truthy severity.  Levels 2, 3 and 4 get a
    descriptive label; any other value is rendered as "niveau N".  Category
    entries get an extra note that sub-codes may differ.
    """
    labels = {
        2: "niveau 2 (sévérité modérée)",
        3: "niveau 3 (sévérité élevée)",
        4: "niveau 4 (sévérité très élevée)",
    }
    pairs = []
    for code, e in entries.items():
        sev = e["severity"]
        if not sev:
            continue
        desc = e["description"]
        sev_text = labels.get(sev, f"niveau {sev}")

        answer = f"Le code {code} ({desc}) a un niveau de sévérité CMA de {sev_text}.\n"
        answer += "En tant que DAS, ce code peut entraîner une majoration du niveau de sévérité du GHM."
        if e["is_category"]:
            answer += f"\nNote : {code} est une catégorie (code à 3 caractères). Les sous-codes peuvent avoir des niveaux différents."

        pairs.append(make_chatml(
            SYSTEM_MSG,
            f"Quel est le niveau de sévérité CMA du code {code} ({desc}) ?",
            answer,
        ))
    return pairs


def generate_cocoa_tips_pairs(entries):
    """Type 6: CoCoA and AGORA notes (practical DIM advice).

    One example per entry whose notes, joined by newlines, are at least 10
    characters long.  The answer is capped at 1500 characters.
    """
    pairs = []
    for code, e in entries.items():
        if not e["cocoa_notes"]:
            continue
        desc = e["description"]
        notes = "\n".join(e["cocoa_notes"])
        if len(notes) < 10:
            continue

        answer = f"Pour le code {code} ({desc}), le CoCoA indique :\n{notes}"
        answer = answer[:1500]

        pairs.append(make_chatml(
            SYSTEM_MSG,
            f"Y a-t-il des conseils pratiques du CoCoA pour le codage de {code} ({desc}) ?",
            answer,
        ))
    return pairs


def generate_comprend_pairs(entries):
    """Type 7: what a code includes ("Comprend").

    One example per entry whose first 5 inclusions, joined with " ; ", are at
    least 10 characters long.
    """
    pairs = []
    for code, e in entries.items():
        if not e["comprend"]:
            continue
        desc = e["description"]
        comprend = " ; ".join(e["comprend"][:5])
        if len(comprend) < 10:
            continue

        answer = f"Le code {code} ({desc}) comprend :\n{comprend}"
        templates = [
            f"Que comprend le code CIM-10 {code} ?",
            f"Quelles situations sont incluses dans le code {code} ({desc}) ?",
        ]
        pairs.append(make_chatml(SYSTEM_MSG, random.choice(templates), answer))
    return pairs


def main():
    """Run the pipeline: extract the PDF text, parse entries, write outputs.

    Writes the shuffled ChatML examples to ``OUT / "cocoa_chatml.jsonl"`` and
    the parsed entries to ``OUT / "cocoa_entries_debug.json"``, printing
    progress and statistics along the way.
    """
    # Step 1: text extraction
    pages_text = extract_text_from_pdf()

    # Step 2: entry parsing
    print("\nParsing des entrées CIM-10...")
    entries = parse_entries(pages_text)

    # Stats
    n_categories = sum(1 for e in entries.values() if e["is_category"])
    n_subcodes = sum(1 for e in entries.values() if not e["is_category"])
    n_with_clinical = sum(1 for e in entries.values() if e["clinical_text"])
    n_with_synonyms = sum(1 for e in entries.values() if e["synonyms"])
    n_with_exclusions = sum(1 for e in entries.values() if e["exclusions"])
    n_with_comprend = sum(1 for e in entries.values() if e["comprend"])
    n_with_severity = sum(1 for e in entries.values() if e["severity"])
    n_with_cocoa = sum(1 for e in entries.values() if e["cocoa_notes"])

    print(f"\n Entrées parsées : {len(entries)}")
    print(f" Catégories (3 car.) : {n_categories}")
    print(f" Sous-codes : {n_subcodes}")
    print(f" Avec texte clinique : {n_with_clinical}")
    print(f" Avec synonymes : {n_with_synonyms}")
    print(f" Avec exclusions : {n_with_exclusions}")
    print(f" Avec comprend : {n_with_comprend}")
    print(f" Avec sévérité CMA : {n_with_severity}")
    print(f" Avec notes CoCoA : {n_with_cocoa}")

    # Step 3: ChatML pair generation (order matters: the generators share
    # the seeded module-level random state)
    print("\nGénération des paires ChatML...")

    print(" Type 1 : Descriptions enrichies CoCoA")
    desc_pairs = generate_description_pairs(entries)
    print(f" → {len(desc_pairs)} exemples")

    print(" Type 2 : Texte clinique → code")
    clinical_pairs = generate_clinical_pairs(entries)
    print(f" → {len(clinical_pairs)} exemples")

    print(" Type 3 : Synonyme → code")
    synonym_pairs = generate_synonym_pairs(entries)
    print(f" → {len(synonym_pairs)} exemples")

    print(" Type 4 : Exclusions")
    exclusion_pairs = generate_exclusion_pairs(entries)
    print(f" → {len(exclusion_pairs)} exemples")

    print(" Type 5 : Sévérité CMA")
    severity_pairs = generate_severity_pairs(entries)
    print(f" → {len(severity_pairs)} exemples")

    print(" Type 6 : Notes CoCoA/AGORA")
    cocoa_pairs = generate_cocoa_tips_pairs(entries)
    print(f" → {len(cocoa_pairs)} exemples")

    print(" Type 7 : Comprend (inclusions)")
    comprend_pairs = generate_comprend_pairs(entries)
    print(f" → {len(comprend_pairs)} exemples")

    # Merge and shuffle
    all_pairs = (desc_pairs + clinical_pairs + synonym_pairs + exclusion_pairs
                 + severity_pairs + cocoa_pairs + comprend_pairs)
    random.shuffle(all_pairs)

    # Write the JSONL.  The content is non-ASCII (ensure_ascii=False), so the
    # encoding is fixed rather than left to the platform default.
    output_path = OUT / "cocoa_chatml.jsonl"
    with open(output_path, "w", encoding="utf-8") as f:
        for pair in all_pairs:
            f.write(json.dumps(pair, ensure_ascii=False) + "\n")

    print(f"\n{'='*50}")
    print(f"Total : {len(all_pairs)} exemples → {output_path}")
    print(f"Taille : {output_path.stat().st_size / 1024 / 1024:.1f} Mo")

    # Also dump the parsed entries as JSON for debugging
    debug_path = OUT / "cocoa_entries_debug.json"
    with open(debug_path, "w", encoding="utf-8") as f:
        json.dump(entries, f, indent=2, ensure_ascii=False)
    print(f"Debug : {debug_path} ({debug_path.stat().st_size / 1024 / 1024:.1f} Mo)")

    # Breakdown
    print(f"\nRépartition :")
    print(f" Descriptions CoCoA : {len(desc_pairs)}")
    print(f" Texte clinique→code : {len(clinical_pairs)}")
    print(f" Synonyme→code : {len(synonym_pairs)}")
    print(f" Exclusions : {len(exclusion_pairs)}")
    print(f" Sévérité CMA : {len(severity_pairs)}")
    print(f" Notes CoCoA/AGORA : {len(cocoa_pairs)}")
    print(f" Comprend (inclusions): {len(comprend_pairs)}")


if __name__ == "__main__":
    main()