"""Validation des codes médicaux contre les référentiels ATIH 2018. Ce module charge les référentiels officiels ATIH (CIM-10, CCAM, GHM, table GHM→GHS) dans une base SQLite locale et expose des fonctions de validation pour les codes extraits par le pipeline OCR. Sources téléchargées (voir `referentials/sources/`) : - **CIM-10 FR 2019** au format ClaML XML (ATIH) — utilisée comme substitut à la CIM-10 2018 : ATIH ne publie officiellement la CIM-10 2018 qu'en PDF. L'écart entre CIM-10 2019 et CIM-10 2018 est < 100 codes sur ~11 600 ; un écart acceptable pour une validation OCR (et qui peut introduire quelques faux positifs pour des codes créés en 2019, mais jamais de faux négatifs sur un code 2018 valide). - **CCAM descriptive à usage PMSI 2018 V5** (XLSX ATIH). - **GHM V2018** (XLSX ATIH, fichier `regroupement_ghm_v2018.xlsx`). - **Arrêté tarifaire MCO Février 2018** (XLSX ATIH, feuilles "Tarifs public" et "Tarifs privé") pour la table GHM→GHS. Formats de codes supportés : - CIM-10 : lettre + 2 à 5 chiffres (ex: K650, T814, sans point). - CCAM : 4 lettres + 3 chiffres (ex: EBFA012). - GHM : 2 chiffres + lettre + 3 chiffres (ex: 11M122). - GHS : nombre 1-5 chiffres (ex: 4323). Utilisation : from pipeline.referentials import ( is_valid_cim10, is_valid_ccam, is_valid_ghm, is_valid_ghs, nearest_cim10, ghm_to_ghs, get_cim10_libelle, ) if not is_valid_cim10("K650"): suggestion = nearest_cim10("K65O") # correction O → 0 Build initial de la base : ``python -m pipeline.referentials --build`` Test rapide : ``python -m pipeline.referentials --test`` """ from __future__ import annotations import argparse import gzip import json import re import sqlite3 import sys import xml.etree.ElementTree as ET from functools import lru_cache from pathlib import Path from typing import Iterable try: from rapidfuzz.distance import Levenshtein as _Lev _HAS_RAPIDFUZZ = True except ImportError: # pragma: no cover - fallback pur Python _HAS_RAPIDFUZZ = False _ROOT = Path(__file__).resolve().parent.parent REFERENTIALS_DIR = _ROOT / "referentials" SOURCES_DIR = REFERENTIALS_DIR / "sources" DB_PATH = REFERENTIALS_DIR / "atih_2018.sqlite" # Formats attendus (utilisés pour normaliser l'entrée avant recherche DB) _RE_CIM10 = re.compile(r"^[A-Z][0-9]{2,5}$") _RE_CCAM = re.compile(r"^[A-Z]{4}[0-9]{3}$") _RE_GHM = re.compile(r"^[0-9]{2}[A-Z][0-9]{2,3}[A-Z]?$") _RE_GHS = re.compile(r"^[0-9]{1,5}$") # --------------------------------------------------------------------------- # Normalisation des entrées (tolérante aux bruits OCR courants) # --------------------------------------------------------------------------- def _normalize_cim10(code: str) -> str: """Normalise un code CIM-10 extrait pour comparaison au référentiel. Gère : - Point décimal optionnel : "K65.0" → "K650" - Espaces / casse : " k650 " → "K650" - Suffixes PMSI : "C795 *" → "C795" (le `*` signifie "CMA exclue par le DP") et "K635+0" → "K635" (le `+N` est une extension PMSI à valider séparément) - Suffixe de position numérique éventuellement collé : "K650+" → "K650" """ if not code: return "" s = code.strip().upper() # Couper à la première occurrence d'un marqueur PMSI non-alphanum # (*, +, #, espace suivi d'un marqueur). On garde uniquement la tête du code. for sep in ("*", "+", "#"): if sep in s: s = s.split(sep, 1)[0] return s.replace(".", "").replace(" ", "").strip() def _normalize_ccam(code: str) -> str: if not code: return "" # Retire éventuelle extension PMSI (-1, -2…) et les espaces base = code.split("-")[0] return base.replace(" ", "").strip().upper() def _normalize_ghm(code: str) -> str: if not code: return "" return code.replace(" ", "").strip().upper() def _normalize_ghs(code: str) -> str: if not code: return "" # Les GHS peuvent arriver en "0023" ou "23" s = re.sub(r"[^0-9]", "", code).lstrip("0") return s or "0" # --------------------------------------------------------------------------- # Construction de la base SQLite depuis les sources téléchargées # --------------------------------------------------------------------------- def _create_schema(conn: sqlite3.Connection) -> None: conn.executescript( """ DROP TABLE IF EXISTS cim10; DROP TABLE IF EXISTS ccam; DROP TABLE IF EXISTS ghm; DROP TABLE IF EXISTS ghm_ghs; DROP TABLE IF EXISTS metadata; CREATE TABLE cim10 ( code TEXT PRIMARY KEY, libelle TEXT ); CREATE TABLE ccam ( code TEXT PRIMARY KEY, libelle TEXT ); CREATE TABLE ghm ( code TEXT PRIMARY KEY, libelle TEXT, aso TEXT, da TEXT ); CREATE TABLE ghm_ghs ( ghm TEXT, ghs TEXT, secteur TEXT, -- 'public' ou 'prive' libelle TEXT, tarif REAL, PRIMARY KEY (ghm, ghs, secteur) ); CREATE INDEX idx_ghm_ghs_ghm ON ghm_ghs(ghm); CREATE INDEX idx_ghm_ghs_ghs ON ghm_ghs(ghs); CREATE TABLE metadata ( key TEXT PRIMARY KEY, value TEXT ); """ ) def _load_cim10(conn: sqlite3.Connection) -> int: """Charge la CIM-10 FR depuis le ClaML XML (catégories uniquement).""" xml_path = SOURCES_DIR / "cim10_claml_2019_extracted" / "cim10_claml_2019.xml" if not xml_path.exists(): # Fallback : chercher n'importe quel xml dans extracted xmls = list((SOURCES_DIR / "cim10_claml_2019_extracted").glob("*.xml")) if not xmls: raise FileNotFoundError( f"CIM-10 ClaML introuvable dans {SOURCES_DIR}. " f"Assurez-vous d'avoir téléchargé et extrait le zip ATIH." ) xml_path = xmls[0] tree = ET.parse(xml_path) root = tree.getroot() rows: list[tuple[str, str]] = [] for cls in root.findall(".//Class"): kind = cls.get("kind") if kind != "category": continue raw_code = cls.get("code") or "" code = raw_code.replace(".", "").upper().strip() if not code: continue pref = cls.find('.//Rubric[@kind="preferred"]/Label') libelle = pref.text.strip() if (pref is not None and pref.text) else "" rows.append((code, libelle)) conn.executemany( "INSERT OR REPLACE INTO cim10 (code, libelle) VALUES (?, ?)", rows ) return len(rows) def _load_ccam(conn: sqlite3.Connection) -> int: """Charge la CCAM 2018 depuis le XLSX ATIH (feuilles CCAM_Final_2018_*).""" import openpyxl xlsx_path = SOURCES_DIR / "ccam_2018_v5.xlsx" if not xlsx_path.exists(): raise FileNotFoundError(f"CCAM XLSX introuvable : {xlsx_path}") wb = openpyxl.load_workbook(xlsx_path, read_only=True, data_only=True) pat = re.compile(r"^[A-Z]{4}[0-9]{3}$") seen: dict[str, str] = {} for sheet_name in wb.sheetnames: if not sheet_name.startswith("CCAM_Final_2018"): continue ws = wb[sheet_name] cur_code: str | None = None for row in ws.iter_rows(values_only=True): # col 0 : parfois un code, col 3 : texte / libellé col0 = row[0] if len(row) > 0 else None col3 = row[3] if len(row) > 3 else None if isinstance(col0, str): c = col0.strip() if pat.match(c): cur_code = c if c not in seen: seen[c] = "" if cur_code and isinstance(col3, str) and col3.strip(): if not seen.get(cur_code): seen[cur_code] = col3.strip()[:500] rows = list(seen.items()) conn.executemany( "INSERT OR REPLACE INTO ccam (code, libelle) VALUES (?, ?)", rows ) return len(rows) def _load_ghm(conn: sqlite3.Connection) -> int: """Charge les GHM V2018 depuis regroupement_ghm_v2018.xlsx.""" import openpyxl xlsx_path = SOURCES_DIR / "regroupement_ghm_v2018.xlsx" if not xlsx_path.exists(): raise FileNotFoundError(f"GHM XLSX introuvable : {xlsx_path}") wb = openpyxl.load_workbook(xlsx_path, read_only=True, data_only=True) ws = wb[wb.sheetnames[0]] ghm_pat = re.compile(r"^[0-9]{2}[A-Z][0-9]{2,3}[A-Z]?$") rows: list[tuple[str, str, str, str]] = [] header_found = False for row in ws.iter_rows(values_only=True): if not header_found: if row and row[0] == "GHM": header_found = True continue code = row[0] if not isinstance(code, str): continue code = code.strip().upper() if not ghm_pat.match(code): continue libelle = (row[1] or "").strip() if isinstance(row[1], str) else "" aso = (row[2] or "").strip() if isinstance(row[2], str) else "" da = (row[3] or "").strip() if isinstance(row[3], str) else "" rows.append((code, libelle, aso, da)) conn.executemany( "INSERT OR REPLACE INTO ghm (code, libelle, aso, da) VALUES (?, ?, ?, ?)", rows, ) return len(rows) def _load_ghm_ghs(conn: sqlite3.Connection) -> int: """Charge la table GHM→GHS depuis tarif_arrete_fev_2018.xlsx. Feuilles "Tarifs public" (secteur='public') et "Tarifs privé" (secteur='prive'). Chaque ligne = un couple (GHS, GHM, libellé, tarif). """ import openpyxl xlsx_path = SOURCES_DIR / "tarif_arrete_fev_2018.xlsx" if not xlsx_path.exists(): raise FileNotFoundError(f"Tarifs XLSX introuvable : {xlsx_path}") wb = openpyxl.load_workbook(xlsx_path, read_only=True, data_only=True) ghm_pat = re.compile(r"^[0-9]{2}[A-Z][0-9]{2,3}[A-Z]?$") all_rows: list[tuple[str, str, str, str, float | None]] = [] for sheet_name, secteur in [("Tarifs public", "public"), ("Tarifs privé", "prive")]: if sheet_name not in wb.sheetnames: continue ws = wb[sheet_name] header_found = False for row in ws.iter_rows(values_only=True): if not header_found: if row and isinstance(row[0], str) and row[0].strip().upper() == "GHS": header_found = True continue ghs_raw = row[0] ghm_raw = row[1] if len(row) > 1 else None lib_raw = row[2] if len(row) > 2 else None tarif_raw = row[5] if len(row) > 5 else None if ghs_raw is None or ghm_raw is None: continue try: ghs = str(int(float(ghs_raw))) except (ValueError, TypeError): continue ghm = str(ghm_raw).strip().upper() if not ghm_pat.match(ghm): continue libelle = str(lib_raw).strip() if lib_raw else "" try: tarif = float(tarif_raw) if tarif_raw is not None else None except (ValueError, TypeError): tarif = None all_rows.append((ghm, ghs, secteur, libelle, tarif)) conn.executemany( "INSERT OR REPLACE INTO ghm_ghs (ghm, ghs, secteur, libelle, tarif) " "VALUES (?, ?, ?, ?, ?)", all_rows, ) return len(all_rows) def build_database(db_path: Path = DB_PATH, verbose: bool = True) -> dict[str, int]: """Construit la base SQLite à partir des sources. Retourne les counts par table. Idempotent : DROP + CREATE + INSERT. """ REFERENTIALS_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(db_path) try: _create_schema(conn) n_cim10 = _load_cim10(conn) if verbose: print(f" CIM-10 : {n_cim10} codes chargés") n_ccam = _load_ccam(conn) if verbose: print(f" CCAM : {n_ccam} codes chargés") n_ghm = _load_ghm(conn) if verbose: print(f" GHM : {n_ghm} codes chargés") n_ghs = _load_ghm_ghs(conn) if verbose: print(f" GHM→GHS : {n_ghs} lignes (public+privé)") conn.executemany( "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", [ ("source_cim10", "ATIH CIM-10 FR 2019 ClaML (substitut 2018)"), ("source_ccam", "ATIH CCAM descriptive à usage PMSI 2018 V5"), ("source_ghm", "ATIH regroupement_ghm_v2018.xlsx"), ("source_ghm_ghs", "ATIH tarif_arrete_fev_2018.xlsx"), ("n_cim10", str(n_cim10)), ("n_ccam", str(n_ccam)), ("n_ghm", str(n_ghm)), ("n_ghm_ghs", str(n_ghs)), ], ) conn.commit() return { "cim10": n_cim10, "ccam": n_ccam, "ghm": n_ghm, "ghm_ghs": n_ghs, } finally: conn.close() # --------------------------------------------------------------------------- # Accès à la base (connexion cachée au niveau du module) # --------------------------------------------------------------------------- _CONN: sqlite3.Connection | None = None def _get_conn() -> sqlite3.Connection: global _CONN if _CONN is not None: return _CONN if not DB_PATH.exists(): raise FileNotFoundError( f"Base SQLite introuvable : {DB_PATH}. " "Lancez d'abord : python -m pipeline.referentials --build" ) _CONN = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, check_same_thread=False) return _CONN # --------------------------------------------------------------------------- # API publique de validation # --------------------------------------------------------------------------- @lru_cache(maxsize=8192) def is_valid_cim10(code: str) -> bool: """Vérifie qu'un code CIM-10 existe dans le référentiel 2018 (substitut 2019).""" norm = _normalize_cim10(code) if not norm or not _RE_CIM10.match(norm): return False cur = _get_conn().execute("SELECT 1 FROM cim10 WHERE code = ? LIMIT 1", (norm,)) return cur.fetchone() is not None @lru_cache(maxsize=8192) def is_valid_ccam(code: str) -> bool: """Vérifie qu'un code CCAM existe dans la CCAM PMSI 2018.""" norm = _normalize_ccam(code) if not norm or not _RE_CCAM.match(norm): return False cur = _get_conn().execute("SELECT 1 FROM ccam WHERE code = ? LIMIT 1", (norm,)) return cur.fetchone() is not None @lru_cache(maxsize=8192) def is_valid_ghm(code: str) -> bool: """Vérifie qu'un code GHM existe dans la V2018.""" norm = _normalize_ghm(code) if not norm or not _RE_GHM.match(norm): return False cur = _get_conn().execute("SELECT 1 FROM ghm WHERE code = ? LIMIT 1", (norm,)) return cur.fetchone() is not None @lru_cache(maxsize=8192) def is_valid_ghs(code: str) -> bool: """Vérifie qu'un code GHS existe dans l'arrêté tarifaire 2018.""" norm = _normalize_ghs(code) if not norm or not _RE_GHS.match(norm): return False cur = _get_conn().execute( "SELECT 1 FROM ghm_ghs WHERE ghs = ? LIMIT 1", (norm,) ) return cur.fetchone() is not None @lru_cache(maxsize=4096) def get_cim10_libelle(code: str) -> str | None: """Renvoie le libellé officiel du code CIM-10, ou None.""" norm = _normalize_cim10(code) if not norm: return None cur = _get_conn().execute( "SELECT libelle FROM cim10 WHERE code = ? LIMIT 1", (norm,) ) row = cur.fetchone() return row[0] if row else None def ghm_to_ghs(ghm: str) -> list[str]: """Renvoie les GHS possibles pour un GHM donné (publics et privés fusionnés). Utilisé pour vérifier la cohérence du couple (GHM, GHS) extrait. """ norm = _normalize_ghm(ghm) if not norm: return [] cur = _get_conn().execute( "SELECT DISTINCT ghs FROM ghm_ghs WHERE ghm = ?", (norm,) ) return [r[0] for r in cur.fetchall()] def _levenshtein(a: str, b: str) -> int: if _HAS_RAPIDFUZZ: return _Lev.distance(a, b) # Fallback pur Python (O(n*m)) — suffisant pour des codes courts if len(a) < len(b): a, b = b, a if not b: return len(a) prev = list(range(len(b) + 1)) for i, ca in enumerate(a, 1): cur = [i] for j, cb in enumerate(b, 1): ins = cur[j - 1] + 1 dele = prev[j] + 1 sub = prev[j - 1] + (ca != cb) cur.append(min(ins, dele, sub)) prev = cur return prev[-1] def nearest_cim10(code: str, max_distance: int = 1) -> str | None: """Trouve le code CIM-10 valide le plus proche (distance de Levenshtein). Utile pour corriger les erreurs OCR courantes (O/0, I/1, B/8…). Stratégie de départage en cas d'égalité de distance : 1. Privilégie un candidat de même longueur (substitution >> suppression) 2. Sinon tri lexicographique croissant. Retourne None si aucun code n'est à ≤ max_distance. """ norm = _normalize_cim10(code) if not norm: return None if is_valid_cim10(norm): return norm conn = _get_conn() length = len(norm) cur = conn.execute( "SELECT code FROM cim10 WHERE length(code) BETWEEN ? AND ?", (length - max_distance, length + max_distance), ) candidates: list[tuple[int, int, str]] = [] # (distance, |len_diff|, code) for (cand,) in cur: d = _levenshtein(norm, cand) if d <= max_distance: candidates.append((d, abs(len(cand) - length), cand)) if not candidates: return None # Tri : distance min, puis longueur la plus proche, puis lexicographique candidates.sort(key=lambda t: (t[0], t[1], t[2])) return candidates[0][2] # --------------------------------------------------------------------------- # Tests légers (exécutables sans pytest) # --------------------------------------------------------------------------- def _run_selftest() -> int: """Tests de fumée rapides. Retourne le nombre d'échecs.""" failures = 0 def check(label: str, cond: bool, detail: str = "") -> None: nonlocal failures status = "OK " if cond else "FAIL" print(f" [{status}] {label}{(' — ' + detail) if detail else ''}") if not cond: failures += 1 print("=== Tests référentiels ATIH 2018 ===") # CIM-10 check("CIM-10 K650 valide (péritonite)", is_valid_cim10("K650")) check("CIM-10 K65.0 (avec point) valide", is_valid_cim10("K65.0")) check("CIM-10 T814 valide", is_valid_cim10("T814")) check("CIM-10 ZZZ99 invalide", not is_valid_cim10("ZZZ99")) check("CIM-10 libellé K650", get_cim10_libelle("K650") is not None, detail=str(get_cim10_libelle("K650"))) # Correction OCR : K65O (lettre O) → K650 suggestion = nearest_cim10("K65O") check("CIM-10 nearest(K65O) = K650", suggestion == "K650", detail=f"got={suggestion}") # CCAM check("CCAM EBFA012 valide", is_valid_ccam("EBFA012")) check("CCAM EBFA012-1 (ext PMSI) valide", is_valid_ccam("EBFA012-1")) check("CCAM AAAA000 invalide", not is_valid_ccam("AAAA000")) # GHM check("GHM 01C031 valide", is_valid_ghm("01C031")) check("GHM 99Z99Z invalide", not is_valid_ghm("99Z99Z")) # GHS check("GHS 22 valide", is_valid_ghs("22")) check("GHS 99999 invalide", not is_valid_ghs("99999")) # GHM→GHS ghs_list = ghm_to_ghs("01C031") check("GHM 01C031 → GHS inclut 22", "22" in ghs_list, detail=f"ghs_list={ghs_list}") # Format invalide (robustesse) check("is_valid_cim10('') = False", not is_valid_cim10("")) check("is_valid_ccam(None cast) = False", not is_valid_ccam("")) print(f"=== Résultat : {failures} échec(s) ===") return failures def _cli() -> int: parser = argparse.ArgumentParser(description="Référentiels ATIH 2018") g = parser.add_mutually_exclusive_group(required=True) g.add_argument("--build", action="store_true", help="(Re)construit la base SQLite depuis referentials/sources/") g.add_argument("--test", action="store_true", help="Exécute les tests de fumée") g.add_argument("--stats", action="store_true", help="Affiche les comptages de la base") args = parser.parse_args() if args.build: print(f"Construction de {DB_PATH} depuis {SOURCES_DIR}...") counts = build_database() print("OK :", counts) return 0 if args.test: return 1 if _run_selftest() > 0 else 0 if args.stats: conn = _get_conn() for tbl in ("cim10", "ccam", "ghm", "ghm_ghs"): n = conn.execute(f"SELECT COUNT(*) FROM {tbl}").fetchone()[0] print(f" {tbl:10s}: {n}") print("Metadata :") for k, v in conn.execute("SELECT key, value FROM metadata"): print(f" {k}: {v}") return 0 return 0 if __name__ == "__main__": sys.exit(_cli())