"""App Flask — viewer CIM-10 T2A."""
from __future__ import annotations
import json
import logging
import re
from pathlib import Path
import requests
from flask import Flask, Response, abort, render_template, request, jsonify
from markupsafe import Markup
from werkzeug.utils import secure_filename
from collections import Counter
from ..config import (
ANONYMIZED_DIR, STRUCTURED_DIR, INPUT_DIR, REPORTS_DIR,
OLLAMA_URL, CCAM_DICT_PATH, DossierMedical,
ALLOWED_EXTENSIONS, UPLOAD_MAX_SIZE_MB,
CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CIM10_DICT_PATH, CIM10_SUPPLEMENTS_PATH,
)
from .. import config as cfg
from .referentiels import ReferentielManager
from .validation import ValidationManager
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def compute_group_stats(items: list[dict]) -> dict:
    """Aggregate the per-dossier counters of one group.

    Args:
        items: Group entries; each entry's ``"dossier"`` value must expose
            ``diagnostics_associes``, ``alertes_codage``, ``actes_ccam`` and
            ``diagnostic_principal``.

    Returns:
        A dict with the keys ``das_count``, ``alertes_count``,
        ``actes_count`` and ``cma_count``.
    """
    dossiers = [entry["dossier"] for entry in items]

    def _cma_hits(dossier) -> int:
        # CMA-flagged associated diagnoses, plus one more when the principal
        # diagnosis is itself flagged.
        hits = sum(1 for diag in dossier.diagnostics_associes if diag.est_cma)
        principal = dossier.diagnostic_principal
        if principal and principal.est_cma:
            hits += 1
        return hits

    return {
        "das_count": sum(len(d.diagnostics_associes) for d in dossiers),
        "alertes_count": sum(len(d.alertes_codage) for d in dossiers),
        "actes_count": sum(len(d.actes_ccam) for d in dossiers),
        "cma_count": sum(_cma_hits(d) for d in dossiers),
    }
def compute_dashboard_stats(groups: dict[str, list[dict]]) -> dict:
    """Compute the pipeline-wide figures shown on the dashboard.

    Args:
        groups: Mapping of group name to its entries; each entry's
            ``"dossier"`` value is the object the counters are read from.

    Returns:
        A dict holding the totals (dossiers, files, DAS, acts, alerts, CMA,
        CPAM controls), the principal-diagnosis confidence and validity
        distributions, the 15 most frequent codes with the highest count
        (``top_max``, 1 when there is no code), the GHM type and severity
        distributions, and the total and average processing time.
    """
    # Flatten once; the order (group by group, entry by entry) is kept so the
    # counters are filled in the same sequence as a nested walk would.
    dossiers = [entry["dossier"] for entries in groups.values() for entry in entries]

    dp_confidence: Counter = Counter()
    dp_validity: Counter = Counter()
    code_counter: Counter = Counter()
    ghm_types: Counter = Counter()
    severity_dist: Counter = Counter()
    durations: list[float] = []

    das_total = actes_total = alertes_total = cpam_total = cma_total = 0

    for dossier in dossiers:
        associated = dossier.diagnostics_associes
        das_total += len(associated)
        actes_total += len(dossier.actes_ccam)
        alertes_total += len(dossier.alertes_codage)
        cpam_total += len(dossier.controles_cpam)

        if dossier.processing_time_s is not None:
            durations.append(dossier.processing_time_s)

        # Principal diagnosis: confidence bucket, validity bucket, code tally.
        principal = dossier.diagnostic_principal
        principal_code = principal.cim10_suggestion if principal else None
        confidence = (principal.cim10_confidence or "none") if principal else "none"
        dp_confidence[confidence] += 1
        dp_validity["valide" if principal_code else "absent"] += 1
        if principal_code:
            code_counter[principal_code] += 1

        # Associated diagnoses: code tally and CMA flags.
        for das in associated:
            if das.cim10_suggestion:
                code_counter[das.cim10_suggestion] += 1
            if das.est_cma:
                cma_total += 1
        if principal and principal.est_cma:
            cma_total += 1

        # GHM estimation: the type is optional, severity is always tallied.
        ghm = dossier.ghm_estimation
        if ghm:
            if ghm.type_ghm:
                ghm_types[ghm.type_ghm] += 1
            severity_dist[ghm.severite] += 1

    top_codes = code_counter.most_common(15)
    elapsed_total = sum(durations)

    return {
        "total_dossiers": len(groups),
        "total_fichiers": len(dossiers),
        "total_das": das_total,
        "total_actes": actes_total,
        "total_alertes": alertes_total,
        "total_cma": cma_total,
        "total_cpam": cpam_total,
        "dp_confidence": dict(dp_confidence),
        "dp_validity": dict(dp_validity),
        "top_codes": top_codes,
        "top_max": top_codes[0][1] if top_codes else 1,
        "ghm_types": dict(ghm_types),
        "severity_dist": dict(severity_dist),
        "processing_time_total": elapsed_total,
        "processing_time_avg": elapsed_total / len(durations) if durations else 0,
    }
def collect_cpam_controls(groups: dict[str, list[dict]]) -> list[dict]:
    """Gather every CPAM control found in any dossier of any group.

    Args:
        groups: Mapping of group name to its entries; each entry provides a
            ``"dossier"`` object and a ``"path_rel"`` value.

    Returns:
        One dict per control with the keys ``group_name``, ``filepath``,
        ``ctrl`` and ``dp_code`` (the principal diagnosis code, or ``None``
        when the dossier has no principal diagnosis), sorted by the
        control's ``numero_ogc``.
    """
    collected: list[dict] = []
    for group_name, entries in groups.items():
        for entry in entries:
            dossier = entry["dossier"]
            principal = dossier.diagnostic_principal
            principal_code = principal.cim10_suggestion if principal else None
            collected.extend(
                {
                    "group_name": group_name,
                    "filepath": entry["path_rel"],
                    "ctrl": ctrl,
                    "dp_code": principal_code,
                }
                for ctrl in dossier.controles_cpam
            )
    return sorted(collected, key=lambda row: row["ctrl"].numero_ogc)
def get_builtin_referentiels() -> list[dict]:
    """Describe the built-in reference sources (PDFs and JSON dictionaries).

    The number of RAG chunks per document is read from
    ``<STRUCTURED_DIR parent>/data/rag_index/metadata.json`` when that file
    exists. Reading it is best effort: a failure is logged and the chunk
    counts collected so far are used.

    Returns:
        One dict per built-in source with the keys ``name``, ``filename``,
        ``extension``, ``size_mb`` (0 when the file is missing), ``chunks``
        and ``exists``.
    """
    rag_index_meta = Path(STRUCTURED_DIR).parent / "data" / "rag_index" / "metadata.json"
    chunks_by_doc: dict[str, int] = {}
    if rag_index_meta.exists():
        try:
            meta = json.loads(rag_index_meta.read_text(encoding="utf-8"))
            for m in meta:
                doc = m.get("document", "")
                chunks_by_doc[doc] = chunks_by_doc.get(doc, 0) + 1
        except Exception:
            # Stay best effort (the page must still render), but leave a
            # trace instead of hiding an unreadable or malformed index.
            logger.warning(
                "Impossible de lire l'index RAG %s", rag_index_meta, exc_info=True
            )

    builtin_sources = [
        ("CIM-10 FR 2026", CIM10_PDF, ".pdf", ["cim10", "cim10_alpha"]),
        ("Guide Méthodologique MCO 2026", GUIDE_METHODO_PDF, ".pdf", ["guide_methodo"]),
        ("CCAM 2025", CCAM_PDF, ".pdf", ["ccam"]),
        ("Dictionnaire CIM-10", CIM10_DICT_PATH, ".json", []),
        ("Suppléments CIM-10", CIM10_SUPPLEMENTS_PATH, ".json", []),
        ("Dictionnaire CCAM", CCAM_DICT_PATH, ".json", []),
    ]

    refs = []
    for name, path, ext, doc_keys in builtin_sources:
        # A single stat() call answers both "does it exist" and "how big",
        # so the file cannot vanish between an existence check and the stat.
        try:
            size_mb = path.stat().st_size / (1024 * 1024)
            exists = True
        except OSError:
            size_mb = 0
            exists = False
        refs.append({
            "name": name,
            "filename": path.name,
            "extension": ext,
            "size_mb": size_mb,
            "chunks": sum(chunks_by_doc.get(k, 0) for k in doc_keys),
            "exists": exists,
        })
    return refs
def load_ccam_dict() -> dict[str, dict]:
    """Load the CCAM dictionary used for groupings.

    Returns:
        The parsed JSON content of ``CCAM_DICT_PATH``, or an empty dict when
        the file does not exist or cannot be read or parsed (a warning is
        logged in the latter case).
    """
    if not CCAM_DICT_PATH.exists():
        return {}
    try:
        raw_text = CCAM_DICT_PATH.read_text(encoding="utf-8")
        return json.loads(raw_text)
    except Exception:
        logger.warning("Impossible de charger le dictionnaire CCAM")
    return {}
def scan_dossiers() -> dict[str, list[dict]]:
    """Scan ``STRUCTURED_DIR`` recursively and group the JSON files found.

    Files directly under ``STRUCTURED_DIR`` go to the ``"racine"`` group;
    any other file is grouped under its parent directory, expressed relative
    to ``STRUCTURED_DIR``. Files that cannot be read, parsed or validated
    are logged and skipped.

    Returns:
        ``{group_name: [{"name", "path_rel", "dossier"}, ...]}`` where
        ``name`` is the file stem, ``path_rel`` the path relative to
        ``STRUCTURED_DIR`` and ``dossier`` the validated ``DossierMedical``.
    """
    groups: dict[str, list[dict]] = {}
    for json_path in sorted(STRUCTURED_DIR.rglob("*.json")):
        rel = json_path.relative_to(STRUCTURED_DIR)

        try:
            payload = json.loads(json_path.read_text(encoding="utf-8"))
            dossier = DossierMedical.model_validate(payload)
        except Exception:
            logger.warning("Impossible de charger %s", json_path)
            continue

        # A one-part relative path means the file sits at the root.
        group_name = "racine" if len(rel.parts) == 1 else str(rel.parent)
        entry = {
            "name": json_path.stem,
            "path_rel": str(rel),
            "dossier": dossier,
        }
        groups.setdefault(group_name, []).append(entry)
    return groups
def load_dossier(path_rel: str) -> DossierMedical:
    """Load one structured JSON file and deserialize it.

    Args:
        path_rel: Path of the file, relative to ``STRUCTURED_DIR``.

    Returns:
        The validated ``DossierMedical``.

    Raises:
        HTTP 403 (via ``abort``) when the resolved path escapes
        ``STRUCTURED_DIR`` (path traversal).
        HTTP 404 (via ``abort``) when the resolved path is not an existing
        regular file.
    """
    root = STRUCTURED_DIR.resolve()
    safe_path = (STRUCTURED_DIR / path_rel).resolve()
    if not safe_path.is_relative_to(root):
        abort(403)
    # is_file() rather than exists(): a directory inside STRUCTURED_DIR
    # (including the root itself for an empty path_rel) would otherwise
    # reach read_text() and fail with an unhandled error instead of a 404.
    if not safe_path.is_file():
        abort(404)
    data = json.loads(safe_path.read_text(encoding="utf-8"))
    return DossierMedical.model_validate(data)
def fetch_ollama_models() -> list[str]:
    """List the model names reported by ``GET {cfg.OLLAMA_URL}/api/tags``.

    Returns:
        The ``name`` of every entry under the response's ``models`` key, or
        an empty list when the request or the parsing fails (a warning is
        logged).
    """
    try:
        response = requests.get(f"{cfg.OLLAMA_URL}/api/tags", timeout=5)
        response.raise_for_status()
        payload = response.json()
        return [model["name"] for model in payload.get("models", [])]
    except Exception:
        logger.warning("Impossible de contacter Ollama pour lister les modèles")
        return []
# ---------------------------------------------------------------------------
# Filtres Jinja2
# ---------------------------------------------------------------------------
# Hex colour pair for each confidence level; confidence_badge unpacks each
# tuple as (fg, bg).
_CONFIDENCE_COLORS = {
    "high": ("#16a34a", "#dcfce7"),
    "medium": ("#ca8a04", "#fef9c3"),
    "low": ("#dc2626", "#fee2e2"),
}
# Display label for each confidence level.
_CONFIDENCE_LABELS = {
    "high": "Haute",
    "medium": "Moyenne",
    "low": "Basse",
}
def confidence_badge(value: str | None) -> Markup:
    """Render a confidence level as markup for templates.

    Args:
        value: Confidence key; unknown keys fall back to a default colour
            pair and are displayed as-is.

    Returns:
        Empty ``Markup`` for a falsy value, otherwise ``Markup`` built from
        the level's label.
    """
    if not value:
        return Markup("")
    fg, bg = _CONFIDENCE_COLORS.get(value, ("#6b7280", "#f3f4f6"))
    label = _CONFIDENCE_LABELS.get(value, value)
    # NOTE(review): the first f-string is empty and fg/bg are never used in
    # the code as visible here — the HTML wrapper that presumably carried the
    # colours appears to be missing; confirm against the original file.
    # NOTE(review): label is interpolated without escaping; for a key missing
    # from _CONFIDENCE_LABELS it is the raw input value — confirm the value
    # is always trusted.
    return Markup(
        f''
        f'{label}'
    )
def confidence_label(value: str | None) -> str:
    """Return the display label of a confidence level.

    Args:
        value: Confidence key, possibly empty or ``None``.

    Returns:
        An empty string for a falsy value, the mapped label for a known key,
        and the key itself otherwise.
    """
    return _CONFIDENCE_LABELS.get(value, value) if value else ""
# Per-severity style; severity_badge unpacks each tuple as (label, fg, bg).
_SEVERITY_STYLES = {
    "severe": ("Sévère", "#dc2626", "#fee2e2"),
    "modere": ("Modéré", "#92400e", "#fef3c7"),
    "leger": ("Léger", "#065f46", "#d1fae5"),
}
# Per-level CMA style; cma_level_badge unpacks each tuple as (label, fg, bg).
_CMA_LEVEL_STYLES = {
    1: ("1", "#6b7280", "#f3f4f6"), # grey — not a CMA
    2: ("2", "#065f46", "#d1fae5"), # green
    3: ("3", "#92400e", "#fef3c7"), # yellow/orange
    4: ("4", "#dc2626", "#fee2e2"), # red
}
def format_duration(seconds: float | None) -> str:
    """Format a duration given in seconds as readable text (e.g. 2min 30s).

    Args:
        seconds: Duration in seconds, or ``None``.

    Returns:
        ``""`` for ``None``; one-decimal seconds (``"12.3s"``) below one
        minute; otherwise whole minutes, followed by zero-padded whole
        seconds unless they are zero.
    """
    if seconds is None:
        return ""
    if seconds < 60:
        return f"{seconds:.1f}s"

    whole_minutes, remainder = divmod(seconds, 60)
    minutes = int(whole_minutes)
    secs = int(remainder)
    return f"{minutes}min {secs:02d}s" if secs else f"{minutes}min"
def severity_badge(value: str | None) -> Markup:
    """Render a severity key as markup for templates.

    Args:
        value: Severity key, expected to be one of the keys of
            ``_SEVERITY_STYLES``.

    Returns:
        Empty ``Markup`` for a falsy or unknown value, otherwise ``Markup``
        built from the severity's label.
    """
    if not value or value not in _SEVERITY_STYLES:
        return Markup("")
    label, fg, bg = _SEVERITY_STYLES[value]
    # NOTE(review): the first f-string is empty and fg/bg are never used in
    # the code as visible here — the HTML wrapper that presumably carried the
    # colours appears to be missing; confirm against the original file.
    return Markup(
        f''
        f'{label}'
    )
def cma_level_badge(value: int | None) -> Markup:
    """Render a CMA level (1-4) as markup for templates.

    Args:
        value: CMA level; values above 4 are clamped to 4.

    Returns:
        Empty ``Markup`` when ``value`` is ``None`` or below 1, otherwise
        ``Markup`` containing ``"CMA "`` followed by the level's label.
    """
    if value is None or value < 1:
        return Markup("")
    # Clamp so that any level above 4 uses the level-4 style.
    level = min(value, 4)
    label, fg, bg = _CMA_LEVEL_STYLES.get(level, _CMA_LEVEL_STYLES[1])
    title = {1: "Pas CMA", 2: "CMA niveau 2", 3: "CMA niveau 3", 4: "CMA niveau 4"}.get(level, "")
    # NOTE(review): the first f-string is empty and fg, bg and title are
    # never used in the code as visible here — the HTML wrapper that
    # presumably carried them appears to be missing; confirm against the
    # original file.
    return Markup(
        f''
        f'CMA {label}'
    )
def format_dossier_name(name: str) -> str:
    """Return the display name of a dossier group.

    Args:
        name: Group name (e.g. ``1_23096332``).

    Returns:
        ``"Non classés"`` for the ``"racine"`` group, the name unchanged
        otherwise.
    """
    return "Non classés" if name == "racine" else name
def format_doc_name(name: str) -> str:
    """Turn a JSON file name into a readable document name.

    Matching is case-insensitive and checked in this order: a name containing
    ``fusionne``; a name starting with ``cro`` or ``crh`` (the text before
    the first underscore is returned upper-cased); a name containing
    ``trackare``; a name containing ``anapath``.

    Args:
        name: File name (typically the stem of a structured JSON file).

    Returns:
        The readable name, or ``name`` unchanged when nothing matches.
    """
    lowered = name.lower()
    if "fusionne" in lowered:
        return "Fusionné"
    if lowered.startswith(("cro", "crh")):
        prefix, _, _ = name.partition("_")
        return prefix.upper()
    for marker, readable in (("trackare", "Trackare"), ("anapath", "Anapath")):
        if marker in lowered:
            return readable
    return name
def format_cpam_text(text: str | None) -> Markup:
"""Convertit un texte CPAM (section) en HTML avec puces et paragraphes."""
if not text:
return Markup("")
from markupsafe import escape
lines = str(text).split("\n")
html_parts: list[str] = []
in_list = False
for line in lines:
stripped = line.strip()
if not stripped:
if in_list:
html_parts.append("")
in_list = False
html_parts.append("
")
continue
if stripped.startswith("- "):
if not in_list:
html_parts.append("
{escape(stripped)}
") if in_list: html_parts.append("") return Markup("\n".join(html_parts)) # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- def create_app() -> Flask: app = Flask(__name__) app.jinja_env.filters["confidence_badge"] = confidence_badge app.jinja_env.filters["confidence_label"] = confidence_label app.jinja_env.filters["severity_badge"] = severity_badge app.jinja_env.filters["cma_level_badge"] = cma_level_badge app.jinja_env.filters["format_duration"] = format_duration app.jinja_env.filters["format_dossier_name"] = format_dossier_name app.jinja_env.filters["format_doc_name"] = format_doc_name app.jinja_env.filters["format_cpam_text"] = format_cpam_text ccam_dict = load_ccam_dict() @app.route("/") def index(): groups = scan_dossiers() group_stats = {name: compute_group_stats(items) for name, items in groups.items()} return render_template("index.html", groups=groups, group_stats=group_stats) @app.route("/dossier/