Files
t2a/src/control/cpam_parser.py
dom aa501789fd feat: scoring DP déterministe + parser CPAM nouveau format + sections CRH
- Nouveau module dp_scoring.py : shortlist, scoring multi-critères, select_dp,
  LLM one-shot fallback avec garde-fous (négation, comorbidité, Z/R-codes)
- Parser CPAM : auto-détection format legacy/ucr_extract, 6 nouveaux champs
  ControleCPAM (codes_etablissement, libelle, codes_retenus, ghm_ghs)
- CRH parser : 3 nouvelles sections (diag_sortie, diag_principal, synthese)
- Prompt DP_LLM_ONESHOT externalisé dans templates.py
- Propagation dp_selection dans fusion.py
- 808 tests passent (dont 21 nouveaux CPAM + 77 dp_scoring + 8 CRH)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 22:28:59 +01:00

234 lines
8.3 KiB
Python

"""Parsing du fichier Excel de contrôle CPAM (UCR) et matching OGC.
Supporte deux formats :
- **Ancien** (ogc_structure) : colonnes N° OGC, Titre, Arg_UCR, Décision_UCR, DP_UCR, DA_UCR, DR_UCR, Actes_UCR
- **Nouveau** (ucr_extract) : colonnes N° OGC, Type désaccord, Codes Établissement, Codes Contrôleurs,
Décision UCR, Codes retenus, GHM / GHS, Texte décision, etc.
Le format est auto-détecté à partir des en-têtes de la première ligne.
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
import openpyxl
from ..config import ControleCPAM
logger = logging.getLogger(__name__)
# Colonnes obligatoires par format
_LEGACY_REQUIRED = ("N° OGC", "Titre", "Arg_UCR", "Décision_UCR")
_NEW_REQUIRED = ("N° OGC", "Type désaccord", "Décision UCR", "Texte décision")
def parse_cpam_excel(path: str | Path) -> dict[int, list[ControleCPAM]]:
"""Lit le fichier Excel de contrôle CPAM et retourne un dict OGC -> liste de contrôles.
Auto-détecte le format (ancien ogc_structure vs nouveau ucr_extract).
"""
path = Path(path)
if not path.exists():
logger.error("Fichier CPAM introuvable : %s", path)
return {}
wb = openpyxl.load_workbook(path, read_only=True)
ws = wb[wb.sheetnames[0]]
rows = ws.iter_rows(values_only=True)
header = next(rows, None)
if header is None:
logger.error("Fichier CPAM vide : %s", path)
wb.close()
return {}
col_map = {}
for i, col_name in enumerate(header):
if col_name:
col_map[str(col_name).strip()] = i
# Auto-détection du format
is_new = all(c in col_map for c in _NEW_REQUIRED)
is_legacy = all(c in col_map for c in _LEGACY_REQUIRED)
if is_new:
logger.info("CPAM : format ucr_extract détecté")
result = _parse_new_format(rows, col_map)
elif is_legacy:
logger.info("CPAM : format ogc_structure (ancien) détecté")
result = _parse_legacy_format(rows, col_map)
else:
missing_new = [c for c in _NEW_REQUIRED if c not in col_map]
missing_leg = [c for c in _LEGACY_REQUIRED if c not in col_map]
logger.error(
"Format CPAM non reconnu. Colonnes trouvées : %s. "
"Manquantes (nouveau) : %s, (ancien) : %s",
list(col_map.keys()), missing_new, missing_leg,
)
wb.close()
return {}
wb.close()
total = sum(len(v) for v in result.values())
logger.info("CPAM : %d contrôles chargés pour %d OGC distincts", total, len(result))
return result
def _parse_legacy_format(rows, col_map: dict[str, int]) -> dict[int, list[ControleCPAM]]:
"""Parse l'ancien format ogc_structure."""
result: dict[int, list[ControleCPAM]] = {}
for row in rows:
ogc_val = row[col_map["N° OGC"]]
if ogc_val is None:
continue
try:
numero_ogc = int(ogc_val)
except (ValueError, TypeError):
logger.warning("N° OGC invalide ignoré : %s", ogc_val)
continue
controle = ControleCPAM(
numero_ogc=numero_ogc,
titre=str(row[col_map.get("Titre", 1)] or "").strip(),
arg_ucr=str(row[col_map.get("Arg_UCR", 2)] or "").strip(),
decision_ucr=str(row[col_map.get("Décision_UCR", 3)] or "").strip(),
dp_ucr=_clean_optional(row, col_map.get("DP_UCR")),
da_ucr=_clean_optional(row, col_map.get("DA_UCR")),
dr_ucr=_clean_optional(row, col_map.get("DR_UCR")),
actes_ucr=_clean_optional(row, col_map.get("Actes_UCR")),
)
result.setdefault(numero_ogc, []).append(controle)
return result
def _parse_new_format(rows, col_map: dict[str, int]) -> dict[int, list[ControleCPAM]]:
"""Parse le nouveau format ucr_extract.
Mapping colonnes :
N° OGC → numero_ogc
Type désaccord → titre (ex: "Désaccord sur le DP")
Texte décision → arg_ucr
Décision UCR → decision_ucr (Favorable / Défavorable)
Codes Contrôleurs → dp_ucr / da_ucr selon Type désaccord
Codes Établissement → codes_etablissement
Libellé Établissement → libelle_etablissement
Libellé Contrôleurs → libelle_controleurs
Codes retenus → codes_retenus
GHM / GHS → ghm_ghs
"""
result: dict[int, list[ControleCPAM]] = {}
idx_ogc = col_map["N° OGC"]
idx_type = col_map["Type désaccord"]
idx_decision = col_map["Décision UCR"]
idx_texte = col_map["Texte décision"]
idx_codes_etab = col_map.get("Codes Établissement")
idx_lib_etab = col_map.get("Libellé Établissement")
idx_codes_ctrl = col_map.get("Codes Contrôleurs")
idx_lib_ctrl = col_map.get("Libellé Contrôleurs")
idx_codes_ret = col_map.get("Codes retenus")
idx_ghm = col_map.get("GHM / GHS")
for row in rows:
ogc_val = row[idx_ogc]
if ogc_val is None:
continue
try:
numero_ogc = int(ogc_val)
except (ValueError, TypeError):
logger.warning("N° OGC invalide ignoré : %s", ogc_val)
continue
type_desaccord = str(row[idx_type] or "").strip()
decision = str(row[idx_decision] or "").strip()
texte_decision = str(row[idx_texte] or "").strip()
codes_ctrl = _clean_optional(row, idx_codes_ctrl)
codes_etab = _clean_optional(row, idx_codes_etab)
# Construire le titre lisible
if type_desaccord == "DP":
titre = "Désaccord sur le DP"
elif type_desaccord == "DAS":
titre = "Désaccord sur les DAS"
elif type_desaccord == "DP+DAS":
titre = "Désaccord sur le DP et les DAS"
else:
titre = f"Désaccord : {type_desaccord}" if type_desaccord else ""
# Mapper la décision vers le format attendu par cpam_response
if decision.lower().startswith("favorable"):
decision_ucr = "UCR retient"
elif decision.lower().startswith("défavorable") or decision.lower().startswith("defavorable"):
decision_ucr = "UCR confirme avis médecins contrôleurs"
else:
decision_ucr = decision
# Distribuer les codes selon le type de désaccord
dp_ucr = None
da_ucr = None
if type_desaccord == "DP":
dp_ucr = codes_ctrl
elif type_desaccord == "DAS":
da_ucr = codes_ctrl
elif type_desaccord == "DP+DAS":
# Les codes contrôleurs peuvent mélanger DP et DAS.
# Convention : le premier code est le DP, le reste DAS.
if codes_ctrl:
parts = [c.strip() for c in codes_ctrl.split(",") if c.strip()]
dp_ucr = parts[0] if parts else None
da_ucr = ",".join(parts[1:]) if len(parts) > 1 else None
controle = ControleCPAM(
numero_ogc=numero_ogc,
titre=titre,
arg_ucr=texte_decision,
decision_ucr=decision_ucr,
dp_ucr=dp_ucr,
da_ucr=da_ucr,
codes_etablissement=codes_etab,
libelle_etablissement=_clean_optional(row, idx_lib_etab),
codes_controleurs=codes_ctrl,
libelle_controleurs=_clean_optional(row, idx_lib_ctrl),
codes_retenus=_clean_optional(row, idx_codes_ret),
ghm_ghs=_clean_optional(row, idx_ghm),
)
result.setdefault(numero_ogc, []).append(controle)
return result
def _clean_optional(row: tuple, idx: int | None) -> str | None:
"""Extrait une valeur optionnelle depuis une ligne Excel."""
if idx is None or idx >= len(row):
return None
val = row[idx]
if val is None:
return None
val = str(val).strip()
return val if val else None
def match_dossier_ogc(source_name: str, cpam_data: dict[int, list[ControleCPAM]]) -> list[ControleCPAM]:
"""Cherche les contrôles CPAM correspondant à un dossier par préfixe OGC.
Le nom du dossier suit le format "17_23100690" où 17 est le N° OGC.
Args:
source_name: Nom du sous-dossier (ex: "17_23100690").
cpam_data: Dict OGC -> contrôles retourné par parse_cpam_excel().
Returns:
Liste des contrôles CPAM pour cet OGC, ou liste vide.
"""
match = re.match(r"^(\d+)_", source_name)
if not match:
return []
ogc = int(match.group(1))
return cpam_data.get(ogc, [])