feat: dictionnaire CCAM complet (8 257 codes) + index FAISS enrichi + validation actes
Phase 2 (CCAM) : - Nouveau src/medical/ccam_dict.py : build depuis CCAM_V81.xls via xlrd, lookup 3 niveaux, validation codes - Intégration dans l'extracteur : fallback ccam_lookup + _validate_ccam() avec alertes - CLI : --build-ccam-dict, --rebuild-index Phase 3 (FAISS) : - Chunks CCAM depuis le dictionnaire JSON (priorité sur le PDF) - Chunks CIM-10 index alphabétique (terme → code) - Priorisation cim10_alpha dans la recherche RAG Viewer : endpoint reprocess + bloc scripts Tests : 8 tests CCAM + tests raisonnement RAG (161 passed) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
191
src/medical/ccam_dict.py
Normal file
191
src/medical/ccam_dict.py
Normal file
@@ -0,0 +1,191 @@
|
||||
"""Dictionnaire CCAM complet extrait depuis le fichier XLS officiel (CNAM).
|
||||
|
||||
Fournit un lookup intelligent avec normalisation Unicode pour la recherche
|
||||
de codes CCAM à partir de textes d'actes médicaux en français.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from ..config import CCAM_DICT_PATH
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Singleton : dictionnaire chargé une seule fois
|
||||
_dict_cache: dict[str, dict] | None = None
|
||||
# Cache des labels normalisés pour le substring matching
|
||||
_normalized_cache: list[tuple[str, str, str]] | None = None
|
||||
|
||||
_CCAM_CODE_RE = re.compile(r"^[A-Z]{4}\d{3}$")
|
||||
|
||||
|
||||
def normalize_text(text: str) -> str:
|
||||
"""Normalise un texte : accent folding, lowercase, collapse whitespace."""
|
||||
text = text.replace("\u2019", "'").replace("\u2018", "'").replace("\u02BC", "'")
|
||||
nfkd = unicodedata.normalize("NFKD", text)
|
||||
stripped = "".join(c for c in nfkd if unicodedata.category(c) != "Mn")
|
||||
return re.sub(r"\s+", " ", stripped.lower()).strip()
|
||||
|
||||
|
||||
def build_dict(source_path: str | Path) -> dict[str, dict]:
|
||||
"""Construit le dictionnaire CCAM depuis un fichier XLS et l'écrit en JSON.
|
||||
|
||||
Format JSON : {code: {description, activite, tarif_s1, regroupement}}
|
||||
|
||||
Args:
|
||||
source_path: Chemin vers le fichier XLS CCAM (ex: CCAM_V81.xls).
|
||||
|
||||
Returns:
|
||||
Le dictionnaire code → infos.
|
||||
"""
|
||||
import xlrd
|
||||
|
||||
source_path = Path(source_path)
|
||||
if not source_path.exists():
|
||||
logger.error("Fichier XLS non trouvé : %s", source_path)
|
||||
return {}
|
||||
|
||||
wb = xlrd.open_workbook(str(source_path))
|
||||
sheet = wb.sheet_by_index(0)
|
||||
|
||||
result: dict[str, dict] = {}
|
||||
|
||||
for r in range(sheet.nrows):
|
||||
code = str(sheet.cell_value(r, 0)).strip()
|
||||
if not _CCAM_CODE_RE.match(code):
|
||||
continue
|
||||
|
||||
description = str(sheet.cell_value(r, 2)).strip()
|
||||
activite_raw = sheet.cell_value(r, 3)
|
||||
activite = int(activite_raw) if isinstance(activite_raw, float) else None
|
||||
|
||||
tarif_raw = sheet.cell_value(r, 5)
|
||||
tarif_s1 = round(tarif_raw, 2) if isinstance(tarif_raw, (int, float)) else None
|
||||
|
||||
regroupement = str(sheet.cell_value(r, 10)).strip() or None
|
||||
|
||||
result[code] = {
|
||||
"description": description,
|
||||
"activite": activite,
|
||||
"tarif_s1": tarif_s1,
|
||||
"regroupement": regroupement,
|
||||
}
|
||||
|
||||
# Écrire le fichier JSON
|
||||
CCAM_DICT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(CCAM_DICT_PATH, "w", encoding="utf-8") as f:
|
||||
json.dump(result, f, ensure_ascii=False, indent=2)
|
||||
|
||||
logger.info("Dictionnaire CCAM généré : %d codes → %s", len(result), CCAM_DICT_PATH)
|
||||
return result
|
||||
|
||||
|
||||
def load_dict() -> dict[str, dict]:
|
||||
"""Charge le dictionnaire CCAM (singleton lazy-loaded).
|
||||
|
||||
Si le fichier JSON n'existe pas, retourne un dict vide avec un warning.
|
||||
"""
|
||||
global _dict_cache
|
||||
if _dict_cache is not None:
|
||||
return _dict_cache
|
||||
|
||||
if CCAM_DICT_PATH.exists():
|
||||
with open(CCAM_DICT_PATH, encoding="utf-8") as f:
|
||||
_dict_cache = json.load(f)
|
||||
else:
|
||||
logger.warning("Dictionnaire CCAM absent : %s — lancez --build-ccam-dict", CCAM_DICT_PATH)
|
||||
_dict_cache = {}
|
||||
|
||||
return _dict_cache
|
||||
|
||||
|
||||
def _get_normalized_entries() -> list[tuple[str, str, str]]:
|
||||
"""Retourne une liste de (code, description, description_normalisée) triée par longueur."""
|
||||
global _normalized_cache
|
||||
if _normalized_cache is not None:
|
||||
return _normalized_cache
|
||||
|
||||
d = load_dict()
|
||||
entries = []
|
||||
for code, info in d.items():
|
||||
desc = info.get("description", "") if isinstance(info, dict) else str(info)
|
||||
norm = normalize_text(desc)
|
||||
entries.append((code, desc, norm))
|
||||
|
||||
# Trier par longueur de description décroissante (plus spécifique d'abord)
|
||||
entries.sort(key=lambda e: -len(e[2]))
|
||||
_normalized_cache = entries
|
||||
return _normalized_cache
|
||||
|
||||
|
||||
def lookup(
|
||||
text: str,
|
||||
domain_overrides: dict[str, str] | None = None,
|
||||
) -> str | None:
|
||||
"""Recherche un code CCAM pour un texte donné.
|
||||
|
||||
Stratégie en 3 niveaux :
|
||||
1. Match substring dans domain_overrides (prioritaire, ex: CCAM_MAP existant)
|
||||
2. Match exact normalisé dans le dictionnaire complet
|
||||
3. Match substring normalisé avec scoring par spécificité
|
||||
|
||||
Args:
|
||||
text: Le texte de l'acte médical à rechercher.
|
||||
domain_overrides: Dictionnaire terme→code prioritaire.
|
||||
|
||||
Returns:
|
||||
Le code CCAM trouvé ou None.
|
||||
"""
|
||||
if not text:
|
||||
return None
|
||||
|
||||
text_norm = normalize_text(text)
|
||||
|
||||
# Niveau 1 : domain overrides (substring match)
|
||||
if domain_overrides:
|
||||
for terme, code in domain_overrides.items():
|
||||
if normalize_text(terme) in text_norm:
|
||||
return code
|
||||
|
||||
entries = _get_normalized_entries()
|
||||
|
||||
# Niveau 2 : match exact normalisé
|
||||
for code, _desc, norm_desc in entries:
|
||||
if norm_desc == text_norm:
|
||||
return code
|
||||
|
||||
# Niveau 3 : substring match normalisé (plus spécifique d'abord)
|
||||
for code, _desc, norm_desc in entries:
|
||||
if not norm_desc or len(norm_desc) < 4:
|
||||
continue
|
||||
if norm_desc in text_norm or text_norm in norm_desc:
|
||||
return code
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def validate_code(code: str) -> tuple[bool, str]:
|
||||
"""Vérifie si un code CCAM existe dans le dictionnaire.
|
||||
|
||||
Returns:
|
||||
(is_valid, description) — description vide si invalide.
|
||||
"""
|
||||
d = load_dict()
|
||||
if code in d:
|
||||
info = d[code]
|
||||
desc = info.get("description", "") if isinstance(info, dict) else str(info)
|
||||
return True, desc
|
||||
return False, ""
|
||||
|
||||
|
||||
def reset_cache() -> None:
|
||||
"""Réinitialise les caches (utile pour les tests)."""
|
||||
global _dict_cache, _normalized_cache
|
||||
_dict_cache = None
|
||||
_normalized_cache = None
|
||||
Reference in New Issue
Block a user