CPAM — Méthode TIM (mémoire en défense) : - Réécriture CPAM_ARGUMENTATION avec raisonnement 5 passes TIM (contexte admin → motif réel → confrontation bio → hiérarchie → validation défensive) - _BIO_THRESHOLDS (19 entrées) + _build_bio_confrontation() pour confrontation biologie/diagnostic avec seuils chiffrés et verdicts - _format_response() dual format : nouveau TIM (moyens numérotés, tableau bio, codes non défendables, conclusion dispositive) + rétrocompat legacy - CPAM_ADVERSARIAL mis à jour pour vérifier honnêteté intellectuelle - Tests adaptés + 12 nouveaux tests (bio confrontation, format TIM) Moteur de règles : - Nouvelles règles YAML : demographic, diagnostic_conflicts, procedure_diagnosis, temporal, parcours - Bio extraction FAISS (synonymes vectoriels) - Veto engine enrichi (citations, Trackare skip, règles démographiques) - Decision engine : _apply_bio_rules_gen() + matchers analytiques Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
874 lines
32 KiB
Python
874 lines
32 KiB
Python
"""Validation et formatage des réponses CPAM (grounding, adversariale, codes)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
|
|
from ..config import ControleCPAM, DossierMedical
|
|
from ..medical.bio_normals import BIO_NORMALS
|
|
from ..medical.cim10_dict import normalize_code, validate_code
|
|
from ..medical.ollama_client import call_anthropic, call_ollama
|
|
from ..prompts import CPAM_ADVERSARIAL
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _fuzzy_match_ref(ref: str, tag_map: dict[str, str]) -> str | None:
|
|
"""Tente de résoudre une ref inventée vers un tag réel.
|
|
|
|
Stratégie : si la ref ressemble à un code CIM-10 (ex: "C83.3"),
|
|
chercher dans tag_map un tag dont le contenu contient ce code.
|
|
|
|
Returns:
|
|
Le tag réel trouvé, ou None si aucun match.
|
|
"""
|
|
ref_upper = ref.strip().upper()
|
|
# Match par code CIM-10 dans le contenu des tags
|
|
if re.match(r"^[A-Z]\d{2}\.?\d{0,2}$", ref_upper):
|
|
for tag, content in tag_map.items():
|
|
if ref_upper in content.upper() or ref in content:
|
|
return tag
|
|
return None
|
|
|
|
|
|
def _validate_grounding(response_data: dict, tag_map: dict[str, str]) -> list[str]:
|
|
"""Vérifie que les références dans preuves correspondent à des tags existants.
|
|
|
|
Supporte les deux formats :
|
|
- Ancien : response_data["preuves_dossier"][].ref
|
|
- Nouveau TIM : response_data["moyens_defense"][].preuves[].ref
|
|
|
|
Applique un fuzzy matching par code CIM-10 avant de flaguer un warning.
|
|
|
|
Returns:
|
|
Liste de warnings pour les références inventées.
|
|
"""
|
|
if not tag_map:
|
|
return []
|
|
|
|
warnings: list[str] = []
|
|
|
|
def _check_ref(ref: str, context: str) -> None:
|
|
if not ref:
|
|
return
|
|
# Nettoyer les crochets si présents (nouveau format utilise "[BIO-1]")
|
|
clean_ref = ref.strip("[]")
|
|
if clean_ref in tag_map or ref in tag_map:
|
|
return
|
|
matched_tag = _fuzzy_match_ref(clean_ref, tag_map)
|
|
if matched_tag:
|
|
logger.info("Grounding : ref [%s] résolue vers [%s]", ref, matched_tag)
|
|
return
|
|
warnings.append(f"Preuve [{ref}] non traçable (« {context} »)")
|
|
logger.warning("Grounding : preuve [%s] introuvable dans les tags du dossier", ref)
|
|
|
|
# Ancien format : preuves_dossier
|
|
preuves = response_data.get("preuves_dossier")
|
|
if preuves and isinstance(preuves, list):
|
|
for p in preuves:
|
|
if isinstance(p, dict):
|
|
_check_ref(p.get("ref", ""), p.get("valeur", "?"))
|
|
|
|
# Nouveau format TIM : moyens_defense[].preuves
|
|
moyens = response_data.get("moyens_defense")
|
|
if moyens and isinstance(moyens, list):
|
|
for moyen in moyens:
|
|
if not isinstance(moyen, dict):
|
|
continue
|
|
moyen_preuves = moyen.get("preuves")
|
|
if not moyen_preuves or not isinstance(moyen_preuves, list):
|
|
continue
|
|
for p in moyen_preuves:
|
|
if isinstance(p, dict):
|
|
_check_ref(p.get("ref", ""), p.get("fait", "?"))
|
|
|
|
return warnings
|
|
|
|
|
|
def _validate_references(parsed: dict, sources: list[dict]) -> list[str]:
|
|
"""Vérifie que les références citées correspondent aux sources RAG fournies.
|
|
|
|
Returns:
|
|
Liste d'avertissements pour les références non vérifiables.
|
|
"""
|
|
warnings = []
|
|
refs = parsed.get("references")
|
|
if not refs or not isinstance(refs, list):
|
|
return warnings
|
|
|
|
# Construire un set des documents sources disponibles
|
|
source_docs = set()
|
|
for src in sources:
|
|
doc_name = src.get("document", "")
|
|
source_docs.add(doc_name)
|
|
# Ajouter les noms lisibles aussi
|
|
readable = {
|
|
"cim10": "CIM-10 FR 2026",
|
|
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
|
|
"guide_methodo": "Guide Méthodologique MCO 2026",
|
|
"ccam": "CCAM PMSI V4 2025",
|
|
}.get(doc_name, "")
|
|
if readable:
|
|
source_docs.add(readable)
|
|
source_docs.add(readable.lower())
|
|
|
|
if not source_docs:
|
|
return warnings
|
|
|
|
for ref in refs:
|
|
if not isinstance(ref, dict):
|
|
continue
|
|
doc = ref.get("document", "")
|
|
if doc and not any(sd in doc.lower() or doc.lower() in sd.lower() for sd in source_docs if sd):
|
|
warnings.append(f"Référence non vérifiable : {doc}")
|
|
logger.warning("CPAM : référence non vérifiable « %s »", doc)
|
|
|
|
return warnings
|
|
|
|
|
|
# Regex pour capturer les codes CIM-10 (ex: K81.0, E87, Z45.80)
|
|
_CIM10_CODE_RE = re.compile(r"\b([A-Z]\d{2}\.?\d{0,2})\b")
|
|
|
|
# Champs textuels de la réponse LLM à scanner pour les codes CIM-10
|
|
# Supporte les deux formats : ancien (contre_arguments_*) et nouveau (moyens_defense TIM)
|
|
_TEXT_FIELDS = (
|
|
# Ancien format
|
|
"analyse_contestation",
|
|
"contre_arguments_medicaux",
|
|
"contre_arguments_asymetrie",
|
|
"contre_arguments_reglementaires",
|
|
"conclusion",
|
|
# Nouveau format TIM
|
|
"rappel_faits",
|
|
"asymetrie_information",
|
|
"reponse_points_cpam",
|
|
"conclusion_dispositive",
|
|
)
|
|
|
|
|
|
def _build_whitelist_prefixes(
|
|
dossier: DossierMedical,
|
|
controle: ControleCPAM,
|
|
) -> set[str]:
|
|
"""Construit la whitelist de préfixes CIM-10 autorisés (3 chars).
|
|
|
|
Sources : DP + DAS du dossier, dp_ucr + da_ucr + dr_ucr du contrôle.
|
|
"""
|
|
prefixes: set[str] = set()
|
|
|
|
def _add(raw: str) -> None:
|
|
raw = raw.strip()
|
|
if not raw:
|
|
return
|
|
norm = normalize_code(raw)
|
|
if norm and len(norm) >= 3:
|
|
prefixes.add(norm[:3])
|
|
|
|
if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion:
|
|
_add(dossier.diagnostic_principal.cim10_suggestion)
|
|
for das in dossier.diagnostics_associes:
|
|
if das.cim10_suggestion:
|
|
_add(das.cim10_suggestion)
|
|
|
|
for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr):
|
|
if not field:
|
|
continue
|
|
for raw in re.split(r"[,;\s]+", field.strip()):
|
|
_add(raw)
|
|
|
|
return prefixes
|
|
|
|
|
|
# Patterns pour supprimer proprement un code hors périmètre et ses artefacts :
|
|
# "D62 — libellé" → "libellé"
|
|
# "(D62)" → ""
|
|
# "D62" → ""
|
|
_SANITIZE_PATTERNS = [
|
|
# "CODE — libellé" ou "CODE - libellé"
|
|
re.compile(r"\b[A-Z]\d{2}\.?\d{0,2}\s*[—–\-]\s*"),
|
|
# "(CODE)" avec espaces optionnels
|
|
re.compile(r"\(\s*[A-Z]\d{2}\.?\d{0,2}\s*\)"),
|
|
# "CODE" seul
|
|
_CIM10_CODE_RE,
|
|
]
|
|
|
|
|
|
def _sanitize_unauthorized_codes(
|
|
parsed: dict,
|
|
dossier: DossierMedical,
|
|
controle: ControleCPAM,
|
|
) -> list[str]:
|
|
"""Supprime les codes CIM-10 hors périmètre des champs textuels de la réponse.
|
|
|
|
Modifie `parsed` in-place. Applique le principe « LLM propose, moteur de
|
|
règles dispose » : le texte garde le sens médical mais les codes inventés
|
|
sont retirés pour éviter les warnings CRITIQUE.
|
|
|
|
Returns:
|
|
Liste des codes supprimés (pour logging).
|
|
"""
|
|
whitelist = _build_whitelist_prefixes(dossier, controle)
|
|
if not whitelist:
|
|
return []
|
|
|
|
removed: list[str] = []
|
|
|
|
def _is_authorized(code_str: str) -> bool:
|
|
norm = normalize_code(code_str)
|
|
return bool(norm and len(norm) >= 3 and norm[:3] in whitelist)
|
|
|
|
def _replace_code(match: re.Match) -> str:
|
|
"""Callback de remplacement : garde le code si autorisé, supprime sinon."""
|
|
code = _CIM10_CODE_RE.search(match.group(0))
|
|
if not code:
|
|
return match.group(0)
|
|
if _is_authorized(code.group(0)):
|
|
return match.group(0)
|
|
if code.group(0) not in removed:
|
|
removed.append(code.group(0))
|
|
return ""
|
|
|
|
# Sanitiser les champs textuels
|
|
for key in _TEXT_FIELDS:
|
|
val = parsed.get(key)
|
|
if not val or not isinstance(val, str):
|
|
continue
|
|
new_val = val
|
|
for pattern in _SANITIZE_PATTERNS:
|
|
new_val = pattern.sub(
|
|
lambda m, _p=pattern: _replace_code(m),
|
|
new_val,
|
|
)
|
|
# Nettoyage artefacts : doubles espaces, parenthèses vides
|
|
new_val = re.sub(r"\(\s*\)", "", new_val)
|
|
new_val = re.sub(r" +", " ", new_val)
|
|
new_val = new_val.strip()
|
|
if new_val != val:
|
|
parsed[key] = new_val
|
|
|
|
# Sanitiser aussi les preuves_dossier.valeur (ancien format)
|
|
preuves = parsed.get("preuves_dossier")
|
|
if preuves and isinstance(preuves, list):
|
|
for p in preuves:
|
|
if not isinstance(p, dict):
|
|
continue
|
|
v = p.get("valeur", "")
|
|
if not v or not isinstance(v, str):
|
|
continue
|
|
new_v = v
|
|
for pattern in _SANITIZE_PATTERNS:
|
|
new_v = pattern.sub(
|
|
lambda m, _p=pattern: _replace_code(m),
|
|
new_v,
|
|
)
|
|
new_v = re.sub(r"\(\s*\)", "", new_v)
|
|
new_v = re.sub(r" +", " ", new_v).strip()
|
|
if new_v != v:
|
|
p["valeur"] = new_v
|
|
|
|
# Sanitiser les moyens_defense[].argument (nouveau format TIM)
|
|
moyens = parsed.get("moyens_defense")
|
|
if moyens and isinstance(moyens, list):
|
|
for moyen in moyens:
|
|
if not isinstance(moyen, dict):
|
|
continue
|
|
for field_key in ("argument", "titre"):
|
|
val = moyen.get(field_key, "")
|
|
if not val or not isinstance(val, str):
|
|
continue
|
|
new_val = val
|
|
for pattern in _SANITIZE_PATTERNS:
|
|
new_val = pattern.sub(
|
|
lambda m, _p=pattern: _replace_code(m),
|
|
new_val,
|
|
)
|
|
new_val = re.sub(r"\(\s*\)", "", new_val)
|
|
new_val = re.sub(r" +", " ", new_val).strip()
|
|
if new_val != val:
|
|
moyen[field_key] = new_val
|
|
|
|
if removed:
|
|
for code in removed:
|
|
norm = normalize_code(code)
|
|
is_valid, label = validate_code(norm)
|
|
label_str = f" ({label})" if is_valid and label else ""
|
|
logger.info("Sanitize : code %s%s hors périmètre supprimé du texte", code, label_str)
|
|
|
|
return removed
|
|
|
|
|
|
def _validate_codes_in_response(
|
|
parsed: dict,
|
|
dossier: DossierMedical,
|
|
controle: ControleCPAM,
|
|
) -> list[str]:
|
|
"""Vérifie que les codes CIM-10 cités dans la réponse sont dans le périmètre du dossier.
|
|
|
|
Construit une whitelist à partir du dossier (DP, DAS) et de l'UCR (dp_ucr, da_ucr, dr_ucr),
|
|
puis extrait tous les codes CIM-10 des champs textuels de la réponse LLM.
|
|
La comparaison se fait par préfixe 3 caractères (ex: K81 matche K81.0 et K81.09).
|
|
|
|
Returns:
|
|
Liste de warnings pour les codes hors périmètre.
|
|
"""
|
|
whitelist_prefixes = _build_whitelist_prefixes(dossier, controle)
|
|
if not whitelist_prefixes:
|
|
return []
|
|
|
|
# 2. Extraire les codes CIM-10 de la réponse LLM
|
|
text_fields = []
|
|
for key in _TEXT_FIELDS:
|
|
val = parsed.get(key)
|
|
if val and isinstance(val, str):
|
|
text_fields.append(val)
|
|
|
|
# Preuves du dossier — valeurs (ancien format)
|
|
preuves = parsed.get("preuves_dossier")
|
|
if preuves and isinstance(preuves, list):
|
|
for p in preuves:
|
|
if isinstance(p, dict):
|
|
v = p.get("valeur", "")
|
|
if v and isinstance(v, str):
|
|
text_fields.append(v)
|
|
|
|
# Moyens de défense (nouveau format TIM)
|
|
moyens = parsed.get("moyens_defense")
|
|
if moyens and isinstance(moyens, list):
|
|
for moyen in moyens:
|
|
if isinstance(moyen, dict):
|
|
for mkey in ("argument", "titre"):
|
|
v = moyen.get(mkey, "")
|
|
if v and isinstance(v, str):
|
|
text_fields.append(v)
|
|
|
|
combined_text = "\n".join(text_fields)
|
|
found_codes = _CIM10_CODE_RE.findall(combined_text)
|
|
|
|
if not found_codes:
|
|
return []
|
|
|
|
# 3. Comparer par préfixe 3 chars
|
|
warnings: list[str] = []
|
|
seen_warned: set[str] = set()
|
|
|
|
for raw_code in found_codes:
|
|
norm = normalize_code(raw_code)
|
|
if not norm or len(norm) < 3:
|
|
continue
|
|
prefix = norm[:3]
|
|
if prefix in whitelist_prefixes:
|
|
continue
|
|
if norm in seen_warned:
|
|
continue
|
|
seen_warned.add(norm)
|
|
is_valid, label = validate_code(norm)
|
|
label_str = f" ({label})" if is_valid and label else ""
|
|
warnings.append(f"Code {norm}{label_str} hors périmètre dossier/UCR")
|
|
logger.warning("CPAM : code %s%s absent du dossier et de l'UCR", norm, label_str)
|
|
|
|
return warnings
|
|
|
|
|
|
def _validate_adversarial(
|
|
response_data: dict,
|
|
tag_map: dict[str, str],
|
|
controle: ControleCPAM,
|
|
) -> dict | None:
|
|
"""Validation adversariale — vérifie la cohérence de la contre-argumentation.
|
|
|
|
Un appel LLM de relecture critique vérifie :
|
|
1. Les valeurs cliniques citées correspondent aux éléments tagués du dossier
|
|
2. La conclusion est cohérente avec l'argumentation
|
|
3. Les points d'accord ne contredisent pas la contre-argumentation
|
|
4. Les codes CIM-10 cités sont cohérents
|
|
|
|
Returns:
|
|
dict {"coherent": bool, "erreurs": list[str], "score_confiance": int} ou None si échec.
|
|
"""
|
|
import json as _json
|
|
|
|
# LOGIC-3 — Vérifier si les modèles CPAM et validation sont identiques
|
|
from ..config import check_adversarial_model_config
|
|
|
|
same_model, model_msg = check_adversarial_model_config()
|
|
if same_model:
|
|
logger.warning("LOGIC-3: %s", model_msg)
|
|
return {
|
|
"coherent": True,
|
|
"erreurs": [f"Validation adversariale dégradée : {model_msg}"],
|
|
"score_confiance": 0,
|
|
}
|
|
|
|
# Construire le résumé des éléments factuels disponibles
|
|
if tag_map:
|
|
factual_lines = "\n".join(f" [{tag}] {content}" for tag, content in tag_map.items())
|
|
factual_section = f"ÉLÉMENTS FACTUELS DU DOSSIER :\n{factual_lines}"
|
|
else:
|
|
factual_section = "ÉLÉMENTS FACTUELS DU DOSSIER : aucun élément tagué disponible"
|
|
|
|
# Sérialiser la réponse LLM de façon compacte
|
|
try:
|
|
response_json = _json.dumps(response_data, ensure_ascii=False, indent=None)
|
|
# Tronquer si trop long pour le prompt de validation
|
|
if len(response_json) > 10000:
|
|
response_json = response_json[:10000] + "..."
|
|
except (TypeError, ValueError):
|
|
logger.warning("Validation adversariale : impossible de sérialiser la réponse")
|
|
return None
|
|
|
|
# Normes biologiques pour vérifier les interprétations
|
|
normes_lines = []
|
|
for test, (lo, hi) in BIO_NORMALS.items():
|
|
normes_lines.append(f" {test}: {lo}-{hi}")
|
|
normes_section = "NORMES BIOLOGIQUES DE RÉFÉRENCE :\n" + "\n".join(normes_lines)
|
|
|
|
dp_ucr_line = f"DP UCR : {controle.dp_ucr}" if controle.dp_ucr else ""
|
|
da_ucr_line = f"DA UCR : {controle.da_ucr}" if controle.da_ucr else ""
|
|
|
|
prompt = CPAM_ADVERSARIAL.format(
|
|
response_json=response_json,
|
|
factual_section=factual_section,
|
|
normes_section=normes_section,
|
|
dp_ucr_line=dp_ucr_line,
|
|
da_ucr_line=da_ucr_line,
|
|
)
|
|
|
|
logger.debug(" Validation adversariale")
|
|
result = call_ollama(prompt, temperature=0.0, max_tokens=6000, role="validation")
|
|
if result is None:
|
|
result = call_anthropic(prompt, temperature=0.0, max_tokens=6000)
|
|
if result is None:
|
|
logger.warning(" Validation adversariale échouée — LLM indisponible")
|
|
return None
|
|
|
|
coherent = result.get("coherent", True)
|
|
erreurs = result.get("erreurs", [])
|
|
score = result.get("score_confiance", -1)
|
|
|
|
if not coherent and erreurs:
|
|
logger.warning(" Validation adversariale : %d incohérence(s) détectée(s) (score %s/10)",
|
|
len(erreurs), score)
|
|
for e in erreurs:
|
|
logger.warning(" - %s", e)
|
|
else:
|
|
logger.info(" Validation adversariale OK (score %s/10)", score)
|
|
|
|
return result
|
|
|
|
|
|
def _build_correction_prompt(
|
|
original_prompt: str,
|
|
original_response: dict,
|
|
adversarial_result: dict,
|
|
) -> str:
|
|
"""Construit un prompt de correction en injectant les erreurs détectées.
|
|
|
|
Args:
|
|
original_prompt: Le prompt d'argumentation initial.
|
|
original_response: La réponse LLM originale (dict).
|
|
adversarial_result: Le résultat de la validation adversariale.
|
|
|
|
Returns:
|
|
Prompt de correction prêt à envoyer au LLM.
|
|
"""
|
|
import json as _json
|
|
|
|
erreurs = adversarial_result.get("erreurs", [])
|
|
erreurs_text = "\n".join(f" {i}. {e}" for i, e in enumerate(erreurs, 1))
|
|
|
|
# Résumé compact de la réponse problématique (supporte les deux formats)
|
|
summary_fields = {}
|
|
# Ancien format
|
|
for key in ("analyse_contestation", "contre_arguments_medicaux",
|
|
"contre_arguments_asymetrie", "contre_arguments_reglementaires",
|
|
"conclusion"):
|
|
val = original_response.get(key)
|
|
if val and isinstance(val, str):
|
|
summary_fields[key] = val[:400] + ("..." if len(val) > 400 else "")
|
|
# Nouveau format TIM
|
|
for key in ("rappel_faits", "asymetrie_information", "reponse_points_cpam",
|
|
"conclusion_dispositive"):
|
|
val = original_response.get(key)
|
|
if val and isinstance(val, str):
|
|
summary_fields[key] = val[:400] + ("..." if len(val) > 400 else "")
|
|
|
|
try:
|
|
response_summary = _json.dumps(summary_fields, ensure_ascii=False, indent=2)
|
|
except (TypeError, ValueError):
|
|
response_summary = str(summary_fields)
|
|
|
|
correction_block = (
|
|
"\n\n=== CORRECTION REQUISE — ERREURS DÉTECTÉES DANS TA RÉPONSE PRÉCÉDENTE ===\n"
|
|
f"{erreurs_text}\n\n"
|
|
f"RÉPONSE PRÉCÉDENTE (À CORRIGER) :\n{response_summary}\n\n"
|
|
"Corrige UNIQUEMENT les erreurs ci-dessus. Conserve les parties correctes.\n"
|
|
"Réponds avec le même format JSON."
|
|
)
|
|
|
|
return original_prompt + correction_block
|
|
|
|
|
|
def _assess_quality_tier(
|
|
parsed: dict,
|
|
ref_warnings: list[str],
|
|
grounding_warnings: list[str],
|
|
code_warnings: list[str],
|
|
adversarial_result: dict | None,
|
|
is_weak_dossier: bool = False,
|
|
) -> tuple[str, bool, list[str]]:
|
|
"""Évalue le tier qualité (A/B/C) et le flag requires_review.
|
|
|
|
Classification :
|
|
- Tier C (requires_review=True) :
|
|
score adversarial < 4 OU code_warnings > 0 OU grounding_warnings > 2
|
|
(si dossier faible : seuil adversarial abaissé à < 2)
|
|
- Tier B :
|
|
score adversarial 4-6 OU ref_warnings > 0 OU grounding_warnings 1-2
|
|
(si dossier faible : score 2-3 accepté en B)
|
|
- Tier A :
|
|
score adversarial >= 7, 0 warning critique, <= 1 warning mineur
|
|
|
|
Args:
|
|
is_weak_dossier: Si True, relaxe les seuils adversariaux car un score bas
|
|
est attendu quand le dossier manque d'éléments probants.
|
|
|
|
Returns:
|
|
(tier, requires_review, categorized_warnings)
|
|
"""
|
|
categorized: list[str] = []
|
|
score = adversarial_result.get("score_confiance", -1) if adversarial_result else -1
|
|
has_critical = False
|
|
minor_count = 0
|
|
|
|
# Seuil adversarial adapté à la force du dossier
|
|
score_critical_threshold = 2 if is_weak_dossier else 4
|
|
|
|
# --- Warnings critiques ---
|
|
for w in code_warnings:
|
|
categorized.append(f"[CRITIQUE] {w}")
|
|
has_critical = True
|
|
|
|
if score != -1 and score < score_critical_threshold:
|
|
categorized.append(f"[CRITIQUE] Score adversarial très bas : {score}/10")
|
|
has_critical = True
|
|
elif score != -1 and score <= 3 and is_weak_dossier:
|
|
# Score 2-3 sur dossier faible → warning mineur (pas critique)
|
|
categorized.append(
|
|
f"[MINEUR] Score adversarial bas ({score}/10) — "
|
|
f"attendu pour un dossier à preuves limitées"
|
|
)
|
|
minor_count += 1
|
|
|
|
if len(grounding_warnings) > 2:
|
|
for w in grounding_warnings:
|
|
categorized.append(f"[CRITIQUE] {w}")
|
|
has_critical = True
|
|
elif grounding_warnings:
|
|
for w in grounding_warnings:
|
|
categorized.append(f"[MINEUR] {w}")
|
|
minor_count += 1
|
|
|
|
# --- Warnings mineurs ---
|
|
for w in ref_warnings:
|
|
categorized.append(f"[MINEUR] {w}")
|
|
minor_count += 1
|
|
|
|
if adversarial_result and not adversarial_result.get("coherent", True):
|
|
for e in adversarial_result.get("erreurs", []):
|
|
if isinstance(e, str) and e.strip():
|
|
categorized.append(f"[MINEUR] Incohérence : {e}")
|
|
minor_count += 1
|
|
|
|
if score != -1 and 4 <= score <= 6:
|
|
categorized.append(f"[MINEUR] Score adversarial moyen : {score}/10")
|
|
minor_count += 1
|
|
|
|
# --- Classification ---
|
|
if has_critical or (score != -1 and score < score_critical_threshold):
|
|
tier = "C"
|
|
requires_review = True
|
|
elif minor_count > 0 or (score != -1 and 4 <= score <= 6):
|
|
tier = "B"
|
|
requires_review = False
|
|
else:
|
|
tier = "A"
|
|
requires_review = False
|
|
|
|
return tier, requires_review, categorized
|
|
|
|
|
|
def _is_new_tim_format(parsed: dict) -> bool:
|
|
"""Détecte si la réponse LLM utilise le nouveau format TIM (moyens_defense)."""
|
|
return "moyens_defense" in parsed
|
|
|
|
|
|
def _format_response(
|
|
parsed: dict,
|
|
ref_warnings: list[str] | None = None,
|
|
quality_tier: str | None = None,
|
|
categorized_warnings: list[str] | None = None,
|
|
) -> str:
|
|
"""Formate la réponse LLM en texte lisible.
|
|
|
|
Supporte deux formats via duck-typing :
|
|
- Nouveau TIM : moyens_defense, confrontation_bio, conclusion_dispositive
|
|
- Ancien : contre_arguments_medicaux, points_accord, conclusion
|
|
"""
|
|
if _is_new_tim_format(parsed):
|
|
return _format_response_tim(parsed, ref_warnings, quality_tier, categorized_warnings)
|
|
return _format_response_legacy(parsed, ref_warnings, quality_tier, categorized_warnings)
|
|
|
|
|
|
def _format_response_tim(
|
|
parsed: dict,
|
|
ref_warnings: list[str] | None = None,
|
|
quality_tier: str | None = None,
|
|
categorized_warnings: list[str] | None = None,
|
|
) -> str:
|
|
"""Formate la réponse LLM au format mémoire en défense TIM."""
|
|
sections: list[str] = []
|
|
sep = "───────────────────────────────────────────────────────"
|
|
sep_heavy = "═══════════════════════════════════════════════════════"
|
|
|
|
# En-tête
|
|
objet = parsed.get("objet", "Mémoire en défense")
|
|
sections.append(f"{sep_heavy}\nMÉMOIRE EN DÉFENSE — {objet}\n{sep_heavy}")
|
|
|
|
# Bandeau qualité si tier C
|
|
if quality_tier == "C":
|
|
sections.append("⚠ REVUE MANUELLE REQUISE (Qualité : C)")
|
|
|
|
# Rappel des faits
|
|
rappel = parsed.get("rappel_faits")
|
|
if rappel:
|
|
sections.append(f"RAPPEL DES FAITS\n{rappel}")
|
|
|
|
sections.append(sep)
|
|
|
|
# Moyens de défense numérotés
|
|
moyens = parsed.get("moyens_defense")
|
|
if moyens and isinstance(moyens, list):
|
|
for moyen in moyens:
|
|
if not isinstance(moyen, dict):
|
|
continue
|
|
num = moyen.get("numero", "?")
|
|
titre = moyen.get("titre", "")
|
|
argument = moyen.get("argument", "")
|
|
|
|
moyen_lines = [f"MOYEN N°{num} — {titre}"]
|
|
if argument:
|
|
moyen_lines.append(argument)
|
|
|
|
# Preuves intégrées dans chaque moyen
|
|
moyen_preuves = moyen.get("preuves")
|
|
if moyen_preuves and isinstance(moyen_preuves, list):
|
|
for p in moyen_preuves:
|
|
if isinstance(p, dict):
|
|
ref = p.get("ref", "")
|
|
fait = p.get("fait", "")
|
|
signif = p.get("signification", "")
|
|
moyen_lines.append(f" Preuve : {ref} {fait} → {signif}")
|
|
|
|
# Source réglementaire du moyen
|
|
src_regl = moyen.get("source_reglementaire")
|
|
if src_regl and src_regl != "null":
|
|
moyen_lines.append(f" Source : {src_regl}")
|
|
|
|
sections.append("\n".join(moyen_lines))
|
|
|
|
sections.append(sep)
|
|
|
|
# Confrontation biologie / diagnostic (tableau)
|
|
confrontation = parsed.get("confrontation_bio")
|
|
if confrontation and isinstance(confrontation, list):
|
|
table_lines = ["CONFRONTATION BIOLOGIE / DIAGNOSTIC"]
|
|
table_lines.append(
|
|
"┌─────────────────┬─────────────┬──────────────┬───────────┬───────────────┐"
|
|
)
|
|
table_lines.append(
|
|
"│ Diagnostic │ Test requis │ Seuil │ Valeur │ Verdict │"
|
|
)
|
|
table_lines.append(
|
|
"├─────────────────┼─────────────┼──────────────┼───────────┼───────────────┤"
|
|
)
|
|
for row in confrontation:
|
|
if not isinstance(row, dict):
|
|
continue
|
|
diag = str(row.get("diagnostic", ""))[:17].ljust(17)
|
|
test = str(row.get("test", ""))[:13].ljust(13)
|
|
seuil = str(row.get("seuil", ""))[:14].ljust(14)
|
|
valeur = str(row.get("valeur", ""))[:11].ljust(11)
|
|
verdict = str(row.get("verdict", ""))[:15].ljust(15)
|
|
table_lines.append(f"│ {diag}│ {test}│ {seuil}│ {valeur}│ {verdict}│")
|
|
table_lines.append(
|
|
"└─────────────────┴─────────────┴──────────────┴───────────┴───────────────┘"
|
|
)
|
|
sections.append("\n".join(table_lines))
|
|
|
|
sections.append(sep)
|
|
|
|
# Codes non défendables (honnêteté intellectuelle)
|
|
codes_nd = parsed.get("codes_non_defendables")
|
|
if codes_nd and isinstance(codes_nd, list) and len(codes_nd) > 0:
|
|
nd_lines = ["⚠ CODES NON DÉFENDABLES (honnêteté intellectuelle)"]
|
|
for nd in codes_nd:
|
|
if isinstance(nd, dict):
|
|
code = nd.get("code", "?")
|
|
raison = nd.get("raison", "")
|
|
reco = nd.get("recommandation", "")
|
|
nd_lines.append(f"- {code} : {raison}")
|
|
if reco:
|
|
nd_lines.append(f" → {reco}")
|
|
sections.append("\n".join(nd_lines))
|
|
sections.append(sep)
|
|
|
|
# Asymétrie d'information
|
|
asymetrie = parsed.get("asymetrie_information")
|
|
if asymetrie:
|
|
sections.append(f"ASYMÉTRIE D'INFORMATION\n{asymetrie}")
|
|
sections.append(sep)
|
|
|
|
# Réponse aux points CPAM
|
|
reponse_cpam = parsed.get("reponse_points_cpam")
|
|
if reponse_cpam:
|
|
sections.append(f"RÉPONSE AUX POINTS DE LA CPAM\n{reponse_cpam}")
|
|
sections.append(sep)
|
|
|
|
# Références réglementaires
|
|
refs = parsed.get("references")
|
|
if refs:
|
|
if isinstance(refs, list):
|
|
ref_lines = ["RÉFÉRENCES RÉGLEMENTAIRES"]
|
|
for r in refs:
|
|
if isinstance(r, dict):
|
|
doc = r.get("document", "")
|
|
page = r.get("page", "")
|
|
citation = r.get("citation", "")
|
|
ref_lines.append(f"- [{doc}, p.{page}] {citation}")
|
|
else:
|
|
ref_lines.append(f"- {r}")
|
|
sections.append("\n".join(ref_lines))
|
|
else:
|
|
sections.append(f"RÉFÉRENCES RÉGLEMENTAIRES\n{refs}")
|
|
|
|
sections.append(sep_heavy)
|
|
|
|
# Conclusion dispositive
|
|
conclusion = parsed.get("conclusion_dispositive")
|
|
if conclusion:
|
|
sections.append(f"CONCLUSION\n{conclusion}")
|
|
|
|
sections.append(sep_heavy)
|
|
|
|
# Avertissements
|
|
sections.extend(_format_warnings(categorized_warnings, ref_warnings))
|
|
|
|
return "\n\n".join(sections)
|
|
|
|
|
|
def _format_response_legacy(
|
|
parsed: dict,
|
|
ref_warnings: list[str] | None = None,
|
|
quality_tier: str | None = None,
|
|
categorized_warnings: list[str] | None = None,
|
|
) -> str:
|
|
"""Formate la réponse LLM au format hérité (rétro-compatibilité cache)."""
|
|
sections = []
|
|
|
|
# Bandeau qualité si tier C
|
|
if quality_tier == "C":
|
|
sections.append("⚠ REVUE MANUELLE REQUISE (Qualité : C)")
|
|
|
|
analyse = parsed.get("analyse_contestation")
|
|
if analyse:
|
|
sections.append(f"ANALYSE DE LA CONTESTATION\n{analyse}")
|
|
|
|
accord = parsed.get("points_accord")
|
|
if accord and accord.lower() not in ("aucun", "non applicable", "n/a", ""):
|
|
sections.append(f"POINTS D'ACCORD\n{accord}")
|
|
|
|
# Champs structurés par axe
|
|
contre_med = parsed.get("contre_arguments_medicaux")
|
|
if contre_med:
|
|
sections.append(f"CONTRE-ARGUMENTS MÉDICAUX\n{contre_med}")
|
|
|
|
# Preuves du dossier
|
|
preuves = parsed.get("preuves_dossier")
|
|
if preuves and isinstance(preuves, list):
|
|
preuves_lines = []
|
|
for p in preuves:
|
|
if isinstance(p, dict):
|
|
ref = p.get("ref", "")
|
|
elem = p.get("element", "")
|
|
valeur = p.get("valeur", "")
|
|
signif = p.get("signification", "")
|
|
ref_prefix = f"[{ref}] " if ref else ""
|
|
preuves_lines.append(f"- {ref_prefix}[{elem}] {valeur} → {signif}")
|
|
if preuves_lines:
|
|
sections.append(f"PREUVES DU DOSSIER\n" + "\n".join(preuves_lines))
|
|
|
|
contre_asym = parsed.get("contre_arguments_asymetrie")
|
|
if contre_asym:
|
|
sections.append(f"ASYMÉTRIE D'INFORMATION\n{contre_asym}")
|
|
|
|
contre_regl = parsed.get("contre_arguments_reglementaires")
|
|
if contre_regl:
|
|
sections.append(f"CONTRE-ARGUMENTS RÉGLEMENTAIRES\n{contre_regl}")
|
|
|
|
# Fallback : ancien champ unique (réponses en cache existantes)
|
|
if not contre_med and not contre_asym and not contre_regl:
|
|
contre = parsed.get("contre_arguments")
|
|
if contre:
|
|
sections.append(f"CONTRE-ARGUMENTS\n{contre}")
|
|
|
|
# Références structurées ou ancien format string
|
|
refs = parsed.get("references")
|
|
if refs:
|
|
if isinstance(refs, list):
|
|
ref_lines = []
|
|
for r in refs:
|
|
if isinstance(r, dict):
|
|
doc = r.get("document", "")
|
|
page = r.get("page", "")
|
|
citation = r.get("citation", "")
|
|
ref_lines.append(f"- [{doc}, p.{page}] {citation}")
|
|
else:
|
|
ref_lines.append(f"- {r}")
|
|
if ref_lines:
|
|
sections.append(f"REFERENCES\n" + "\n".join(ref_lines))
|
|
else:
|
|
sections.append(f"REFERENCES\n{refs}")
|
|
|
|
conclusion = parsed.get("conclusion")
|
|
if conclusion:
|
|
sections.append(f"CONCLUSION\n{conclusion}")
|
|
|
|
# Avertissements
|
|
sections.extend(_format_warnings(categorized_warnings, ref_warnings))
|
|
|
|
return "\n\n".join(sections)
|
|
|
|
|
|
def _format_warnings(
|
|
categorized_warnings: list[str] | None = None,
|
|
ref_warnings: list[str] | None = None,
|
|
) -> list[str]:
|
|
"""Formate les avertissements qualité (partagé entre les deux formats)."""
|
|
sections: list[str] = []
|
|
if categorized_warnings:
|
|
critiques = [w for w in categorized_warnings if w.startswith("[CRITIQUE]")]
|
|
mineurs = [w for w in categorized_warnings if w.startswith("[MINEUR]")]
|
|
if critiques:
|
|
sections.append(
|
|
"AVERTISSEMENTS CRITIQUES\n" + "\n".join(f"- {w}" for w in critiques)
|
|
)
|
|
if mineurs:
|
|
sections.append(
|
|
"AVERTISSEMENTS MINEURS\n" + "\n".join(f"- {w}" for w in mineurs)
|
|
)
|
|
elif ref_warnings:
|
|
warning_text = "\n".join(f"- {w}" for w in ref_warnings)
|
|
sections.append(f"AVERTISSEMENT — REFERENCES NON VÉRIFIÉES\n{warning_text}")
|
|
return sections
|