Files
t2a_v2/src/config.py
dom 768bb94193 feat: validation config au démarrage
- Ajoute validate_config() dans src/config.py :
  * Vérifie accessibilité Ollama (warning si injoignable)
  * Warning si ANTHROPIC_API_KEY absente (fallback cloud indispo)
  * Warning si modèles CPAM et validation identiques
  * Vérifie qu'au moins un modèle LLM est configuré
- Appel automatique au démarrage du viewer Flask
- Ne fait jamais crasher l'app (warnings uniquement)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 11:49:12 +01:00

953 lines
35 KiB
Python

"""Configuration globale et modèles de données pour le pipeline T2A."""
from __future__ import annotations
import os
import contextvars
from functools import lru_cache
from pathlib import Path
from typing import Optional, Any, Dict
import logging
import yaml
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator
_cfg_logger = logging.getLogger(__name__)
load_dotenv()
# --- Chemins ---
BASE_DIR = Path(__file__).resolve().parent.parent
INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"
ANONYMIZED_DIR = OUTPUT_DIR / "anonymized"
STRUCTURED_DIR = OUTPUT_DIR / "structured"
REPORTS_DIR = OUTPUT_DIR / "reports"
CONFIG_DIR = BASE_DIR / "config"
REFERENCE_RANGES_PATH = CONFIG_DIR / "reference_ranges.yaml"
BIO_RULES_PATH = CONFIG_DIR / "bio_rules.yaml"
LAB_SANITY_PATH = CONFIG_DIR / "lab_value_sanity.yaml"
DEMOGRAPHIC_RULES_PATH = CONFIG_DIR / "demographic_rules.yaml"
DIAGNOSTIC_CONFLICTS_PATH = CONFIG_DIR / "diagnostic_conflicts.yaml"
PROCEDURE_DIAGNOSIS_RULES_PATH = CONFIG_DIR / "procedure_diagnosis_rules.yaml"
TEMPORAL_RULES_PATH = CONFIG_DIR / "temporal_rules.yaml"
PARCOURS_RULES_PATH = CONFIG_DIR / "parcours_rules.yaml"
COMPLETUDE_RULES_PATH = CONFIG_DIR / "completude_rules.yaml"
RULES_DIR = CONFIG_DIR / "rules"
RULES_BASE_PATH = RULES_DIR / "base.yaml"
RULES_ENABLED_PATH = RULES_DIR / "enabled.yaml"
RULES_ROUTER_PATH = RULES_DIR / "router.yaml"
for d in (INPUT_DIR, ANONYMIZED_DIR, STRUCTURED_DIR, REPORTS_DIR, CONFIG_DIR, RULES_DIR):
d.mkdir(parents=True, exist_ok=True)
# --- Configuration anonymisation ---
KEEP_ESTABLISHMENT_NAME = os.environ.get("T2A_KEEP_ESTABLISHMENT", "True").lower() in ("true", "1", "yes")
NER_MODEL = os.environ.get("T2A_NER_MODEL", "Jean-Baptiste/camembert-ner")
NER_CONFIDENCE_THRESHOLD = float(os.environ.get("T2A_NER_THRESHOLD", "0.80"))
# --- Configuration Ollama ---
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "gemma3:27b")
OLLAMA_TIMEOUT = int(os.environ.get("OLLAMA_TIMEOUT", "600"))
OLLAMA_CACHE_PATH = BASE_DIR / "data" / "ollama_cache.json"
OLLAMA_MAX_PARALLEL = int(os.environ.get("OLLAMA_MAX_PARALLEL", "2"))
# --- Modèles par rôle LLM ---
OLLAMA_MODELS: dict[str, str] = {
"coding": os.environ.get("T2A_MODEL_CODING", "gemma3:27b"),
"cpam": os.environ.get("T2A_MODEL_CPAM", "mistral-small3.2:24b"),
"validation": os.environ.get("T2A_MODEL_VALIDATION", "qwen3:32b"),
"qc": os.environ.get("T2A_MODEL_QC", "gemma3:12b"),
}
def get_model(role: str) -> str:
"""Retourne le modèle associé à un rôle LLM, ou le modèle global par défaut."""
return OLLAMA_MODELS.get(role, OLLAMA_MODEL)
# --- Flag LLM pour le sélecteur DP (NUKE-3) ---
# Nom canonique : T2A_DP_RANKER_LLM (0/1)
# Ancien nom accepté (compat) : DP_RANKER_LLM_ENABLED
DP_RANKER_LLM_ENABLED = os.environ.get("T2A_DP_RANKER_LLM", "1").lower() in ("1", "true", "yes")
def get_dp_ranker_llm_enabled() -> bool:
"""Retourne l'état du flag LLM pour NUKE-3 (lecture fraîche de l'env).
Nom canonique : T2A_DP_RANKER_LLM (0/1/true/false/yes/no).
Accepte aussi l'ancien nom DP_RANKER_LLM_ENABLED avec warning.
"""
canonical = os.environ.get("T2A_DP_RANKER_LLM")
legacy = os.environ.get("DP_RANKER_LLM_ENABLED")
if canonical is not None:
return canonical.lower() in ("1", "true", "yes")
if legacy is not None:
import logging as _logging
_logging.getLogger(__name__).warning(
"Env var DP_RANKER_LLM_ENABLED est dépréciée — utiliser T2A_DP_RANKER_LLM"
)
return legacy.lower() in ("1", "true", "yes")
# Défaut : activé
return True
def check_adversarial_model_config() -> tuple[bool, str]:
"""LOGIC-3 — Vérifie si les modèles CPAM et validation sont identiques.
Returns:
(same_model, warning_message)
"""
cpam = OLLAMA_MODELS.get("cpam", "")
validation = OLLAMA_MODELS.get("validation", "")
if cpam and validation and cpam == validation:
msg = (
f"Modèles CPAM et validation identiques ({cpam}) "
"— validation adversariale dégradée"
)
return True, msg
return False, ""
def validate_config() -> list[str]:
"""Valide la configuration au démarrage et retourne la liste des warnings émis.
Vérifie :
- OLLAMA_URL défini et accessible (warning si injoignable, pas de crash)
- ANTHROPIC_API_KEY présente (warning si absente — fallback cloud indisponible)
- Modèles CPAM et validation distincts (validation adversariale)
- Au moins un modèle LLM configuré
Ne fait jamais crasher l'application.
"""
warnings_emitted: list[str] = []
# 1. Vérifier OLLAMA_URL
if not OLLAMA_URL:
msg = "OLLAMA_URL non défini — aucun LLM local disponible"
_cfg_logger.warning(msg)
warnings_emitted.append(msg)
else:
try:
import requests as _req
resp = _req.get(f"{OLLAMA_URL}/api/tags", timeout=5)
resp.raise_for_status()
_cfg_logger.info("Ollama accessible sur %s", OLLAMA_URL)
except Exception as e:
msg = f"Ollama injoignable sur {OLLAMA_URL}{e}"
_cfg_logger.warning(msg)
warnings_emitted.append(msg)
# 2. Vérifier ANTHROPIC_API_KEY
if not os.environ.get("ANTHROPIC_API_KEY"):
msg = "ANTHROPIC_API_KEY absente — fallback cloud (Haiku) indisponible"
_cfg_logger.warning(msg)
warnings_emitted.append(msg)
# 3. Vérifier modèles CPAM vs validation (adversarial)
same, adv_msg = check_adversarial_model_config()
if same:
_cfg_logger.warning(adv_msg)
warnings_emitted.append(adv_msg)
# 4. Vérifier qu'au moins un modèle LLM est configuré
has_model = bool(OLLAMA_MODEL) or any(OLLAMA_MODELS.values())
if not has_model:
msg = "Aucun modèle LLM configuré (OLLAMA_MODEL et OLLAMA_MODELS vides)"
_cfg_logger.warning(msg)
warnings_emitted.append(msg)
return warnings_emitted
# --- Configuration RUM / établissement ---
FINESS = os.environ.get("T2A_FINESS", "000000000")
NUM_UM = os.environ.get("T2A_NUM_UM", "0000")
# --- Configuration RAG ---
RAG_INDEX_DIR = BASE_DIR / "data" / "rag_index"
REFERENTIELS_DIR = BASE_DIR / "data" / "referentiels"
UPLOAD_MAX_SIZE_MB = 50
ALLOWED_EXTENSIONS = {".pdf", ".csv", ".xlsx", ".xls", ".txt"}
_DICTS_DIR = REFERENTIELS_DIR / "dicts"
_PDFS_DIR = REFERENTIELS_DIR / "pdfs"
CIM10_DICT_PATH = _DICTS_DIR / "cim10_dict.json"
CIM10_SUPPLEMENTS_PATH = _DICTS_DIR / "cim10_supplements.json"
BIO_CONCEPTS_PATH = BASE_DIR / "data" / "bio_concepts.json"
CMA_LEVELS_PATH = BASE_DIR / "data" / "cma_levels.json"
CCAM_DICT_PATH = _DICTS_DIR / "ccam_dict.json"
CIM10_PDF = Path(os.environ.get("T2A_CIM10_PDF", str(_PDFS_DIR / "cim-10-fr_2026_a_usage_pmsi_version_provisoire_111225.pdf")))
GUIDE_METHODO_PDF = Path(os.environ.get("T2A_GUIDE_METHODO_PDF", str(_PDFS_DIR / "guide_methodo_mco_2026_version_provisoire.pdf")))
CCAM_PDF = Path(os.environ.get("T2A_CCAM_PDF", str(_PDFS_DIR / "actualisation_ccam_descriptive_a_usage_pmsi_v4_2025.pdf")))
# --- Modèle d'embedding ---
EMBEDDING_MODEL = os.environ.get("T2A_EMBEDDING_MODEL", "dangvantuan/sentence-camembert-large")
# --- Modèle de re-ranking (cross-encoder, CPU uniquement) ---
RERANKER_MODEL = os.environ.get("T2A_RERANKER_MODEL", "cross-encoder/ms-marco-MiniLM-L-6-v2")
# --- Références biologiques (fallback) ---
def _load_yaml_config(path: Path, defaults: Dict[str, Any], label: str) -> Dict[str, Any]:
"""Helper : charge un YAML config avec merge sur defaults et logging explicite.
- Si le fichier n'existe pas : retourne defaults (info log).
- Si le YAML est invalide : retourne defaults + log error.
- Sinon : merge YAML sur defaults.
"""
if not path.exists():
_cfg_logger.debug("Config %s : fichier absent (%s), defaults utilisés", label, path)
return defaults
try:
data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
if not isinstance(data, dict):
_cfg_logger.error("Config %s : contenu invalide (attendu dict, reçu %s) dans %s",
label, type(data).__name__, path)
return defaults
merged = dict(defaults)
for k, v in data.items():
merged[k] = v
return merged
except yaml.YAMLError as e:
_cfg_logger.error("Config %s : erreur de syntaxe YAML dans %s%s", label, path, e)
return defaults
except Exception as e:
_cfg_logger.error("Config %s : erreur lecture %s%s", label, path, e)
return defaults
@lru_cache(maxsize=1)
def load_reference_ranges() -> Dict[str, Any]:
"""Charge les intervalles de référence biologiques depuis config/reference_ranges.yaml."""
defaults: Dict[str, Any] = {
"version": 1,
"age_bands": {"adult_min_years": 18},
"fallback_ranges": {
"adult": {
"platelets": {"low": 150, "high": 450, "unit": "G/L"},
"sodium": {"low": 135, "high": 145, "unit": "mmol/L"},
"potassium": {"low": 3.5, "high": 5.0, "unit": "mmol/L"},
},
"child": {
"platelets": {"low": 150, "high": 450, "unit": "G/L"},
"sodium": {"low": 135, "high": 145, "unit": "mmol/L"},
"potassium": {"low": 3.5, "high": 5.0, "unit": "mmol/L"},
},
},
"safe_zones_unknown_age": {
"platelets_ruled_out_low": 170,
"sodium_ruled_out_low": 138,
"potassium_ruled_out_high": 4.9,
"potassium_ruled_out_low": 3.7,
},
}
return _load_yaml_config(REFERENCE_RANGES_PATH, defaults, "reference_ranges")
# --- Règles biologiques (pilotées par YAML) ---
@lru_cache(maxsize=1)
def load_bio_rules() -> Dict[str, Any]:
"""Charge les règles biologiques depuis config/bio_rules.yaml."""
defaults: Dict[str, Any] = {
"version": 1,
"rules": {
"hyponatremia": {"enabled": True, "codes": ["E87.1"], "analyte": "sodium"},
"hyperkalemia": {"enabled": True, "codes": ["E87.5"], "analyte": "potassium"},
"hypokalemia": {"enabled": True, "codes": ["E87.6"], "analyte": "potassium"},
},
}
return _load_yaml_config(BIO_RULES_PATH, defaults, "bio_rules")
@lru_cache(maxsize=1)
def load_demographic_rules() -> Dict[str, Any]:
"""Charge les règles démographiques (sexe/âge) depuis config/demographic_rules.yaml."""
return _load_yaml_config(DEMOGRAPHIC_RULES_PATH, {
"version": 1, "sex_rules": {}, "age_rules": {},
}, "demographic_rules")
@lru_cache(maxsize=1)
def load_diagnostic_conflicts() -> Dict[str, Any]:
"""Charge les conflits diagnostics depuis config/diagnostic_conflicts.yaml."""
return _load_yaml_config(DIAGNOSTIC_CONFLICTS_PATH, {
"version": 1, "mutual_exclusions": [], "incompatibilities": [],
}, "diagnostic_conflicts")
@lru_cache(maxsize=1)
def load_procedure_diagnosis_rules() -> Dict[str, Any]:
"""Charge les règles de corrélation actes/diagnostics depuis config/procedure_diagnosis_rules.yaml."""
return _load_yaml_config(PROCEDURE_DIAGNOSIS_RULES_PATH, {
"version": 1, "rules": [],
}, "procedure_diagnosis_rules")
@lru_cache(maxsize=1)
def load_temporal_rules() -> Dict[str, Any]:
"""Charge les règles temporelles depuis config/temporal_rules.yaml."""
return _load_yaml_config(TEMPORAL_RULES_PATH, {
"version": 1, "rules": [],
}, "temporal_rules")
@lru_cache(maxsize=1)
def load_parcours_rules() -> Dict[str, Any]:
"""Charge les règles de parcours patient depuis config/parcours_rules.yaml."""
return _load_yaml_config(PARCOURS_RULES_PATH, {
"version": 1, "documentary_rules": {}, "pathway_rules": {},
}, "parcours_rules")
@lru_cache(maxsize=1)
def load_completude_rules() -> Dict[str, Any]:
"""Charge les règles de complétude documentaire depuis config/completude_rules.yaml."""
return _load_yaml_config(COMPLETUDE_RULES_PATH, {
"version": 1, "diagnostics": {}, "actes": {},
}, "completude_rules")
# --- Garde-fous de parsing des valeurs biologiques (anti-OCR) ---
@lru_cache(maxsize=1)
def load_lab_value_sanity() -> Dict[str, Any]:
"""Charge des garde-fous de parsing depuis config/lab_value_sanity.yaml.
But:
- éviter que des artefacts de lecture PDF/OCR (ex: "8" au lieu de "4.8")
déclenchent de faux diagnostics (hyperK, etc.)
- garder une trace *auditable* (valeurs suspectes / écartées)
Ce fichier est volontairement éditable (future UI).
"""
defaults: Dict[str, Any] = {
"version": 1,
"policy": {
# Si True: les valeurs hors bornes plausibles sont écartées du dossier.
# Sinon: elles sont gardées avec quality="discarded".
"drop_out_of_range": True,
# Si True: on conserve les valeurs suspectes (quality="suspect") pour audit,
# mais les règles qualité privilégient les valeurs "ok" quand elles existent.
"keep_suspect": True,
},
# Clés normalisées (minuscules, sans accents) : potassium, sodium, plaquettes...
"tests": {
"potassium": {
# Bornes très larges (mmol/L) : sert uniquement à écarter l'impossible.
"hard_min": 0.5,
"hard_max": 9.0,
# Heuristique anti-OCR : un chiffre seul >=6 est souvent une décimale perdue (4,8 -> 8)
"suspect": {"single_digit_over": 6.0},
},
"sodium": {"hard_min": 90.0, "hard_max": 200.0},
"plaquettes": {"hard_min": 5.0, "hard_max": 2000.0},
"hemoglobine": {"hard_min": 3.0, "hard_max": 25.0},
"creatinine": {"hard_min": 1.0, "hard_max": 5000.0},
"crp": {"hard_min": 0.0, "hard_max": 1000.0},
"alat": {"hard_min": 0.0, "hard_max": 5000.0},
"asat": {"hard_min": 0.0, "hard_max": 5000.0},
"ggt": {"hard_min": 0.0, "hard_max": 5000.0},
"pal": {"hard_min": 0.0, "hard_max": 5000.0},
"bilirubine totale": {"hard_min": 0.0, "hard_max": 2000.0},
},
}
return _load_yaml_config(LAB_SANITY_PATH, defaults, "lab_value_sanity")
# --- Catalogue de règles (vetos + décisions), piloté par YAML ---
def _flatten_rules_yaml(data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Transforme un YAML de règles en dict {rule_id: cfg}.
Formats supportés :
- {packs: {pack_name: {enabled: bool, rules: {RULE_ID: {...}}}}}
- {rules: {RULE_ID: {...}}} (overlay simple)
"""
out: Dict[str, Dict[str, Any]] = {}
# Overlay simple
rules_block = data.get("rules")
if isinstance(rules_block, dict):
for rid, cfg in rules_block.items():
if not isinstance(cfg, dict):
cfg = {}
out[str(rid)] = dict(cfg)
packs = data.get("packs")
if isinstance(packs, dict):
for pack_name, pack_cfg in packs.items():
if not isinstance(pack_cfg, dict):
continue
pack_enabled = bool(pack_cfg.get("enabled", True))
rules = pack_cfg.get("rules")
if not isinstance(rules, dict):
continue
for rid, cfg in rules.items():
if not isinstance(cfg, dict):
cfg = {}
merged = dict(cfg)
merged.setdefault("pack", str(pack_name))
# La désactivation du pack désactive ses règles
merged["enabled"] = bool(merged.get("enabled", True)) and pack_enabled
out[str(rid)] = merged
return out
def _merge_rule_catalog(base: Dict[str, Dict[str, Any]], overlay: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""Merge overlay → base (par règle)."""
merged = {k: dict(v) for k, v in base.items()}
for rid, cfg in overlay.items():
if rid not in merged:
merged[rid] = dict(cfg)
else:
# override champ par champ
for k, v in cfg.items():
merged[rid][k] = v
return merged
@lru_cache(maxsize=1)
def load_rules_catalog() -> Dict[str, Dict[str, Any]]:
"""Charge le catalogue de règles depuis config/rules/*.yaml.
- base.yaml : socle partagé (vetos + décisions)
- enabled.yaml : sélection d'overlays (site/spécialité)
- specialties/<name>.yaml et sites/<name>.yaml : overrides ciblés
Politique : si une règle n'est pas listée, elle est considérée "enabled".
(=> ne casse pas le comportement historique)
"""
catalog: Dict[str, Dict[str, Any]] = {}
# 1) base
if RULES_BASE_PATH.exists():
try:
base_data = yaml.safe_load(RULES_BASE_PATH.read_text(encoding="utf-8")) or {}
if isinstance(base_data, dict):
catalog = _flatten_rules_yaml(base_data)
except (yaml.YAMLError, Exception) as e:
_cfg_logger.error("Rules catalog : erreur lecture base.yaml — %s", e)
catalog = {}
# 2) enabled overlays
active_site = ""
active_specialty = ""
extra_files: list[str] = []
if RULES_ENABLED_PATH.exists():
try:
enabled_data = yaml.safe_load(RULES_ENABLED_PATH.read_text(encoding="utf-8")) or {}
if isinstance(enabled_data, dict):
active = enabled_data.get("active") or {}
if isinstance(active, dict):
active_site = str(active.get("site") or "").strip()
active_specialty = str(active.get("specialty") or "").strip()
extra = active.get("extra")
if isinstance(extra, list):
extra_files = [str(x) for x in extra if str(x).strip()]
except (yaml.YAMLError, Exception) as e:
_cfg_logger.error("Rules catalog : erreur lecture enabled.yaml — %s", e)
else:
# fallback env
active_site = os.environ.get("T2A_SITE", "").strip()
active_specialty = os.environ.get("T2A_SPECIALTY", "").strip()
# 3) specialty overlay
if active_specialty:
p = RULES_DIR / "specialties" / f"{active_specialty}.yaml"
if p.exists():
try:
data = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
if isinstance(data, dict):
catalog = _merge_rule_catalog(catalog, _flatten_rules_yaml(data))
except (yaml.YAMLError, Exception) as e:
_cfg_logger.error("Rules catalog : erreur overlay spécialité %s%s", active_specialty, e)
# 4) site overlay
if active_site:
p = RULES_DIR / "sites" / f"{active_site}.yaml"
if p.exists():
try:
data = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
if isinstance(data, dict):
catalog = _merge_rule_catalog(catalog, _flatten_rules_yaml(data))
except (yaml.YAMLError, Exception) as e:
_cfg_logger.error("Rules catalog : erreur overlay site %s%s", active_site, e)
# 5) extra overlays
for rel in extra_files:
p = RULES_DIR / rel
if p.exists():
try:
data = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
if isinstance(data, dict):
catalog = _merge_rule_catalog(catalog, _flatten_rules_yaml(data))
except (yaml.YAMLError, Exception) as e:
_cfg_logger.error("Rules catalog : erreur overlay %s%s", rel, e)
return catalog
# --- Routage dynamique des règles (packs) ---
# Contexte runtime, défini *par dossier* (contextvars => safe pour batch / multi-thread)
_RULES_RUNTIME_CTX: contextvars.ContextVar[dict | None] = contextvars.ContextVar("t2a_rules_runtime", default=None)
def set_rules_runtime(ctx: dict) -> contextvars.Token:
"""Active un contexte de règles pour le dossier courant."""
return _RULES_RUNTIME_CTX.set(ctx)
def reset_rules_runtime(token: contextvars.Token) -> None:
"""Restaure le contexte précédent."""
_RULES_RUNTIME_CTX.reset(token)
def get_rules_runtime() -> dict | None:
return _RULES_RUNTIME_CTX.get()
@lru_cache(maxsize=1)
def load_rules_router() -> Dict[str, Any]:
"""Charge la config de routage (config/rules/router.yaml).
- mode: 'strict' => une règle non listée dans base.yaml est considérée désactivée
quand le routage runtime est actif (objectif: éviter les surprises).
- defaults.enabled_packs: packs actifs par défaut sur tous les dossiers.
- triggers: conditions simples qui activent des packs additionnels.
"""
defaults: Dict[str, Any] = {
"version": 1,
"mode": "strict",
"defaults": {"enabled_packs": ["vetos_core", "decisions_core"]},
"triggers": [],
}
return _load_yaml_config(RULES_ROUTER_PATH, defaults, "rules_router")
def rule_enabled(rule_id: str) -> bool:
"""Retourne True si la règle est activée.
Mode legacy (pas de routage runtime): une règle inconnue => True (comportement historique).
Mode routé (runtime actif):
- On *garde* l'info 'enabled' du catalogue (base.yaml / overlays)
- On **désactive** automatiquement les règles dont le pack n'est pas dans enabled_packs
- En mode 'strict', une règle inconnue => False (ça évite les surprises en prod)
"""
catalog = load_rules_catalog()
cfg = catalog.get(rule_id)
runtime = get_rules_runtime()
if runtime is None:
# legacy
if not cfg:
return True
return bool(cfg.get("enabled", True))
mode = str(runtime.get("mode") or "strict").lower()
enabled_packs = set(runtime.get("enabled_packs") or [])
always_on = set(runtime.get("always_on_rules") or [])
force_enable = set(runtime.get("force_enable_rules") or [])
force_disable = set(runtime.get("force_disable_rules") or [])
if rule_id in force_disable:
return False
if rule_id in force_enable:
return True
# Règles inconnues: strict => off, legacy => on
if cfg is None:
return False if mode == "strict" else True
# Respecte le flag d'activation du catalogue (l'admin peut couper une règle)
if not bool(cfg.get("enabled", True)):
return False
pack = cfg.get("pack")
if pack and (pack not in enabled_packs) and (rule_id not in always_on):
return False
return True
def rule_force_severity(rule_id: str) -> str | None:
"""Optionnel: force la sévérité d'un veto (HARD/MEDIUM/LOW)."""
cfg = load_rules_catalog().get(rule_id) or {}
sev = cfg.get("force_severity")
return str(sev) if sev else None
# --- Modèles de données CIM-10 ---
class RAGSource(BaseModel):
document: str
page: Optional[int] = None
code: Optional[str] = None
extrait: Optional[str] = None
class Sejour(BaseModel):
sexe: Optional[str] = None
age: Optional[int] = None
date_entree: Optional[str] = None
date_sortie: Optional[str] = None
duree_sejour: Optional[int] = None
mode_entree: Optional[str] = None
mode_sortie: Optional[str] = None
imc: Optional[float] = None
poids: Optional[float] = None
taille: Optional[float] = None
class PreuveClinique(BaseModel):
type: str # "biologie" | "imagerie" | "traitement" | "acte" | "clinique"
element: str # "CRP 180 mg/L"
interpretation: str # "syndrome inflammatoire majeur"
class CodeDecision(BaseModel):
"""Décision finale sur un code (audit-friendly).
- action=KEEP: on garde la suggestion
- action=DOWNGRADE: on remplace par un code moins spécifique (ex: D50→D64.9)
- action=REMOVE: on retire le code (ou on le laisse vide)
"""
action: str = "KEEP" # KEEP | DOWNGRADE | REMOVE
final_code: Optional[str] = None
downgraded_from: Optional[str] = None
reason: Optional[str] = None
needs_info: list[str] = Field(default_factory=list)
applied_rules: list[str] = Field(default_factory=list)
class DPCandidate(BaseModel):
"""Candidat DP pour la sélection NUKE-3."""
index: int
term: str
code: Optional[str] = None
confidence: Optional[str] = None
source: Optional[str] = None
is_comorbidity_like: bool = False
is_symptom_like: bool = False
is_act_only: bool = False
section_strength: int = 0
num_occurrences: int = 1
score: float = 0.0
score_details: dict = Field(default_factory=dict)
class DPSelection(BaseModel):
"""Résultat de la sélection NUKE-3 du DP."""
chosen_index: Optional[int] = None
chosen_term: Optional[str] = None
chosen_code: Optional[str] = None
confidence: Optional[str] = None
verdict: str = "REVIEW" # CONFIRMED | REVIEW
evidence: list[str] = Field(default_factory=list)
reason: Optional[str] = None
candidates: list[DPCandidate] = Field(default_factory=list)
debug_scores: Optional[dict] = None
class Diagnostic(BaseModel):
texte: str
cim10_suggestion: Optional[str] = None
cim10_confidence: Optional[str] = None
# Statut clinique / qualité (pour affichage "barré" et exclusion métriques)
# - confirmed/probable/uncertain: actifs
# - ruled_out: visible mais barré (n'entre pas dans les métriques/GHM)
status: Optional[str] = None
ruled_out_reason: Optional[str] = None
# Sortie finale (post-traitement qualité)
cim10_final: Optional[str] = None
cim10_decision: Optional[CodeDecision] = None
justification: Optional[str] = None
raisonnement: Optional[str] = None
sources_rag: list[RAGSource] = Field(default_factory=list)
preuves_cliniques: list[PreuveClinique] = Field(default_factory=list)
est_cma: Optional[bool] = None
est_cms: Optional[bool] = None
niveau_severite: Optional[str] = None # "leger" | "modere" | "severe" | "non_evalue"
niveau_cma: Optional[int] = None # 1 (pas CMA) | 2 | 3 | 4 (niveau officiel ATIH)
source: Optional[str] = None # "trackare" | "edsnlp" | "regex" | "llm_das"
source_page: Optional[int] = None # numéro de page (1-indexed) dans le PDF source
source_excerpt: Optional[str] = None # extrait du texte source (~200 chars)
class DossierMetrics(BaseModel):
"""Métriques de qualité / reporting (audit-friendly).
Objectif : distinguer les éléments *actifs* (qui comptent pour le codage / GHM)
de ceux écartés par les règles qualité (vetos / décisions).
"""
das_total: int = 0
das_active: int = 0
das_excluded: int = 0 # total - active
das_removed: int = 0 # décision REMOVE (future: ruled_out)
das_ruled_out: int = 0 # visible mais barré (action RULED_OUT)
das_no_code: int = 0 # pas de code suggestion/final
actes_total: int = 0
actes_with_code: int = 0
dp_has_code: bool = False
class ActeCCAM(BaseModel):
texte: str
code_ccam_suggestion: Optional[str] = None
ccam_confidence: Optional[str] = None
justification: Optional[str] = None
raisonnement: Optional[str] = None
sources_rag: list[RAGSource] = Field(default_factory=list)
date: Optional[str] = None
validite: Optional[str] = None # "valide" | "obsolete" | "non_verifie"
alertes: list[str] = Field(default_factory=list)
source_page: Optional[int] = None
source_excerpt: Optional[str] = None
class Traitement(BaseModel):
medicament: str
posologie: Optional[str] = None
code_atc: Optional[str] = None
source_page: Optional[int] = None
source_excerpt: Optional[str] = None
class BiologieCle(BaseModel):
test: str
valeur: Optional[str] = None
# Valeur numérique parsée (si possible). Sert aux règles qualité.
valeur_num: Optional[float] = None
anomalie: Optional[bool] = None
# Qualité de parsing: ok | suspect | discarded
quality: Optional[str] = None
discard_reason: Optional[str] = None
source_page: Optional[int] = None
source_excerpt: Optional[str] = None
class Imagerie(BaseModel):
type: str
conclusion: Optional[str] = None
score: Optional[str] = None
source_page: Optional[int] = None
source_excerpt: Optional[str] = None
class Antecedent(BaseModel):
texte: str
source_page: Optional[int] = None
source_excerpt: Optional[str] = None
class Complication(BaseModel):
texte: str
source_page: Optional[int] = None
source_excerpt: Optional[str] = None
class DossierMedical(BaseModel):
source_file: str = ""
document_type: str = ""
sejour: Sejour = Field(default_factory=Sejour)
diagnostic_principal: Optional[Diagnostic] = None
dp_selection: Optional[DPSelection] = None
# Traçabilité DP (finalizer) — audit DIM
dp_trackare: Optional[DPSelection] = None # DP issu du Trackare (si existant)
dp_crh_only: Optional[DPSelection] = None # DP issu du CRH-only pipeline
dp_final: Optional[DPSelection] = None # DP final après arbitrage finalizer
quality_flags: dict = Field(default_factory=dict)
diagnostics_associes: list[Diagnostic] = Field(default_factory=list)
actes_ccam: list[ActeCCAM] = Field(default_factory=list)
antecedents: list[Antecedent] = Field(default_factory=list)
traitements_sortie: list[Traitement] = Field(default_factory=list)
biologie_cle: list[BiologieCle] = Field(default_factory=list)
# Valeurs biologiques écartées (artefacts PDF/OCR) pour audit
biologie_discarded: list[dict] = Field(default_factory=list)
imagerie: list[Imagerie] = Field(default_factory=list)
complications: list[Complication] = Field(default_factory=list)
alertes_codage: list[str] = Field(default_factory=list)
source_files: list[str] = Field(default_factory=list)
ghm_estimation: Optional[GHMEstimation] = None
controles_cpam: list[ControleCPAM] = Field(default_factory=list)
veto_report: Optional["VetoReport"] = None
completude: Optional["CompletudeDossier"] = None
processing_time_s: float | None = None
metrics: Optional[DossierMetrics] = None
rules_runtime: Optional[dict] = None
@field_validator("antecedents", mode="before")
@classmethod
def _coerce_antecedents(cls, v):
"""Backward compat : convertit les anciennes list[str] en list[Antecedent]."""
if not isinstance(v, list):
return v
result = []
for item in v:
if isinstance(item, str):
result.append({"texte": item})
else:
result.append(item)
return result
@field_validator("complications", mode="before")
@classmethod
def _coerce_complications(cls, v):
"""Backward compat : convertit les anciennes list[str] en list[Complication]."""
if not isinstance(v, list):
return v
result = []
for item in v:
if isinstance(item, str):
result.append({"texte": item})
else:
result.append(item)
return result
# --- Rapport d'anonymisation ---
class GHMEstimation(BaseModel):
cmd: Optional[str] = None
cmd_libelle: Optional[str] = None
type_ghm: Optional[str] = None # "C" / "M" / "K"
severite: int = 1 # 1-4
ghm_approx: Optional[str] = None # ex: "07C??2"
cma_count: int = 0
cms_count: int = 0
alertes: list[str] = Field(default_factory=list)
class FinancialImpact(BaseModel):
"""Estimation de l'impact financier d'un contrôle UCR."""
delta_severite: int = 0 # ex: -2 (perte 2 niveaux)
impact_estime_euros: int = 0 # estimation grossière
priorite: str = "normale" # "critique" | "haute" | "normale" | "faible"
raison: str = ""
class ControleCPAM(BaseModel):
numero_ogc: int
titre: str = ""
arg_ucr: str = ""
decision_ucr: str = ""
dp_ucr: Optional[str] = None
da_ucr: Optional[str] = None
dr_ucr: Optional[str] = None
actes_ucr: Optional[str] = None
type_desaccord: Optional[str] = None # "DP" | "DAS" | "DP+DAS" | "Actes"
financial_impact: Optional[FinancialImpact] = None
contre_argumentation: Optional[str] = None
response_data: Optional[dict] = None
sources_reponse: list[RAGSource] = Field(default_factory=list)
quality_tier: Optional[str] = None # "A" | "B" | "C"
requires_review: bool = False
quality_warnings: list[str] = Field(default_factory=list)
# Délais réglementaires
date_notification: Optional[str] = None # JJ/MM/AAAA
date_limite_reponse: Optional[str] = None # calculé : notification + 30j
statut_reponse: str = "a_traiter" # "a_traiter" | "en_cours" | "envoye" | "hors_delai"
# Workflow validation DIM
validation_dim: str = "non_valide" # "non_valide" | "en_revision" | "valide" | "rejete"
commentaire_dim: Optional[str] = None
date_validation: Optional[str] = None
# --- Qualité / Vetos (contestabilité) ---
class VetoIssue(BaseModel):
"""Un problème détecté lors du contrôle de contestabilité."""
veto: str
severity: str # HARD | MEDIUM | LOW
where: str
message: str
citation: Optional[str] = None
class VetoReport(BaseModel):
"""Rapport global de vetos pour un dossier."""
verdict: str # PASS | NEED_INFO | FAIL
score_contestabilite: int = 100 # 0-100
issues: list[VetoIssue] = Field(default_factory=list)
# --- Complétude documentaire DIM ---
class ItemCompletude(BaseModel):
"""Élément requis/recommandé pour justifier un code."""
categorie: str # "biologie" | "imagerie" | "document" | "acte" | "clinique"
element: str # "Albumine" | "CRO" | "Scanner abdominal"
statut: str # "present" | "absent" | "present_confirme" | "present_non_confirme" | "present_indirect"
valeur: Optional[str] = None # "28 g/L" si présent
importance: str # "obligatoire" | "recommande"
impact_cpam: str = "" # explication du risque
confirmation_detail: Optional[str] = None # "Albumine 28 g/L < 30 → confirme E43"
class CheckCompletude(BaseModel):
"""Vérification de complétude pour un code diagnostique."""
code: str # "E43"
libelle: str # "Dénutrition sévère"
type_diag: str # "DP" | "DAS"
items: list[ItemCompletude] = Field(default_factory=list)
score: int = 100 # 0-100
verdict: str = "defendable" # "defendable" | "fragile" | "indefendable"
resume: str = "" # "2/3 éléments obligatoires présents"
class CompletudeDossier(BaseModel):
"""Rapport global de complétude documentaire pour un dossier."""
checks: list[CheckCompletude] = Field(default_factory=list)
score_global: int = 100
verdict_global: str = "defendable"
documents_presents: list[str] = Field(default_factory=list)
documents_manquants: list[str] = Field(default_factory=list)
class AnonymizationReport(BaseModel):
source_file: str
total_replacements: int = 0
regex_replacements: int = 0
ner_replacements: int = 0
sweep_replacements: int = 0
entities_found: list[dict] = Field(default_factory=list)