Phase 2 (CCAM) : - Nouveau src/medical/ccam_dict.py : build depuis CCAM_V81.xls via xlrd, lookup 3 niveaux, validation codes - Intégration dans l'extracteur : fallback ccam_lookup + _validate_ccam() avec alertes - CLI : --build-ccam-dict, --rebuild-index Phase 3 (FAISS) : - Chunks CCAM depuis le dictionnaire JSON (priorité sur le PDF) - Chunks CIM-10 index alphabétique (terme → code) - Priorisation cim10_alpha dans la recherche RAG Viewer : endpoint reprocess + bloc scripts Tests : 8 tests CCAM + tests raisonnement RAG (161 passed) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
192 lines
5.8 KiB
Python
192 lines
5.8 KiB
Python
"""Dictionnaire CCAM complet extrait depuis le fichier XLS officiel (CNAM).
|
|
|
|
Fournit un lookup intelligent avec normalisation Unicode pour la recherche
|
|
de codes CCAM à partir de textes d'actes médicaux en français.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import unicodedata
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from ..config import CCAM_DICT_PATH
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton : dictionnaire chargé une seule fois
|
|
_dict_cache: dict[str, dict] | None = None
|
|
# Cache des labels normalisés pour le substring matching
|
|
_normalized_cache: list[tuple[str, str, str]] | None = None
|
|
|
|
_CCAM_CODE_RE = re.compile(r"^[A-Z]{4}\d{3}$")
|
|
|
|
|
|
def normalize_text(text: str) -> str:
|
|
"""Normalise un texte : accent folding, lowercase, collapse whitespace."""
|
|
text = text.replace("\u2019", "'").replace("\u2018", "'").replace("\u02BC", "'")
|
|
nfkd = unicodedata.normalize("NFKD", text)
|
|
stripped = "".join(c for c in nfkd if unicodedata.category(c) != "Mn")
|
|
return re.sub(r"\s+", " ", stripped.lower()).strip()
|
|
|
|
|
|
def build_dict(source_path: str | Path) -> dict[str, dict]:
|
|
"""Construit le dictionnaire CCAM depuis un fichier XLS et l'écrit en JSON.
|
|
|
|
Format JSON : {code: {description, activite, tarif_s1, regroupement}}
|
|
|
|
Args:
|
|
source_path: Chemin vers le fichier XLS CCAM (ex: CCAM_V81.xls).
|
|
|
|
Returns:
|
|
Le dictionnaire code → infos.
|
|
"""
|
|
import xlrd
|
|
|
|
source_path = Path(source_path)
|
|
if not source_path.exists():
|
|
logger.error("Fichier XLS non trouvé : %s", source_path)
|
|
return {}
|
|
|
|
wb = xlrd.open_workbook(str(source_path))
|
|
sheet = wb.sheet_by_index(0)
|
|
|
|
result: dict[str, dict] = {}
|
|
|
|
for r in range(sheet.nrows):
|
|
code = str(sheet.cell_value(r, 0)).strip()
|
|
if not _CCAM_CODE_RE.match(code):
|
|
continue
|
|
|
|
description = str(sheet.cell_value(r, 2)).strip()
|
|
activite_raw = sheet.cell_value(r, 3)
|
|
activite = int(activite_raw) if isinstance(activite_raw, float) else None
|
|
|
|
tarif_raw = sheet.cell_value(r, 5)
|
|
tarif_s1 = round(tarif_raw, 2) if isinstance(tarif_raw, (int, float)) else None
|
|
|
|
regroupement = str(sheet.cell_value(r, 10)).strip() or None
|
|
|
|
result[code] = {
|
|
"description": description,
|
|
"activite": activite,
|
|
"tarif_s1": tarif_s1,
|
|
"regroupement": regroupement,
|
|
}
|
|
|
|
# Écrire le fichier JSON
|
|
CCAM_DICT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(CCAM_DICT_PATH, "w", encoding="utf-8") as f:
|
|
json.dump(result, f, ensure_ascii=False, indent=2)
|
|
|
|
logger.info("Dictionnaire CCAM généré : %d codes → %s", len(result), CCAM_DICT_PATH)
|
|
return result
|
|
|
|
|
|
def load_dict() -> dict[str, dict]:
|
|
"""Charge le dictionnaire CCAM (singleton lazy-loaded).
|
|
|
|
Si le fichier JSON n'existe pas, retourne un dict vide avec un warning.
|
|
"""
|
|
global _dict_cache
|
|
if _dict_cache is not None:
|
|
return _dict_cache
|
|
|
|
if CCAM_DICT_PATH.exists():
|
|
with open(CCAM_DICT_PATH, encoding="utf-8") as f:
|
|
_dict_cache = json.load(f)
|
|
else:
|
|
logger.warning("Dictionnaire CCAM absent : %s — lancez --build-ccam-dict", CCAM_DICT_PATH)
|
|
_dict_cache = {}
|
|
|
|
return _dict_cache
|
|
|
|
|
|
def _get_normalized_entries() -> list[tuple[str, str, str]]:
|
|
"""Retourne une liste de (code, description, description_normalisée) triée par longueur."""
|
|
global _normalized_cache
|
|
if _normalized_cache is not None:
|
|
return _normalized_cache
|
|
|
|
d = load_dict()
|
|
entries = []
|
|
for code, info in d.items():
|
|
desc = info.get("description", "") if isinstance(info, dict) else str(info)
|
|
norm = normalize_text(desc)
|
|
entries.append((code, desc, norm))
|
|
|
|
# Trier par longueur de description décroissante (plus spécifique d'abord)
|
|
entries.sort(key=lambda e: -len(e[2]))
|
|
_normalized_cache = entries
|
|
return _normalized_cache
|
|
|
|
|
|
def lookup(
|
|
text: str,
|
|
domain_overrides: dict[str, str] | None = None,
|
|
) -> str | None:
|
|
"""Recherche un code CCAM pour un texte donné.
|
|
|
|
Stratégie en 3 niveaux :
|
|
1. Match substring dans domain_overrides (prioritaire, ex: CCAM_MAP existant)
|
|
2. Match exact normalisé dans le dictionnaire complet
|
|
3. Match substring normalisé avec scoring par spécificité
|
|
|
|
Args:
|
|
text: Le texte de l'acte médical à rechercher.
|
|
domain_overrides: Dictionnaire terme→code prioritaire.
|
|
|
|
Returns:
|
|
Le code CCAM trouvé ou None.
|
|
"""
|
|
if not text:
|
|
return None
|
|
|
|
text_norm = normalize_text(text)
|
|
|
|
# Niveau 1 : domain overrides (substring match)
|
|
if domain_overrides:
|
|
for terme, code in domain_overrides.items():
|
|
if normalize_text(terme) in text_norm:
|
|
return code
|
|
|
|
entries = _get_normalized_entries()
|
|
|
|
# Niveau 2 : match exact normalisé
|
|
for code, _desc, norm_desc in entries:
|
|
if norm_desc == text_norm:
|
|
return code
|
|
|
|
# Niveau 3 : substring match normalisé (plus spécifique d'abord)
|
|
for code, _desc, norm_desc in entries:
|
|
if not norm_desc or len(norm_desc) < 4:
|
|
continue
|
|
if norm_desc in text_norm or text_norm in norm_desc:
|
|
return code
|
|
|
|
return None
|
|
|
|
|
|
def validate_code(code: str) -> tuple[bool, str]:
|
|
"""Vérifie si un code CCAM existe dans le dictionnaire.
|
|
|
|
Returns:
|
|
(is_valid, description) — description vide si invalide.
|
|
"""
|
|
d = load_dict()
|
|
if code in d:
|
|
info = d[code]
|
|
desc = info.get("description", "") if isinstance(info, dict) else str(info)
|
|
return True, desc
|
|
return False, ""
|
|
|
|
|
|
def reset_cache() -> None:
|
|
"""Réinitialise les caches (utile pour les tests)."""
|
|
global _dict_cache, _normalized_cache
|
|
_dict_cache = None
|
|
_normalized_cache = None
|