Files
t2a-finetune/scripts/05_parse_cocoa.py
dom 06100df236 feat: rééquilibrage dataset LoRA — raisonnement DIM vs mémorisation
Passe de 95/3/2 (lookups/raisonnement/règles) à ~31/49/20.
Dataset cible ~16K exemples denses (vs 66K de lookups avant).

Modifiés :
- 03_convert_cache.py : cache complet 1840 entrées (actuel + backup)
- 04_build_dataset.py : subsampling agressif (CIM-10 1.5K, CCAM 1.5K,
  CoCoA 2K) + sélection intelligente priorisant le raisonnement
- 12_generate_pipeline_examples.py : 3 templates (court + long + CPAM),
  cache actuel, cible ~2800 exemples

Créés :
- 13_generate_fascicule_reasoning.py : parsing 10 fascicules ATIH,
  génération Q&A raisonnement via Claude Opus 4.6 (~450 exemples)
- 14_generate_negative_examples.py : 1000 exemples négatifs
  (symptômes/DP, redondances sémantiques, DAS non significatifs)
- 15_generate_discrimination.py : 800 exercices de discrimination
  entre codes siblings CIM-10 via Claude Opus 4.6
- 16_parse_guide_metho.py : extraction Guide Méthodologique MCO 2026,
  Q&A directes + raisonnement via Claude Opus 4.6 (~500 exemples)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:42:33 +01:00

672 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Phase 1E — Parse the CoCoA 2025 (1113 pages) to extract ChatML examples.
The CoCoA ("Codage Complet Annoté") is the vademecum of DIM physicians.
It contains detailed entries per ICD-10 (CIM-10) code with:
- P/R/A indicators (Principal / Related / Associated diagnosis)
- Severity levels (2, 3, 4)
- Detailed clinical descriptions
- Synonyms
- "Comprend" (includes) / "À l'exclusion de" (excludes) notes
- AGORA notes (ATIH FAQ)
- CoCoA annotations (practical DIM coding advice)
Pages processed: 85-1080 (detailed entries, chapters 1-22)
Output: data/processed/cocoa_chatml.jsonl
"""
import json
import re
import random
from pathlib import Path
# Seeded so template choice and shuffling are reproducible across runs.
random.seed(42)
BASE = Path(__file__).resolve().parent.parent
RAW = BASE / "data" / "raw"  # input PDFs live here
OUT = BASE / "data" / "processed"  # generated datasets go here
OUT.mkdir(parents=True, exist_ok=True)
SYSTEM_MSG = "Tu es un médecin DIM expert en codage CIM-10 pour le PMSI français. Tu t'appuies sur le CoCoA (Codage Complet Annoté) pour tes décisions de codage."
# Detailed-entry page range (0-indexed offsets into pdf.pages)
PAGE_START = 84 # page 85
PAGE_END = 1080 # page 1080
# Regex patterns
# Full ICD-10 code, optional dagger/asterisk marker, then the label.
RE_CIM10_CODE = re.compile(
    r'^([A-Z]\d{2}(?:\.\d{1,2})?)\s*([†*]?)\s+(.*)'
)
# Three-character category code (no dot) followed by its title.
RE_CATEGORY_CODE = re.compile(
    r'^([A-Z]\d{2})\s+(.*)'
)
# Sub-code (with a dot), optional dagger/asterisk marker, then the label.
RE_SUBCODE = re.compile(
    r'^([A-Z]\d{2}\.\d{1,2})\s*([†*]?)\s*(.*)'
)
RE_PRA_LINE = re.compile(r'^P\s*R\s*A')  # "P R A" indicator column header
RE_SEVERITY = re.compile(r'^(\d)\s*$')   # CMA severity digit alone on its line
RE_CHAPTER_HEADER = re.compile(r'^CHAPITRE\s+([IVX]+)\s*:?\s*(.*)')
RE_SECTION_HEADER = re.compile(r'^([A-Z][a-zéèêëàâîïôùûüç].+)\s*\(([A-Z]\d{2}[-][A-Z]\d{2})\)')
RE_EXCLUSION = re.compile(r"^À l['\u2019]exclusion de\s+(.*)", re.IGNORECASE)
RE_COMPREND = re.compile(r'^Comprend\s+(.*)', re.IGNORECASE)
RE_AGORA = re.compile(r'\(AGORA\s*[-]\s*#?\s*(\d+).*?\)')
RE_FOOTER = re.compile(r'^2025\s*[-]')             # page footer ("2025 - ...")
RE_NOTE_BRACKET = re.compile(r'^\[voir en début')  # "[voir en début ...]" cross-ref note


def extract_text_from_pdf():
    """Extract the text of every detailed-entry page of the CoCoA PDF.

    Returns:
        list[tuple[int, str]]: (1-based page number, page text) pairs for
        pages PAGE_START..PAGE_END (0-indexed slice of pdf.pages).
    """
    import pdfplumber  # local import: heavy dependency, only needed here
    pdf_path = RAW / "cocoa_2025.pdf"
    print(f"Ouverture de {pdf_path}...")
    pages_text = []
    with pdfplumber.open(pdf_path) as pdf:
        total = min(PAGE_END, len(pdf.pages))
        for i in range(PAGE_START, total):
            page = pdf.pages[i]
            text = page.extract_text() or ""
            pages_text.append((i + 1, text))  # (page_number, text)
            if (i - PAGE_START) % 100 == 0:
                print(f" Extraction page {i+1}/{total}...")
    print(f" {len(pages_text)} pages extraites")
    return pages_text


def parse_entries(pages_text):
    """Parse ICD-10 entries from the extracted page text.

    Line-oriented state machine: code lines open a new entry;
    "Comprend" / "À l'exclusion de" lines switch on multi-line collection
    modes; remaining text is classified as clinical description (long
    lines) or synonym (short lines).

    Args:
        pages_text: list of (page_number, text) tuples from the PDF.

    Returns:
        dict mapping ICD-10 code -> entry dict (see _new_entry).
    """
    entries = {}  # code -> dict
    current_chapter = ""
    current_section = ""
    current_code = None
    current_entry = None
    collecting_exclusion = False
    collecting_comprend = False
    collecting_description = False
    for page_num, page_text in pages_text:
        lines = page_text.split('\n')
        for line_idx, line in enumerate(lines):
            line = line.strip()
            # Blank lines and footers terminate any collection mode.
            if not line:
                collecting_exclusion = False
                collecting_comprend = False
                collecting_description = False
                continue
            if RE_FOOTER.match(line):
                collecting_exclusion = False
                collecting_comprend = False
                collecting_description = False
                continue
            if RE_NOTE_BRACKET.match(line):
                collecting_description = False
                continue
            # Chapter header
            m = RE_CHAPTER_HEADER.match(line)
            if m:
                current_chapter = m.group(2).strip()
                collecting_exclusion = False
                collecting_comprend = False
                collecting_description = False
                continue
            # "P R A" indicator lines (a code may share the same line)
            if RE_PRA_LINE.match(line):
                rest = re.sub(r'^P\s*R\s*A\s*', '', line).strip()
                # Also remove "AN T" or similar special markers
                rest = re.sub(r'^AN\s*T?\s*', '', rest).strip()
                if rest:
                    # P R A followed by a code on the same line
                    m_cat = RE_CATEGORY_CODE.match(rest)
                    m_sub = RE_SUBCODE.match(rest)
                    if m_sub:
                        code = m_sub.group(1)
                        dagger_star = m_sub.group(2)
                        desc = m_sub.group(3).strip()
                        _save_entry(entries, current_code, current_entry)
                        current_code = code
                        current_entry = _new_entry(code, desc, dagger_star, current_chapter, page_num, is_category=False)
                        collecting_exclusion = False
                        collecting_comprend = False
                        collecting_description = False
                    elif m_cat:
                        code = m_cat.group(1)
                        desc = m_cat.group(2).strip()
                        _save_entry(entries, current_code, current_entry)
                        current_code = code
                        current_entry = _new_entry(code, desc, "", current_chapter, page_num, is_category=True)
                        collecting_exclusion = False
                        collecting_comprend = False
                        collecting_description = False
                continue
            # Severity digit on its own line
            m = RE_SEVERITY.match(line)
            if m and current_entry:
                current_entry["severity"] = int(m.group(1))
                continue
            # Sub-code entry
            m = RE_SUBCODE.match(line)
            if m:
                code = m.group(1)
                dagger_star = m.group(2)
                desc = m.group(3).strip()
                _save_entry(entries, current_code, current_entry)
                current_code = code
                current_entry = _new_entry(code, desc, dagger_star, current_chapter, page_num, is_category=False)
                collecting_exclusion = False
                collecting_comprend = False
                collecting_description = False
                continue
            # Category code (3-char code at start of line, no dot)
            m = RE_CATEGORY_CODE.match(line)
            if m and not line[0].islower() and len(m.group(1)) == 3:
                # Make sure it's actually a code and not part of text
                potential_code = m.group(1)
                if re.match(r'^[A-Z]\d{2}$', potential_code):
                    desc = m.group(2).strip()
                    # Avoid false positives: desc must look like a title
                    if desc and len(desc) > 3 and not desc[0].isdigit():
                        _save_entry(entries, current_code, current_entry)
                        current_code = potential_code
                        current_entry = _new_entry(potential_code, desc, "", current_chapter, page_num, is_category=True)
                        collecting_exclusion = False
                        collecting_comprend = False
                        collecting_description = False
                        continue
            # Section header, e.g. "Autres maladies bactériennes (A30-A49)"
            m = RE_SECTION_HEADER.match(line)
            if m:
                current_section = m.group(1).strip()
                collecting_exclusion = False
                collecting_comprend = False
                collecting_description = False
                continue
            # "Comprend" (includes)
            m = RE_COMPREND.match(line)
            if m:
                if current_entry:
                    current_entry["comprend"].append(m.group(1).strip())
                collecting_comprend = True
                collecting_exclusion = False
                collecting_description = False
                continue
            # "À l'exclusion de" (excludes)
            m = RE_EXCLUSION.match(line)
            if m:
                if current_entry:
                    current_entry["exclusions"].append(m.group(1).strip())
                collecting_exclusion = True
                collecting_comprend = False
                collecting_description = False
                continue
            # AGORA reference
            agora_matches = RE_AGORA.findall(line)
            if agora_matches and current_entry:
                for ref in agora_matches:
                    current_entry["agora_refs"].append(ref)
                # Keep the full line as a CoCoA annotation too.
                # BUGFIX: the needle is tested against the lowercased line,
                # so it must be lowercase itself ("Aunis" could never match).
                if "AGORA" in line or "aunis" in line.lower() or "CoCoA" in line:
                    current_entry["cocoa_notes"].append(line)
                continue
            # CoCoA/Aunis annotations (highlighted text)
            if current_entry and ("Aunis" in line or "CoCoA" in line):
                current_entry["cocoa_notes"].append(line)
                continue
            # Continuation lines for exclusions
            if collecting_exclusion and current_entry:
                # Continuations carry code refs, bullets, dashes or a
                # lowercase start.
                # BUGFIX: the bullet test was startswith('') — always True
                # (the '•' was presumably lost as an "ambiguous Unicode"
                # character) — which made this filter accept every line and
                # swallow clinical text into the exclusions list.
                if (re.search(r'\([A-Z]\d{2}', line) or
                        line.startswith('•') or line.startswith('-') or
                        line[0].islower() or
                        re.match(r'^[a-zéèêëàâîïôùûüç•\-]', line)):
                    current_entry["exclusions"].append(line)
                    continue
                else:
                    collecting_exclusion = False
            # Continuation lines for "Comprend"
            if collecting_comprend and current_entry:
                if not re.match(r'^[A-Z]\d', line) and not RE_PRA_LINE.match(line):
                    current_entry["comprend"].append(line)
                    continue
                else:
                    collecting_comprend = False
            # Clinical description text (paragraph after a code entry)
            if current_entry and line and not RE_PRA_LINE.match(line):
                if len(line) > 60 and not re.match(r'^[A-Z]\d', line):
                    # Long text = clinical description
                    current_entry["clinical_text"].append(line)
                elif not re.match(r'^[A-Z]\d', line) and not line.startswith('P '):
                    # Short text right after a code = synonym
                    current_entry["synonyms"].append(line)
    # Save the last open entry
    _save_entry(entries, current_code, current_entry)
    return entries


def _new_entry(code, description, dagger_star, chapter, page, is_category=False):
    """Create a fresh, empty entry record for *code*."""
    return {
        "code": code,
        "description": description,
        "dagger_star": dagger_star,  # "†" (aetiology), "*" (manifestation) or ""
        "chapter": chapter,
        "page": page,                # 1-based PDF page number
        "is_category": is_category,  # True for 3-character category codes
        "severity": None,            # CMA severity level (2/3/4) if present
        "synonyms": [],
        "comprend": [],
        "exclusions": [],
        "clinical_text": [],
        "agora_refs": [],
        "cocoa_notes": [],
    }


def _save_entry(entries, code, entry):
    """Clean *entry* up and store it in *entries* under *code*.

    Strips and deduplicates the collected lists, drops parsing noise from
    the synonyms (severity digits, "P R A" markers, footers), and re-routes
    misclassified "À l'exclusion de" lines from synonyms/comprend into the
    exclusions list. No-op when code/entry/description is missing.
    """
    if code and entry and entry["description"]:
        # Clean up
        entry["synonyms"] = [s.strip() for s in entry["synonyms"] if s.strip() and len(s.strip()) > 2]
        entry["comprend"] = [c.strip() for c in entry["comprend"] if c.strip()]
        entry["exclusions"] = [e.strip() for e in entry["exclusions"] if e.strip()]
        entry["clinical_text"] = [t.strip() for t in entry["clinical_text"] if t.strip()]
        entry["cocoa_notes"] = [n.strip() for n in entry["cocoa_notes"] if n.strip()]
        # Deduplicate while preserving order
        entry["synonyms"] = list(dict.fromkeys(entry["synonyms"]))
        entry["cocoa_notes"] = list(dict.fromkeys(entry["cocoa_notes"]))
        # Filter noise out of synonyms and move misclassified exclusions
        filtered_syns = []
        re_excl_inline = re.compile(r"^À l['\u2019]exclusion de", re.IGNORECASE)
        for s in entry["synonyms"]:
            # Skip severity digits, P R A markers, footers, etc.
            if RE_SEVERITY.match(s) or RE_PRA_LINE.match(s) or RE_FOOTER.match(s):
                continue
            if s in ("P R A", "P", "R", "A", "AN", "T"):
                continue
            # Move misclassified exclusions
            if re_excl_inline.match(s):
                excl_text = re.sub(r"^À l['\u2019]exclusion de\s*", '', s, flags=re.IGNORECASE).strip()
                if excl_text:
                    entry["exclusions"].append(excl_text)
                continue
            filtered_syns.append(s)
        entry["synonyms"] = filtered_syns
        # Same for "comprend": move misclassified exclusions
        filtered_comprend = []
        for c in entry["comprend"]:
            if re_excl_inline.match(c):
                excl_text = re.sub(r"^À l['\u2019]exclusion de\s*", '', c, flags=re.IGNORECASE).strip()
                if excl_text:
                    entry["exclusions"].append(excl_text)
            else:
                filtered_comprend.append(c)
        entry["comprend"] = filtered_comprend
        entries[code] = entry
def make_chatml(system, user, assistant):
    """Assemble one ChatML training example from the three role turns."""
    turns = (("system", system), ("user", user), ("assistant", assistant))
    return {"messages": [{"role": role, "content": content} for role, content in turns]}
def generate_description_pairs(entries):
    """Type 1: enriched CoCoA description of a code (richer than plain FHIR).

    Builds one Q/A pair per parsed entry, stacking chapter, synonyms,
    inclusions, exclusions, CMA severity and the dagger/asterisk convention
    into the answer (one fact per line, capped at 2000 characters).
    """
    pairs = []
    for code, e in entries.items():
        desc = e["description"]
        if not desc or len(desc) < 3:
            continue
        # BUGFIX: code and label were concatenated with no separator
        # (f"{code}{desc}"); the "—" was presumably lost in extraction.
        answer_parts = [f"{code} — {desc}"]
        if e["chapter"]:
            answer_parts.append(f"Chapitre : {e['chapter']}")
        if e["synonyms"]:
            syns = [s for s in e["synonyms"][:8] if len(s) > 2]
            if syns:
                answer_parts.append(f"Synonymes : {' ; '.join(syns)}")
        if e["comprend"]:
            answer_parts.append(f"Comprend : {' '.join(e['comprend'][:5])}")
        if e["exclusions"]:
            excls = [ex for ex in e["exclusions"][:5]]
            answer_parts.append(f"À l'exclusion de : {' ; '.join(excls)}")
        if e["severity"]:
            answer_parts.append(f"Niveau de sévérité CMA : {e['severity']}")
        if e["dagger_star"]:
            # BUGFIX: the comparison was == "" which is dead code under the
            # truthiness guard above, so every marked code was reported as
            # "manifestation (*)". "†" marks the aetiology code.
            marker = "étiologique (†)" if e["dagger_star"] == "†" else "manifestation (*)"
            answer_parts.append(f"Convention dague/astérisque : code {marker}")
        answer = "\n".join(answer_parts)
        if len(answer) > 2000:
            answer = answer[:2000]
        templates = [
            f"Décris le code CIM-10 {code} selon le CoCoA.",
            f"Que dit le CoCoA sur le code {code} ?",
            f"Quelles sont les caractéristiques du code {code} d'après le CoCoA ?",
        ]
        pairs.append(make_chatml(SYSTEM_MSG, random.choice(templates), answer))
    return pairs
def generate_clinical_pairs(entries):
    """Type 2: detailed clinical description -> code with structured reasoning."""
    pairs = []
    for code, entry in entries.items():
        if not entry["clinical_text"]:
            continue
        clinical = " ".join(entry["clinical_text"])
        if len(clinical) < 50:
            continue
        label = entry["description"]
        # Structured JSON answer mirroring the expected model output format.
        reasoning = {
            "analyse_clinique": clinical[:500],
            "code": code,
            "description": label,
            "confidence": "high",
            "justification": f"La description clinique du CoCoA correspond au code {code} ({label}).",
        }
        if entry["exclusions"]:
            reasoning["exclusions_a_verifier"] = " ; ".join(entry["exclusions"][:3])
        answer = json.dumps(reasoning, ensure_ascii=False)
        # Question built from the clinical text, truncated with an ellipsis.
        clinical_short = clinical[:300] + ("..." if len(clinical) > 300 else "")
        question = (
            "Un patient présente le tableau clinique suivant :\n"
            f"{clinical_short}\n\nQuel code CIM-10 correspond à cette présentation ?"
        )
        if len(question) > 1500:
            continue
        pairs.append(make_chatml(SYSTEM_MSG, question, answer))
    return pairs
def generate_synonym_pairs(entries):
    """Type 3: synonym -> ICD-10 code.

    Emits one pair per usable synonym, cleaning bullet/bracket noise and
    skipping very short or very long strings.
    """
    pairs = []
    for code, e in entries.items():
        if not e["synonyms"]:
            continue
        desc = e["description"]
        for syn in e["synonyms"]:
            if len(syn) < 4 or len(syn) > 200:
                continue
            # BUGFIX: the first test was startswith("") — always True (the
            # '•' was presumably lost in extraction) — so every synonym was
            # stripped. Only clean entries that actually start with noise,
            # matching the lstrip character set below.
            if syn.startswith(("•", "[", "(")):
                syn = syn.lstrip("•[( ").rstrip("])").strip()
                if not syn or len(syn) < 4:
                    continue
            answer = json.dumps({
                "code": code,
                "confidence": "high",
                "justification": f"« {syn} » est un synonyme de {code} ({desc}) selon le CoCoA."
            }, ensure_ascii=False)
            templates = [
                f"Quel est le code CIM-10 pour : {syn} ?",
                f"Code CIM-10 correspondant à « {syn} » ?",
            ]
            pairs.append(make_chatml(SYSTEM_MSG, random.choice(templates), answer))
    return pairs
def generate_exclusion_pairs(entries):
    """Type 4: what a code excludes (a classic coding trap)."""
    pairs = []
    for code, entry in entries.items():
        if not entry["exclusions"]:
            continue
        label = entry["description"]
        excls = " ; ".join(entry["exclusions"][:8])
        if len(excls) < 10:
            continue
        answer = (
            f"Le code {code} ({label}) exclut :\n{excls}\n\n"
            "Attention : ces situations doivent être codées avec les codes de renvoi indiqués entre parenthèses."
        )
        # Cap the answer length (slicing is a no-op when already short).
        answer = answer[:1500]
        questions = [
            f"Quelles sont les exclusions du code CIM-10 {code} ({label}) ?",
            f"Que ne faut-il PAS coder en {code} ?",
        ]
        pairs.append(make_chatml(SYSTEM_MSG, random.choice(questions), answer))
    return pairs
def generate_severity_pairs(entries):
    """Type 5: CMA severity level of a code."""
    severity_labels = {
        2: "niveau 2 (sévérité modérée)",
        3: "niveau 3 (sévérité élevée)",
        4: "niveau 4 (sévérité très élevée)",
    }
    pairs = []
    for code, entry in entries.items():
        sev = entry["severity"]
        if not sev:
            continue
        label = entry["description"]
        sev_text = severity_labels.get(sev, f"niveau {sev}")
        answer = (
            f"Le code {code} ({label}) a un niveau de sévérité CMA de {sev_text}.\n"
            "En tant que DAS, ce code peut entraîner une majoration du niveau de sévérité du GHM."
        )
        if entry["is_category"]:
            # Category-level severity may differ between sub-codes.
            answer += f"\nNote : {code} est une catégorie (code à 3 caractères). Les sous-codes peuvent avoir des niveaux différents."
        question = f"Quel est le niveau de sévérité CMA du code {code} ({label}) ?"
        pairs.append(make_chatml(SYSTEM_MSG, question, answer))
    return pairs
def generate_cocoa_tips_pairs(entries):
    """Type 6: CoCoA and AGORA notes (practical DIM advice)."""
    pairs = []
    for code, entry in entries.items():
        if not entry["cocoa_notes"]:
            continue
        label = entry["description"]
        notes = "\n".join(entry["cocoa_notes"])
        if len(notes) < 10:
            continue
        # Cap the answer length (slicing is a no-op when already short).
        answer = f"Pour le code {code} ({label}), le CoCoA indique :\n{notes}"[:1500]
        question = f"Y a-t-il des conseils pratiques du CoCoA pour le codage de {code} ({label}) ?"
        pairs.append(make_chatml(SYSTEM_MSG, question, answer))
    return pairs
def generate_comprend_pairs(entries):
    """Type 7: what a code includes ("Comprend" inclusions)."""
    pairs = []
    for code, entry in entries.items():
        if not entry["comprend"]:
            continue
        label = entry["description"]
        included = " ; ".join(entry["comprend"][:5])
        if len(included) < 10:
            continue
        answer = f"Le code {code} ({label}) comprend :\n{included}"
        questions = [
            f"Que comprend le code CIM-10 {code} ?",
            f"Quelles situations sont incluses dans le code {code} ({label}) ?",
        ]
        pairs.append(make_chatml(SYSTEM_MSG, random.choice(questions), answer))
    return pairs
def main():
    """Run the full pipeline: PDF extraction -> parsing -> ChatML generation."""
    # Step 1: text extraction
    pages_text = extract_text_from_pdf()
    # Step 2: entry parsing
    print("\nParsing des entrées CIM-10...")
    entries = parse_entries(pages_text)
    # Stats
    n_categories = sum(1 for e in entries.values() if e["is_category"])
    n_subcodes = sum(1 for e in entries.values() if not e["is_category"])
    n_with_clinical = sum(1 for e in entries.values() if e["clinical_text"])
    n_with_synonyms = sum(1 for e in entries.values() if e["synonyms"])
    n_with_exclusions = sum(1 for e in entries.values() if e["exclusions"])
    n_with_comprend = sum(1 for e in entries.values() if e["comprend"])
    n_with_severity = sum(1 for e in entries.values() if e["severity"])
    n_with_cocoa = sum(1 for e in entries.values() if e["cocoa_notes"])
    print(f"\n Entrées parsées : {len(entries)}")
    print(f" Catégories (3 car.) : {n_categories}")
    print(f" Sous-codes : {n_subcodes}")
    print(f" Avec texte clinique : {n_with_clinical}")
    print(f" Avec synonymes : {n_with_synonyms}")
    print(f" Avec exclusions : {n_with_exclusions}")
    print(f" Avec comprend : {n_with_comprend}")
    print(f" Avec sévérité CMA : {n_with_severity}")
    print(f" Avec notes CoCoA : {n_with_cocoa}")
    # Step 3: ChatML pair generation
    print("\nGénération des paires ChatML...")
    print(" Type 1 : Descriptions enrichies CoCoA")
    desc_pairs = generate_description_pairs(entries)
    print(f"{len(desc_pairs)} exemples")
    print(" Type 2 : Texte clinique → code")
    clinical_pairs = generate_clinical_pairs(entries)
    print(f"{len(clinical_pairs)} exemples")
    print(" Type 3 : Synonyme → code")
    synonym_pairs = generate_synonym_pairs(entries)
    print(f"{len(synonym_pairs)} exemples")
    print(" Type 4 : Exclusions")
    exclusion_pairs = generate_exclusion_pairs(entries)
    print(f"{len(exclusion_pairs)} exemples")
    print(" Type 5 : Sévérité CMA")
    severity_pairs = generate_severity_pairs(entries)
    print(f"{len(severity_pairs)} exemples")
    print(" Type 6 : Notes CoCoA/AGORA")
    cocoa_pairs = generate_cocoa_tips_pairs(entries)
    print(f"{len(cocoa_pairs)} exemples")
    print(" Type 7 : Comprend (inclusions)")
    comprend_pairs = generate_comprend_pairs(entries)
    print(f"{len(comprend_pairs)} exemples")
    # Merge and shuffle (deterministic: random is seeded at module import)
    all_pairs = desc_pairs + clinical_pairs + synonym_pairs + exclusion_pairs + severity_pairs + cocoa_pairs + comprend_pairs
    random.shuffle(all_pairs)
    # Write the JSONL. FIX: explicit UTF-8 — the content is accented French
    # text and the platform default encoding may not be UTF-8 (e.g. Windows).
    output_path = OUT / "cocoa_chatml.jsonl"
    with open(output_path, "w", encoding="utf-8") as f:
        for pair in all_pairs:
            f.write(json.dumps(pair, ensure_ascii=False) + "\n")
    print(f"\n{'='*50}")
    print(f"Total : {len(all_pairs)} exemples → {output_path}")
    print(f"Taille : {output_path.stat().st_size / 1024 / 1024:.1f} Mo")
    # Also dump the parsed entries as JSON for debugging
    debug_path = OUT / "cocoa_entries_debug.json"
    with open(debug_path, "w", encoding="utf-8") as f:
        json.dump(entries, f, indent=2, ensure_ascii=False)
    print(f"Debug : {debug_path} ({debug_path.stat().st_size / 1024 / 1024:.1f} Mo)")
    # Breakdown per pair type
    print(f"\nRépartition :")
    print(f" Descriptions CoCoA : {len(desc_pairs)}")
    print(f" Texte clinique→code : {len(clinical_pairs)}")
    print(f" Synonyme→code : {len(synonym_pairs)}")
    print(f" Exclusions : {len(exclusion_pairs)}")
    print(f" Sévérité CMA : {len(severity_pairs)}")
    print(f" Notes CoCoA/AGORA : {len(cocoa_pairs)}")
    print(f" Comprend (inclusions): {len(comprend_pairs)}")


if __name__ == "__main__":
    main()