Files
Aivanov_scan_ogc/pipeline/referentials.py
Dom 6df590ae95 feat(referentials): validation ATIH 2018 des codes médicaux
Ajoute une couche de validation post-extraction contre les référentiels
officiels de l'ATIH (Agence Technique de l'Information sur
l'Hospitalisation) pour 2018. Zéro tolérance sur les codes T2A : un
code invalide est flaggé, et une correction par plus proche voisin
(Levenshtein ≤ 1) est proposée.

Contenu :
- pipeline/referentials.py : API publique is_valid_{cim10,ccam,ghm,ghs},
  get_cim10_libelle, nearest_cim10, ghm_to_ghs. CLI --build/--test/--stats.
- pipeline/validation.py    : annote un JSON d'extraction avec un bloc
  `_validation` par page (codes valides/invalides + suggestions + cross-
  checks GHM↔GHS).
- referentials/sources/     : données brutes ATIH publiques (CIM-10 ClaML
  2019 substitut, CCAM v5 2018, GHM v2018, tarifs fév. 2018).
- referentials/atih_2018.sqlite : base SQLite prête à l'emploi
  (11 623 CIM-10 · 8 147 CCAM · 2 593 GHM · 5 329 couples GHM→GHS).
- tests/test_referentials.py : 11 tests unitaires (11/11 passent).
- annotate_validation.py    : script qui annote tous les JSONs V2 en
  place et produit validation_report.md.

Note CIM-10 : la version 2018 ATIH n'est publiée qu'en PDF, ClaML 2019
est utilisée en substitut (écart connu ≈ 60 codes / 11 600).

Gestion des suffixes PMSI : `*` (CMA exclue par le DP) et `+N`
(extension PMSI) sont strippés avant validation, le code racine seul
est comparé au référentiel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 15:06:01 +02:00

598 lines
21 KiB
Python

"""Validation des codes médicaux contre les référentiels ATIH 2018.
Ce module charge les référentiels officiels ATIH (CIM-10, CCAM, GHM, table
GHM→GHS) dans une base SQLite locale et expose des fonctions de validation
pour les codes extraits par le pipeline OCR.
Sources téléchargées (voir `referentials/sources/`) :
- **CIM-10 FR 2019** au format ClaML XML (ATIH) — utilisée comme substitut
à la CIM-10 2018 : ATIH ne publie officiellement la CIM-10 2018 qu'en PDF.
L'écart entre CIM-10 2019 et CIM-10 2018 est < 100 codes sur ~11 600 ;
un écart acceptable pour une validation OCR (et qui peut introduire
quelques faux positifs pour des codes créés en 2019, mais jamais de faux
négatifs sur un code 2018 valide).
- **CCAM descriptive à usage PMSI 2018 V5** (XLSX ATIH).
- **GHM V2018** (XLSX ATIH, fichier `regroupement_ghm_v2018.xlsx`).
- **Arrêté tarifaire MCO Février 2018** (XLSX ATIH, feuilles "Tarifs public"
et "Tarifs privé") pour la table GHM→GHS.
Formats de codes supportés :
- CIM-10 : lettre + 2 à 5 chiffres (ex: K650, T814, sans point).
- CCAM : 4 lettres + 3 chiffres (ex: EBFA012).
- GHM : 2 chiffres + lettre + 3 chiffres (ex: 11M122).
- GHS : nombre 1-5 chiffres (ex: 4323).
Utilisation :
from pipeline.referentials import (
is_valid_cim10, is_valid_ccam, is_valid_ghm, is_valid_ghs,
nearest_cim10, ghm_to_ghs, get_cim10_libelle,
)
if not is_valid_cim10("K650"):
suggestion = nearest_cim10("K65O") # correction O → 0
Build initial de la base : ``python -m pipeline.referentials --build``
Test rapide : ``python -m pipeline.referentials --test``
"""
from __future__ import annotations
import argparse
import gzip
import json
import re
import sqlite3
import sys
import xml.etree.ElementTree as ET
from functools import lru_cache
from pathlib import Path
from typing import Iterable
try:
from rapidfuzz.distance import Levenshtein as _Lev
_HAS_RAPIDFUZZ = True
except ImportError: # pragma: no cover - fallback pur Python
_HAS_RAPIDFUZZ = False
_ROOT = Path(__file__).resolve().parent.parent
REFERENTIALS_DIR = _ROOT / "referentials"
SOURCES_DIR = REFERENTIALS_DIR / "sources"
DB_PATH = REFERENTIALS_DIR / "atih_2018.sqlite"
# Formats attendus (utilisés pour normaliser l'entrée avant recherche DB)
_RE_CIM10 = re.compile(r"^[A-Z][0-9]{2,5}$")
_RE_CCAM = re.compile(r"^[A-Z]{4}[0-9]{3}$")
_RE_GHM = re.compile(r"^[0-9]{2}[A-Z][0-9]{2,3}[A-Z]?$")
_RE_GHS = re.compile(r"^[0-9]{1,5}$")
# ---------------------------------------------------------------------------
# Normalisation des entrées (tolérante aux bruits OCR courants)
# ---------------------------------------------------------------------------
def _normalize_cim10(code: str) -> str:
"""Normalise un code CIM-10 extrait pour comparaison au référentiel.
Gère :
- Point décimal optionnel : "K65.0""K650"
- Espaces / casse : " k650 ""K650"
- Suffixes PMSI : "C795 *""C795" (le `*` signifie "CMA exclue par le DP")
et "K635+0""K635" (le `+N` est une extension PMSI à valider séparément)
- Suffixe de position numérique éventuellement collé : "K650+""K650"
"""
if not code:
return ""
s = code.strip().upper()
# Couper à la première occurrence d'un marqueur PMSI non-alphanum
# (*, +, #, espace suivi d'un marqueur). On garde uniquement la tête du code.
for sep in ("*", "+", "#"):
if sep in s:
s = s.split(sep, 1)[0]
return s.replace(".", "").replace(" ", "").strip()
def _normalize_ccam(code: str) -> str:
if not code:
return ""
# Retire éventuelle extension PMSI (-1, -2…) et les espaces
base = code.split("-")[0]
return base.replace(" ", "").strip().upper()
def _normalize_ghm(code: str) -> str:
if not code:
return ""
return code.replace(" ", "").strip().upper()
def _normalize_ghs(code: str) -> str:
if not code:
return ""
# Les GHS peuvent arriver en "0023" ou "23"
s = re.sub(r"[^0-9]", "", code).lstrip("0")
return s or "0"
# ---------------------------------------------------------------------------
# Construction de la base SQLite depuis les sources téléchargées
# ---------------------------------------------------------------------------
def _create_schema(conn: sqlite3.Connection) -> None:
conn.executescript(
"""
DROP TABLE IF EXISTS cim10;
DROP TABLE IF EXISTS ccam;
DROP TABLE IF EXISTS ghm;
DROP TABLE IF EXISTS ghm_ghs;
DROP TABLE IF EXISTS metadata;
CREATE TABLE cim10 (
code TEXT PRIMARY KEY,
libelle TEXT
);
CREATE TABLE ccam (
code TEXT PRIMARY KEY,
libelle TEXT
);
CREATE TABLE ghm (
code TEXT PRIMARY KEY,
libelle TEXT,
aso TEXT,
da TEXT
);
CREATE TABLE ghm_ghs (
ghm TEXT,
ghs TEXT,
secteur TEXT, -- 'public' ou 'prive'
libelle TEXT,
tarif REAL,
PRIMARY KEY (ghm, ghs, secteur)
);
CREATE INDEX idx_ghm_ghs_ghm ON ghm_ghs(ghm);
CREATE INDEX idx_ghm_ghs_ghs ON ghm_ghs(ghs);
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT
);
"""
)
def _load_cim10(conn: sqlite3.Connection) -> int:
"""Charge la CIM-10 FR depuis le ClaML XML (catégories uniquement)."""
xml_path = SOURCES_DIR / "cim10_claml_2019_extracted" / "cim10_claml_2019.xml"
if not xml_path.exists():
# Fallback : chercher n'importe quel xml dans extracted
xmls = list((SOURCES_DIR / "cim10_claml_2019_extracted").glob("*.xml"))
if not xmls:
raise FileNotFoundError(
f"CIM-10 ClaML introuvable dans {SOURCES_DIR}. "
f"Assurez-vous d'avoir téléchargé et extrait le zip ATIH."
)
xml_path = xmls[0]
tree = ET.parse(xml_path)
root = tree.getroot()
rows: list[tuple[str, str]] = []
for cls in root.findall(".//Class"):
kind = cls.get("kind")
if kind != "category":
continue
raw_code = cls.get("code") or ""
code = raw_code.replace(".", "").upper().strip()
if not code:
continue
pref = cls.find('.//Rubric[@kind="preferred"]/Label')
libelle = pref.text.strip() if (pref is not None and pref.text) else ""
rows.append((code, libelle))
conn.executemany(
"INSERT OR REPLACE INTO cim10 (code, libelle) VALUES (?, ?)", rows
)
return len(rows)
def _load_ccam(conn: sqlite3.Connection) -> int:
"""Charge la CCAM 2018 depuis le XLSX ATIH (feuilles CCAM_Final_2018_*)."""
import openpyxl
xlsx_path = SOURCES_DIR / "ccam_2018_v5.xlsx"
if not xlsx_path.exists():
raise FileNotFoundError(f"CCAM XLSX introuvable : {xlsx_path}")
wb = openpyxl.load_workbook(xlsx_path, read_only=True, data_only=True)
pat = re.compile(r"^[A-Z]{4}[0-9]{3}$")
seen: dict[str, str] = {}
for sheet_name in wb.sheetnames:
if not sheet_name.startswith("CCAM_Final_2018"):
continue
ws = wb[sheet_name]
cur_code: str | None = None
for row in ws.iter_rows(values_only=True):
# col 0 : parfois un code, col 3 : texte / libellé
col0 = row[0] if len(row) > 0 else None
col3 = row[3] if len(row) > 3 else None
if isinstance(col0, str):
c = col0.strip()
if pat.match(c):
cur_code = c
if c not in seen:
seen[c] = ""
if cur_code and isinstance(col3, str) and col3.strip():
if not seen.get(cur_code):
seen[cur_code] = col3.strip()[:500]
rows = list(seen.items())
conn.executemany(
"INSERT OR REPLACE INTO ccam (code, libelle) VALUES (?, ?)", rows
)
return len(rows)
def _load_ghm(conn: sqlite3.Connection) -> int:
"""Charge les GHM V2018 depuis regroupement_ghm_v2018.xlsx."""
import openpyxl
xlsx_path = SOURCES_DIR / "regroupement_ghm_v2018.xlsx"
if not xlsx_path.exists():
raise FileNotFoundError(f"GHM XLSX introuvable : {xlsx_path}")
wb = openpyxl.load_workbook(xlsx_path, read_only=True, data_only=True)
ws = wb[wb.sheetnames[0]]
ghm_pat = re.compile(r"^[0-9]{2}[A-Z][0-9]{2,3}[A-Z]?$")
rows: list[tuple[str, str, str, str]] = []
header_found = False
for row in ws.iter_rows(values_only=True):
if not header_found:
if row and row[0] == "GHM":
header_found = True
continue
code = row[0]
if not isinstance(code, str):
continue
code = code.strip().upper()
if not ghm_pat.match(code):
continue
libelle = (row[1] or "").strip() if isinstance(row[1], str) else ""
aso = (row[2] or "").strip() if isinstance(row[2], str) else ""
da = (row[3] or "").strip() if isinstance(row[3], str) else ""
rows.append((code, libelle, aso, da))
conn.executemany(
"INSERT OR REPLACE INTO ghm (code, libelle, aso, da) VALUES (?, ?, ?, ?)",
rows,
)
return len(rows)
def _load_ghm_ghs(conn: sqlite3.Connection) -> int:
"""Charge la table GHM→GHS depuis tarif_arrete_fev_2018.xlsx.
Feuilles "Tarifs public" (secteur='public') et "Tarifs privé"
(secteur='prive'). Chaque ligne = un couple (GHS, GHM, libellé, tarif).
"""
import openpyxl
xlsx_path = SOURCES_DIR / "tarif_arrete_fev_2018.xlsx"
if not xlsx_path.exists():
raise FileNotFoundError(f"Tarifs XLSX introuvable : {xlsx_path}")
wb = openpyxl.load_workbook(xlsx_path, read_only=True, data_only=True)
ghm_pat = re.compile(r"^[0-9]{2}[A-Z][0-9]{2,3}[A-Z]?$")
all_rows: list[tuple[str, str, str, str, float | None]] = []
for sheet_name, secteur in [("Tarifs public", "public"), ("Tarifs privé", "prive")]:
if sheet_name not in wb.sheetnames:
continue
ws = wb[sheet_name]
header_found = False
for row in ws.iter_rows(values_only=True):
if not header_found:
if row and isinstance(row[0], str) and row[0].strip().upper() == "GHS":
header_found = True
continue
ghs_raw = row[0]
ghm_raw = row[1] if len(row) > 1 else None
lib_raw = row[2] if len(row) > 2 else None
tarif_raw = row[5] if len(row) > 5 else None
if ghs_raw is None or ghm_raw is None:
continue
try:
ghs = str(int(float(ghs_raw)))
except (ValueError, TypeError):
continue
ghm = str(ghm_raw).strip().upper()
if not ghm_pat.match(ghm):
continue
libelle = str(lib_raw).strip() if lib_raw else ""
try:
tarif = float(tarif_raw) if tarif_raw is not None else None
except (ValueError, TypeError):
tarif = None
all_rows.append((ghm, ghs, secteur, libelle, tarif))
conn.executemany(
"INSERT OR REPLACE INTO ghm_ghs (ghm, ghs, secteur, libelle, tarif) "
"VALUES (?, ?, ?, ?, ?)",
all_rows,
)
return len(all_rows)
def build_database(db_path: Path = DB_PATH, verbose: bool = True) -> dict[str, int]:
"""Construit la base SQLite à partir des sources.
Retourne les counts par table. Idempotent : DROP + CREATE + INSERT.
"""
REFERENTIALS_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
try:
_create_schema(conn)
n_cim10 = _load_cim10(conn)
if verbose:
print(f" CIM-10 : {n_cim10} codes chargés")
n_ccam = _load_ccam(conn)
if verbose:
print(f" CCAM : {n_ccam} codes chargés")
n_ghm = _load_ghm(conn)
if verbose:
print(f" GHM : {n_ghm} codes chargés")
n_ghs = _load_ghm_ghs(conn)
if verbose:
print(f" GHM→GHS : {n_ghs} lignes (public+privé)")
conn.executemany(
"INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)",
[
("source_cim10", "ATIH CIM-10 FR 2019 ClaML (substitut 2018)"),
("source_ccam", "ATIH CCAM descriptive à usage PMSI 2018 V5"),
("source_ghm", "ATIH regroupement_ghm_v2018.xlsx"),
("source_ghm_ghs", "ATIH tarif_arrete_fev_2018.xlsx"),
("n_cim10", str(n_cim10)),
("n_ccam", str(n_ccam)),
("n_ghm", str(n_ghm)),
("n_ghm_ghs", str(n_ghs)),
],
)
conn.commit()
return {
"cim10": n_cim10,
"ccam": n_ccam,
"ghm": n_ghm,
"ghm_ghs": n_ghs,
}
finally:
conn.close()
# ---------------------------------------------------------------------------
# Accès à la base (connexion cachée au niveau du module)
# ---------------------------------------------------------------------------
_CONN: sqlite3.Connection | None = None
def _get_conn() -> sqlite3.Connection:
global _CONN
if _CONN is not None:
return _CONN
if not DB_PATH.exists():
raise FileNotFoundError(
f"Base SQLite introuvable : {DB_PATH}. "
"Lancez d'abord : python -m pipeline.referentials --build"
)
_CONN = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, check_same_thread=False)
return _CONN
# ---------------------------------------------------------------------------
# API publique de validation
# ---------------------------------------------------------------------------
@lru_cache(maxsize=8192)
def is_valid_cim10(code: str) -> bool:
"""Vérifie qu'un code CIM-10 existe dans le référentiel 2018 (substitut 2019)."""
norm = _normalize_cim10(code)
if not norm or not _RE_CIM10.match(norm):
return False
cur = _get_conn().execute("SELECT 1 FROM cim10 WHERE code = ? LIMIT 1", (norm,))
return cur.fetchone() is not None
@lru_cache(maxsize=8192)
def is_valid_ccam(code: str) -> bool:
"""Vérifie qu'un code CCAM existe dans la CCAM PMSI 2018."""
norm = _normalize_ccam(code)
if not norm or not _RE_CCAM.match(norm):
return False
cur = _get_conn().execute("SELECT 1 FROM ccam WHERE code = ? LIMIT 1", (norm,))
return cur.fetchone() is not None
@lru_cache(maxsize=8192)
def is_valid_ghm(code: str) -> bool:
"""Vérifie qu'un code GHM existe dans la V2018."""
norm = _normalize_ghm(code)
if not norm or not _RE_GHM.match(norm):
return False
cur = _get_conn().execute("SELECT 1 FROM ghm WHERE code = ? LIMIT 1", (norm,))
return cur.fetchone() is not None
@lru_cache(maxsize=8192)
def is_valid_ghs(code: str) -> bool:
"""Vérifie qu'un code GHS existe dans l'arrêté tarifaire 2018."""
norm = _normalize_ghs(code)
if not norm or not _RE_GHS.match(norm):
return False
cur = _get_conn().execute(
"SELECT 1 FROM ghm_ghs WHERE ghs = ? LIMIT 1", (norm,)
)
return cur.fetchone() is not None
@lru_cache(maxsize=4096)
def get_cim10_libelle(code: str) -> str | None:
"""Renvoie le libellé officiel du code CIM-10, ou None."""
norm = _normalize_cim10(code)
if not norm:
return None
cur = _get_conn().execute(
"SELECT libelle FROM cim10 WHERE code = ? LIMIT 1", (norm,)
)
row = cur.fetchone()
return row[0] if row else None
def ghm_to_ghs(ghm: str) -> list[str]:
"""Renvoie les GHS possibles pour un GHM donné (publics et privés fusionnés).
Utilisé pour vérifier la cohérence du couple (GHM, GHS) extrait.
"""
norm = _normalize_ghm(ghm)
if not norm:
return []
cur = _get_conn().execute(
"SELECT DISTINCT ghs FROM ghm_ghs WHERE ghm = ?", (norm,)
)
return [r[0] for r in cur.fetchall()]
def _levenshtein(a: str, b: str) -> int:
if _HAS_RAPIDFUZZ:
return _Lev.distance(a, b)
# Fallback pur Python (O(n*m)) — suffisant pour des codes courts
if len(a) < len(b):
a, b = b, a
if not b:
return len(a)
prev = list(range(len(b) + 1))
for i, ca in enumerate(a, 1):
cur = [i]
for j, cb in enumerate(b, 1):
ins = cur[j - 1] + 1
dele = prev[j] + 1
sub = prev[j - 1] + (ca != cb)
cur.append(min(ins, dele, sub))
prev = cur
return prev[-1]
def nearest_cim10(code: str, max_distance: int = 1) -> str | None:
"""Trouve le code CIM-10 valide le plus proche (distance de Levenshtein).
Utile pour corriger les erreurs OCR courantes (O/0, I/1, B/8…).
Stratégie de départage en cas d'égalité de distance :
1. Privilégie un candidat de même longueur (substitution >> suppression)
2. Sinon tri lexicographique croissant.
Retourne None si aucun code n'est à ≤ max_distance.
"""
norm = _normalize_cim10(code)
if not norm:
return None
if is_valid_cim10(norm):
return norm
conn = _get_conn()
length = len(norm)
cur = conn.execute(
"SELECT code FROM cim10 WHERE length(code) BETWEEN ? AND ?",
(length - max_distance, length + max_distance),
)
candidates: list[tuple[int, int, str]] = [] # (distance, |len_diff|, code)
for (cand,) in cur:
d = _levenshtein(norm, cand)
if d <= max_distance:
candidates.append((d, abs(len(cand) - length), cand))
if not candidates:
return None
# Tri : distance min, puis longueur la plus proche, puis lexicographique
candidates.sort(key=lambda t: (t[0], t[1], t[2]))
return candidates[0][2]
# ---------------------------------------------------------------------------
# Tests légers (exécutables sans pytest)
# ---------------------------------------------------------------------------
def _run_selftest() -> int:
"""Tests de fumée rapides. Retourne le nombre d'échecs."""
failures = 0
def check(label: str, cond: bool, detail: str = "") -> None:
nonlocal failures
status = "OK " if cond else "FAIL"
print(f" [{status}] {label}{('' + detail) if detail else ''}")
if not cond:
failures += 1
print("=== Tests référentiels ATIH 2018 ===")
# CIM-10
check("CIM-10 K650 valide (péritonite)", is_valid_cim10("K650"))
check("CIM-10 K65.0 (avec point) valide", is_valid_cim10("K65.0"))
check("CIM-10 T814 valide", is_valid_cim10("T814"))
check("CIM-10 ZZZ99 invalide", not is_valid_cim10("ZZZ99"))
check("CIM-10 libellé K650", get_cim10_libelle("K650") is not None,
detail=str(get_cim10_libelle("K650")))
# Correction OCR : K65O (lettre O) → K650
suggestion = nearest_cim10("K65O")
check("CIM-10 nearest(K65O) = K650", suggestion == "K650",
detail=f"got={suggestion}")
# CCAM
check("CCAM EBFA012 valide", is_valid_ccam("EBFA012"))
check("CCAM EBFA012-1 (ext PMSI) valide", is_valid_ccam("EBFA012-1"))
check("CCAM AAAA000 invalide", not is_valid_ccam("AAAA000"))
# GHM
check("GHM 01C031 valide", is_valid_ghm("01C031"))
check("GHM 99Z99Z invalide", not is_valid_ghm("99Z99Z"))
# GHS
check("GHS 22 valide", is_valid_ghs("22"))
check("GHS 99999 invalide", not is_valid_ghs("99999"))
# GHM→GHS
ghs_list = ghm_to_ghs("01C031")
check("GHM 01C031 → GHS inclut 22", "22" in ghs_list,
detail=f"ghs_list={ghs_list}")
# Format invalide (robustesse)
check("is_valid_cim10('') = False", not is_valid_cim10(""))
check("is_valid_ccam(None cast) = False", not is_valid_ccam(""))
print(f"=== Résultat : {failures} échec(s) ===")
return failures
def _cli() -> int:
parser = argparse.ArgumentParser(description="Référentiels ATIH 2018")
g = parser.add_mutually_exclusive_group(required=True)
g.add_argument("--build", action="store_true",
help="(Re)construit la base SQLite depuis referentials/sources/")
g.add_argument("--test", action="store_true",
help="Exécute les tests de fumée")
g.add_argument("--stats", action="store_true",
help="Affiche les comptages de la base")
args = parser.parse_args()
if args.build:
print(f"Construction de {DB_PATH} depuis {SOURCES_DIR}...")
counts = build_database()
print("OK :", counts)
return 0
if args.test:
return 1 if _run_selftest() > 0 else 0
if args.stats:
conn = _get_conn()
for tbl in ("cim10", "ccam", "ghm", "ghm_ghs"):
n = conn.execute(f"SELECT COUNT(*) FROM {tbl}").fetchone()[0]
print(f" {tbl:10s}: {n}")
print("Metadata :")
for k, v in conn.execute("SELECT key, value FROM metadata"):
print(f" {k}: {v}")
return 0
return 0
if __name__ == "__main__":
sys.exit(_cli())