- Cache persistant JSON thread-safe pour les résultats Ollama (invalidation par modèle) - Parallélisation des appels Ollama (ThreadPoolExecutor, 2 workers) - 6 nouvelles règles de filtrage DAS parasites (doublons, ponctuation, OCR, labo, fragments) - Client Ollama centralisé (mode JSON natif + retry) - Module GHM (estimation CMD/sévérité) - Module contrôle CPAM (parser + contre-argumentation RAG) - Export RUM (format RSS) - Viewer enrichi (détail dossier) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
116 lines
3.5 KiB
Python
116 lines
3.5 KiB
Python
"""Parsing du fichier Excel de contrôle CPAM (UCR) et matching OGC."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import openpyxl
|
|
|
|
from ..config import ControleCPAM
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Colonnes attendues dans le fichier Excel
|
|
_EXPECTED_COLUMNS = ("N° OGC", "Titre", "Arg_UCR", "Décision_UCR", "DP_UCR", "DA_UCR", "DR_UCR", "Actes_UCR")
|
|
|
|
|
|
def parse_cpam_excel(path: str | Path) -> dict[int, list[ControleCPAM]]:
|
|
"""Lit le fichier Excel de contrôle CPAM et retourne un dict OGC -> liste de contrôles.
|
|
|
|
Args:
|
|
path: Chemin vers le fichier .xlsx CPAM.
|
|
|
|
Returns:
|
|
Dict avec le numéro OGC comme clé et la liste des contrôles associés.
|
|
"""
|
|
path = Path(path)
|
|
if not path.exists():
|
|
logger.error("Fichier CPAM introuvable : %s", path)
|
|
return {}
|
|
|
|
wb = openpyxl.load_workbook(path, read_only=True)
|
|
ws = wb[wb.sheetnames[0]]
|
|
|
|
# Lire l'en-tête
|
|
rows = ws.iter_rows(values_only=True)
|
|
header = next(rows, None)
|
|
if header is None:
|
|
logger.error("Fichier CPAM vide : %s", path)
|
|
return {}
|
|
|
|
# Construire le mapping colonne -> index
|
|
col_map = {}
|
|
for i, col_name in enumerate(header):
|
|
if col_name:
|
|
col_map[col_name.strip()] = i
|
|
|
|
# Vérifier les colonnes requises
|
|
missing = [c for c in _EXPECTED_COLUMNS[:4] if c not in col_map]
|
|
if missing:
|
|
logger.error("Colonnes manquantes dans le fichier CPAM : %s", missing)
|
|
return {}
|
|
|
|
result: dict[int, list[ControleCPAM]] = {}
|
|
count = 0
|
|
|
|
for row in rows:
|
|
ogc_val = row[col_map["N° OGC"]]
|
|
if ogc_val is None:
|
|
continue
|
|
|
|
try:
|
|
numero_ogc = int(ogc_val)
|
|
except (ValueError, TypeError):
|
|
logger.warning("N° OGC invalide ignoré : %s", ogc_val)
|
|
continue
|
|
|
|
controle = ControleCPAM(
|
|
numero_ogc=numero_ogc,
|
|
titre=str(row[col_map.get("Titre", 1)] or "").strip(),
|
|
arg_ucr=str(row[col_map.get("Arg_UCR", 2)] or "").strip(),
|
|
decision_ucr=str(row[col_map.get("Décision_UCR", 3)] or "").strip(),
|
|
dp_ucr=_clean_optional(row, col_map.get("DP_UCR")),
|
|
da_ucr=_clean_optional(row, col_map.get("DA_UCR")),
|
|
dr_ucr=_clean_optional(row, col_map.get("DR_UCR")),
|
|
actes_ucr=_clean_optional(row, col_map.get("Actes_UCR")),
|
|
)
|
|
|
|
result.setdefault(numero_ogc, []).append(controle)
|
|
count += 1
|
|
|
|
logger.info("CPAM : %d contrôles chargés pour %d OGC distincts", count, len(result))
|
|
return result
|
|
|
|
|
|
def _clean_optional(row: tuple, idx: int | None) -> str | None:
|
|
"""Extrait une valeur optionnelle depuis une ligne Excel."""
|
|
if idx is None or idx >= len(row):
|
|
return None
|
|
val = row[idx]
|
|
if val is None:
|
|
return None
|
|
val = str(val).strip()
|
|
return val if val else None
|
|
|
|
|
|
def match_dossier_ogc(source_name: str, cpam_data: dict[int, list[ControleCPAM]]) -> list[ControleCPAM]:
|
|
"""Cherche les contrôles CPAM correspondant à un dossier par préfixe OGC.
|
|
|
|
Le nom du dossier suit le format "17_23100690" où 17 est le N° OGC.
|
|
|
|
Args:
|
|
source_name: Nom du sous-dossier (ex: "17_23100690").
|
|
cpam_data: Dict OGC -> contrôles retourné par parse_cpam_excel().
|
|
|
|
Returns:
|
|
Liste des contrôles CPAM pour cet OGC, ou liste vide.
|
|
"""
|
|
match = re.match(r"^(\d+)_", source_name)
|
|
if not match:
|
|
return []
|
|
|
|
ogc = int(match.group(1))
|
|
return cpam_data.get(ogc, [])
|