"""App Flask — viewer CIM-10 T2A."""
from __future__ import annotations
import json
import logging
import re
from pathlib import Path
import requests
from flask import Flask, abort, render_template, request, jsonify
from markupsafe import Markup
from werkzeug.utils import secure_filename
from collections import Counter
from ..config import (
STRUCTURED_DIR, OLLAMA_URL, CCAM_DICT_PATH, DossierMedical,
ALLOWED_EXTENSIONS, UPLOAD_MAX_SIZE_MB,
CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CIM10_DICT_PATH, CIM10_SUPPLEMENTS_PATH,
)
from .. import config as cfg
from .referentiels import ReferentielManager
from .validation import ValidationManager
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def compute_group_stats(items: list[dict]) -> dict:
    """Aggregate counters for one group of dossiers.

    Args:
        items: list of ``{"name", "path_rel", "dossier"}`` entries as produced
            by ``scan_dossiers``.

    Returns:
        ``{das_count, alertes_count, actes_count, cma_count}`` where
        ``cma_count`` counts CMA-flagged associated diagnoses plus the main
        diagnosis when it is itself a CMA.
    """
    das = alertes = actes = cma = 0
    for entry in items:
        dossier = entry["dossier"]
        das += len(dossier.diagnostics_associes)
        alertes += len(dossier.alertes_codage)
        actes += len(dossier.actes_ccam)
        cma += sum(1 for diag in dossier.diagnostics_associes if diag.est_cma)
        dp = dossier.diagnostic_principal
        if dp and dp.est_cma:
            cma += 1
    return {
        "das_count": das,
        "alertes_count": alertes,
        "actes_count": actes,
        "cma_count": cma,
    }
def compute_dashboard_stats(groups: dict[str, list[dict]]) -> dict:
    """Compute the global pipeline statistics shown on the dashboard.

    Args:
        groups: mapping group name -> list of ``{"dossier": ...}`` items,
            as returned by ``scan_dossiers``.

    Returns:
        A flat dict of totals, distributions (confidence, validity, GHM type,
        severity), the 15 most frequent CIM-10 codes, and processing times.
    """
    dp_confidence: Counter = Counter()
    dp_validity: Counter = Counter()
    code_counter: Counter = Counter()
    ghm_types: Counter = Counter()
    severity_dist: Counter = Counter()
    durations: list[float] = []
    totals = {"fichiers": 0, "das": 0, "actes": 0, "alertes": 0, "cma": 0, "cpam": 0}

    for items in groups.values():
        totals["fichiers"] += len(items)
        for entry in items:
            d = entry["dossier"]
            totals["das"] += len(d.diagnostics_associes)
            totals["actes"] += len(d.actes_ccam)
            totals["alertes"] += len(d.alertes_codage)
            totals["cpam"] += len(d.controles_cpam)
            if d.processing_time_s is not None:
                durations.append(d.processing_time_s)

            # Main diagnosis: confidence + validity buckets.
            # NOTE: the DP code is counted before the DAS codes so that
            # Counter.most_common keeps DP codes first on ties.
            dp = d.diagnostic_principal
            if dp:
                dp_confidence[dp.cim10_confidence or "none"] += 1
                if dp.cim10_suggestion:
                    dp_validity["valide"] += 1
                    code_counter[dp.cim10_suggestion] += 1
                else:
                    dp_validity["absent"] += 1
            else:
                dp_confidence["none"] += 1
                dp_validity["absent"] += 1

            # Associated diagnoses: code frequencies + CMA count.
            for das in d.diagnostics_associes:
                if das.cim10_suggestion:
                    code_counter[das.cim10_suggestion] += 1
                if das.est_cma:
                    totals["cma"] += 1
            if dp and dp.est_cma:
                totals["cma"] += 1

            # GHM estimation distributions.
            ghm = d.ghm_estimation
            if ghm:
                if ghm.type_ghm:
                    ghm_types[ghm.type_ghm] += 1
                severity_dist[ghm.severite] += 1

    top_codes = code_counter.most_common(15)
    return {
        "total_dossiers": len(groups),
        "total_fichiers": totals["fichiers"],
        "total_das": totals["das"],
        "total_actes": totals["actes"],
        "total_alertes": totals["alertes"],
        "total_cma": totals["cma"],
        "total_cpam": totals["cpam"],
        "dp_confidence": dict(dp_confidence),
        "dp_validity": dict(dp_validity),
        "top_codes": top_codes,
        "top_max": top_codes[0][1] if top_codes else 1,
        "ghm_types": dict(ghm_types),
        "severity_dist": dict(severity_dist),
        "processing_time_total": sum(durations),
        "processing_time_avg": sum(durations) / len(durations) if durations else 0,
    }
def collect_cpam_controls(groups: dict[str, list[dict]]) -> list[dict]:
    """Flatten every CPAM control of every dossier into a single list.

    Each row carries the group name, the relative file path, the control
    object itself, and the dossier's main-diagnosis code (or ``None``).
    Rows are returned sorted by the control's OGC number.
    """
    rows: list[dict] = []
    for group_name, items in groups.items():
        for entry in items:
            dossier = entry["dossier"]
            dp = dossier.diagnostic_principal
            dp_code = dp.cim10_suggestion if dp else None
            rows.extend(
                {
                    "group_name": group_name,
                    "filepath": entry["path_rel"],
                    "ctrl": ctrl,
                    "dp_code": dp_code,
                }
                for ctrl in dossier.controles_cpam
            )
    return sorted(rows, key=lambda row: row["ctrl"].numero_ogc)
def get_builtin_referentiels() -> list[dict]:
    """Describe the built-in referentiels (PDFs + dictionaries).

    Reads the RAG index metadata (when present) to report how many chunks
    were indexed for each PDF; JSON dictionaries have no associated chunks.
    Missing files are still listed, with ``exists=False`` and size 0.
    """
    meta_path = Path(STRUCTURED_DIR).parent / "data" / "rag_index" / "metadata.json"
    chunk_counts: Counter = Counter()
    if meta_path.exists():
        try:
            for entry in json.loads(meta_path.read_text(encoding="utf-8")):
                chunk_counts[entry.get("document", "")] += 1
        except Exception:
            # Best-effort: a corrupt metadata file just yields zero chunks.
            pass

    builtin_sources = [
        ("CIM-10 FR 2026", CIM10_PDF, ".pdf", ["cim10", "cim10_alpha"]),
        ("Guide Méthodologique MCO 2026", GUIDE_METHODO_PDF, ".pdf", ["guide_methodo"]),
        ("CCAM 2025", CCAM_PDF, ".pdf", ["ccam"]),
        ("Dictionnaire CIM-10", CIM10_DICT_PATH, ".json", []),
        ("Suppléments CIM-10", CIM10_SUPPLEMENTS_PATH, ".json", []),
        ("Dictionnaire CCAM", CCAM_DICT_PATH, ".json", []),
    ]
    refs = []
    for name, path, ext, doc_keys in builtin_sources:
        refs.append({
            "name": name,
            "filename": path.name,
            "extension": ext,
            "size_mb": path.stat().st_size / (1024 * 1024) if path.exists() else 0,
            "chunks": sum(chunk_counts[key] for key in doc_keys),
            "exists": path.exists(),
        })
    return refs
def load_ccam_dict() -> dict[str, dict]:
    """Load the CCAM dictionary used for act groupings.

    Returns an empty dict when the file is missing or unparsable; a parse
    failure is logged as a warning rather than raised.
    """
    if not CCAM_DICT_PATH.exists():
        return {}
    try:
        return json.loads(CCAM_DICT_PATH.read_text(encoding="utf-8"))
    except Exception:
        logger.warning("Impossible de charger le dictionnaire CCAM")
        return {}
def scan_dossiers() -> dict[str, list[dict]]:
    """Scan ``output/structured/`` and group JSON files by sub-directory.

    Returns:
        ``{"racine": [{name, path_rel, dossier}, ...], "sub-dir": [...]}``.
        Files directly under STRUCTURED_DIR go to the ``"racine"`` group.
        Unparsable files are logged and skipped.
    """
    groups: dict[str, list[dict]] = {}
    for json_path in sorted(STRUCTURED_DIR.rglob("*.json")):
        rel = json_path.relative_to(STRUCTURED_DIR)
        group_name = "racine" if len(rel.parts) == 1 else str(rel.parent)
        try:
            payload = json.loads(json_path.read_text(encoding="utf-8"))
            dossier = DossierMedical.model_validate(payload)
        except Exception:
            logger.warning("Impossible de charger %s", json_path)
            continue
        groups.setdefault(group_name, []).append({
            "name": json_path.stem,
            "path_rel": str(rel),
            "dossier": dossier,
        })
    return groups
def load_dossier(path_rel: str) -> DossierMedical:
    """Load and deserialize one structured JSON file.

    Aborts with 403 on path-traversal attempts (resolved path escaping
    STRUCTURED_DIR) and 404 when the file does not exist.
    """
    target = (STRUCTURED_DIR / path_rel).resolve()
    if not target.is_relative_to(STRUCTURED_DIR.resolve()):
        abort(403)
    if not target.exists():
        abort(404)
    payload = json.loads(target.read_text(encoding="utf-8"))
    return DossierMedical.model_validate(payload)
def fetch_ollama_models() -> list[str]:
    """List available Ollama models via ``GET {OLLAMA_URL}/api/tags``.

    Returns an empty list (with a warning logged) on any network or
    parsing failure.
    """
    try:
        response = requests.get(f"{cfg.OLLAMA_URL}/api/tags", timeout=5)
        response.raise_for_status()
        return [entry["name"] for entry in response.json().get("models", [])]
    except Exception:
        logger.warning("Impossible de contacter Ollama pour lister les modèles")
        return []
# ---------------------------------------------------------------------------
# Filtres Jinja2
# ---------------------------------------------------------------------------
# Badge colors per confidence level: value -> (foreground, background).
_CONFIDENCE_COLORS = {
    "high": ("#16a34a", "#dcfce7"),
    "medium": ("#ca8a04", "#fef9c3"),
    "low": ("#dc2626", "#fee2e2"),
}
# Human-readable (French) labels for each confidence level.
_CONFIDENCE_LABELS = {
    "high": "Haute",
    "medium": "Moyenne",
    "low": "Basse",
}
def confidence_badge(value: str | None) -> Markup:
    """Render a colored HTML badge for a CIM-10 confidence level.

    Unknown values fall back to a neutral grey style and are shown verbatim;
    falsy values yield an empty Markup.

    NOTE(review): the ``<span>`` markup below was reconstructed — the original
    tag text was stripped from this source (only the f-string shells and
    ``{label}`` remained). Confirm the exact classes/styles against the
    templates before shipping.
    """
    if not value:
        return Markup("")
    fg, bg = _CONFIDENCE_COLORS.get(value, ("#6b7280", "#f3f4f6"))
    label = _CONFIDENCE_LABELS.get(value, value)
    return Markup(
        f'<span style="color:{fg};background:{bg};padding:2px 8px;'
        f'border-radius:9999px;font-size:0.75rem;font-weight:600;">'
        f'{label}</span>'
    )
def confidence_label(value: str | None) -> str:
    """Return the human-readable label for a confidence level.

    Falsy input yields an empty string; unknown levels are echoed back.
    """
    return _CONFIDENCE_LABELS.get(value, value) if value else ""
# GHM severity badge styles: value -> (label, foreground, background).
_SEVERITY_STYLES = {
    "severe": ("Sévère", "#dc2626", "#fee2e2"),
    "modere": ("Modéré", "#92400e", "#fef3c7"),
    "leger": ("Léger", "#065f46", "#d1fae5"),
}
# CMA level badge styles: level -> (label, foreground, background).
_CMA_LEVEL_STYLES = {
    1: ("1", "#6b7280", "#f3f4f6"),  # grey — not a CMA
    2: ("2", "#065f46", "#d1fae5"),  # green
    3: ("3", "#92400e", "#fef3c7"),  # yellow/orange
    4: ("4", "#dc2626", "#fee2e2"),  # red
}
def format_duration(seconds: float | None) -> str:
"""Formate une durée en secondes vers un format lisible (ex: 2min 30s)."""
if seconds is None:
return ""
if seconds < 60:
return f"{seconds:.1f}s"
minutes = int(seconds // 60)
secs = int(seconds % 60)
if secs == 0:
return f"{minutes}min"
return f"{minutes}min {secs:02d}s"
def severity_badge(value: str | None) -> Markup:
    """Render a colored HTML badge for a GHM severity ('severe'/'modere'/'leger').

    Values missing from _SEVERITY_STYLES yield an empty Markup.

    NOTE(review): the ``<span>`` markup below was reconstructed — the original
    tag text was stripped from this source; confirm styling against the
    templates before shipping.
    """
    if not value or value not in _SEVERITY_STYLES:
        return Markup("")
    label, fg, bg = _SEVERITY_STYLES[value]
    return Markup(
        f'<span style="color:{fg};background:{bg};padding:2px 8px;'
        f'border-radius:9999px;font-size:0.75rem;font-weight:600;">'
        f'{label}</span>'
    )
def cma_level_badge(value: int | None) -> Markup:
    """Render a CMA level badge (1-4) with graduated colors.

    Levels above 4 are clamped to 4; ``None`` or values below 1 yield an
    empty Markup. The tooltip spells out the level meaning.

    NOTE(review): the ``<span>`` markup below was reconstructed — the original
    tag text was stripped from this source; confirm styling against the
    templates before shipping.
    """
    if value is None or value < 1:
        return Markup("")
    level = min(value, 4)
    label, fg, bg = _CMA_LEVEL_STYLES.get(level, _CMA_LEVEL_STYLES[1])
    title = {1: "Pas CMA", 2: "CMA niveau 2", 3: "CMA niveau 3", 4: "CMA niveau 4"}.get(level, "")
    return Markup(
        f'<span title="{title}" style="color:{fg};background:{bg};padding:2px 8px;'
        f'border-radius:9999px;font-size:0.75rem;font-weight:600;">'
        f'CMA {label}</span>'
    )
def format_dossier_name(name: str) -> str:
    """Display name for a dossier group (e.g. ``1_23096332``).

    The synthetic root group "racine" gets the friendly label "Non classés".
    """
    return "Non classés" if name == "racine" else name
def format_doc_name(name: str) -> str:
    """Turn a JSON file stem into a short, human-readable document label.

    Recognizes merged files ("fusionne"), CRO/CRH reports (first underscore
    segment upper-cased), Trackare and Anapath documents; anything else is
    returned unchanged.
    """
    lowered = name.lower()
    if "fusionne" in lowered:
        return "Fusionné"
    if lowered.startswith(("cro", "crh")):
        return name.split("_")[0].upper()
    if "trackare" in lowered:
        return "Trackare"
    if "anapath" in lowered:
        return "Anapath"
    return name
# ---------------------------------------------------------------------------
# App factory
# ---------------------------------------------------------------------------
def create_app() -> Flask:
    """Application factory: register Jinja filters and all routes.

    Fix: the route declarations for views taking a URL parameter had lost
    their converters (e.g. ``"/dossier/"`` for ``detail(filepath)``); the
    placeholders (``<path:filepath>``, ``<ref_id>``, ``<path:dossier_id>``)
    are restored so Flask actually binds the view arguments. ``path``
    converters are used where the value may contain slashes.
    """
    app = Flask(__name__)

    # Jinja filters used throughout the templates.
    app.jinja_env.filters["confidence_badge"] = confidence_badge
    app.jinja_env.filters["confidence_label"] = confidence_label
    app.jinja_env.filters["severity_badge"] = severity_badge
    app.jinja_env.filters["cma_level_badge"] = cma_level_badge
    app.jinja_env.filters["format_duration"] = format_duration
    app.jinja_env.filters["format_dossier_name"] = format_dossier_name
    app.jinja_env.filters["format_doc_name"] = format_doc_name

    # Loaded once at startup; shared (read-only) by the detail view.
    ccam_dict = load_ccam_dict()

    @app.route("/")
    def index():
        """Home page: dossiers grouped by sub-directory, with per-group stats."""
        groups = scan_dossiers()
        group_stats = {name: compute_group_stats(items) for name, items in groups.items()}
        return render_template("index.html", groups=groups, group_stats=group_stats)

    @app.route("/dossier/<path:filepath>")
    def detail(filepath: str):
        """Detail view of one structured file, with sibling navigation."""
        dossier = load_dossier(filepath)
        # Find the files of the same group for prev/next navigation.
        groups = scan_dossiers()
        siblings = []
        current_group = None
        rel_parts = Path(filepath).parts
        if len(rel_parts) > 1:
            current_group = str(Path(*rel_parts[:-1]))
            siblings = groups.get(current_group, [])
        return render_template(
            "detail.html",
            dossier=dossier,
            filepath=filepath,
            ccam_dict=ccam_dict,
            siblings=siblings,
            current_group=current_group,
        )

    @app.route("/dashboard")
    def dashboard():
        """Global pipeline statistics."""
        groups = scan_dossiers()
        stats = compute_dashboard_stats(groups)
        return render_template("dashboard.html", stats=stats, groups=groups)

    @app.route("/cpam")
    def cpam_list():
        """All CPAM controls across all dossiers, sorted by OGC number."""
        groups = scan_dossiers()
        controls = collect_cpam_controls(groups)
        return render_template("cpam.html", controls=controls, total=len(controls), groups=groups)

    @app.route("/admin/models", methods=["GET"])
    def list_models():
        """List available Ollama models and the currently selected one."""
        models = fetch_ollama_models()
        return jsonify({"models": models, "current": cfg.OLLAMA_MODEL})

    @app.route("/admin/models", methods=["POST"])
    def set_model():
        """Switch the active Ollama model (JSON body: {"model": "..."})."""
        data = request.get_json(silent=True) or {}
        new_model = data.get("model", "").strip()
        if not new_model:
            return jsonify({"error": "Champ 'model' requis"}), 400
        # Mutates the config module so the whole process sees the change.
        cfg.OLLAMA_MODEL = new_model
        logger.info("Modèle Ollama changé : %s", new_model)
        return jsonify({"ok": True, "model": cfg.OLLAMA_MODEL})

    @app.route("/reprocess/<path:filepath>", methods=["POST"])
    def reprocess(filepath: str):
        """Re-run the pipeline for one dossier from its source PDF."""
        from ..main import process_pdf, write_outputs
        dossier = load_dossier(filepath)
        source_file = dossier.source_file
        if not source_file:
            return jsonify({"error": "Fichier source introuvable"}), 400
        # Look for the source PDF under input/.
        input_dir = Path(__file__).parent.parent.parent / "input"
        pdf_path = None
        for p in input_dir.rglob(source_file):
            if p.is_file():
                pdf_path = p
                break
        if not pdf_path:
            return jsonify({"error": f"PDF source '{source_file}' introuvable"}), 404
        try:
            pdf_results = process_pdf(pdf_path)
            stem = pdf_path.stem.replace(" ", "_")
            subdir = None
            if pdf_path.parent != input_dir:
                subdir = pdf_path.parent.name
            multi = len(pdf_results) > 1
            for part_idx, (anonymized_text, new_dossier, report) in enumerate(pdf_results):
                part_stem = f"{stem}_part{part_idx + 1}" if multi else stem
                write_outputs(part_stem, anonymized_text, new_dossier, report, subdir=subdir)
            return jsonify({"ok": True, "message": f"Traitement terminé ({len(pdf_results)} dossier(s))"})
        except Exception as e:
            logger.exception("Erreur lors du retraitement")
            return jsonify({"error": str(e)}), 500

    # ------------------------------------------------------------------
    # Referentiels admin routes
    # ------------------------------------------------------------------
    ref_manager = ReferentielManager()

    @app.route("/admin/referentiels")
    def admin_referentiels():
        """Admin page listing uploaded and built-in referentiels."""
        refs = ref_manager.list_all()
        builtin = get_builtin_referentiels()
        return render_template("admin_referentiels.html", referentiels=refs, builtin_refs=builtin, max_size=UPLOAD_MAX_SIZE_MB)

    @app.route("/admin/referentiels/upload", methods=["POST"])
    def upload_referentiel():
        """Upload a new referentiel file (multipart field "file")."""
        if "file" not in request.files:
            return jsonify({"error": "Aucun fichier envoyé"}), 400
        f = request.files["file"]
        if not f.filename:
            return jsonify({"error": "Nom de fichier vide"}), 400
        filename = secure_filename(f.filename)
        try:
            file_data = f.read()
            ref = ref_manager.add_file(filename, file_data)
            return jsonify({"ok": True, "referentiel": ref})
        except ValueError as e:
            return jsonify({"error": str(e)}), 400

    @app.route("/admin/referentiels/<ref_id>/index", methods=["POST"])
    def index_referentiel(ref_id: str):
        """Index one uploaded referentiel into the RAG index."""
        try:
            count = ref_manager.index_referentiel(ref_id)
            return jsonify({"ok": True, "chunks": count})
        except ValueError as e:
            return jsonify({"error": str(e)}), 404
        except Exception as e:
            logger.exception("Erreur lors de l'indexation du référentiel %s", ref_id)
            return jsonify({"error": str(e)}), 500

    @app.route("/admin/referentiels/<ref_id>", methods=["DELETE"])
    def delete_referentiel(ref_id: str):
        """Remove an uploaded referentiel."""
        if ref_manager.remove(ref_id):
            return jsonify({"ok": True})
        return jsonify({"error": "Référentiel introuvable"}), 404

    @app.route("/admin/referentiels/rebuild-index", methods=["POST"])
    def rebuild_index():
        """Rebuild the built-in RAG index, then re-index active referentiels."""
        try:
            from ..medical.rag_index import build_index
            build_index(force=True)
            # Re-index every referentiel that was already indexed.
            reindexed = 0
            for ref in ref_manager.list_all():
                if ref["status"] == "indexed":
                    ref_manager.index_referentiel(ref["id"])
                    reindexed += 1
            return jsonify({"ok": True, "reindexed": reindexed})
        except Exception as e:
            logger.exception("Erreur lors du rebuild de l'index")
            return jsonify({"error": str(e)}), 500

    # ------------------------------------------------------------------
    # DIM validation routes
    # ------------------------------------------------------------------
    val_manager = ValidationManager()

    @app.route("/validation")
    def validation_list():
        """List the dossiers selected for DIM validation with their status."""
        groups = scan_dossiers()
        selection = val_manager.load_selection()
        annotations = {a["dossier_id"]: a for a in val_manager.list_annotations()}
        # Build the enriched list: pipeline data (prefer the merged file) +
        # annotation status for each selected dossier.
        items = []
        for dossier_id in selection:
            annot = annotations.get(dossier_id, {})
            parts = dossier_id.split("/")
            group_name = parts[0] if parts else ""
            group_items = groups.get(group_name, [])
            pipeline = None
            for gi in group_items:
                if "fusionne" in gi["name"]:
                    pipeline = gi
                    break
            if not pipeline and group_items:
                pipeline = group_items[0]
            d = pipeline["dossier"] if pipeline else None
            items.append({
                "dossier_id": dossier_id,
                "group_name": group_name,
                "dp_code": d.diagnostic_principal.cim10_suggestion if d and d.diagnostic_principal else "",
                "dp_texte": d.diagnostic_principal.texte if d and d.diagnostic_principal else "",
                "dp_confidence": d.diagnostic_principal.cim10_confidence if d and d.diagnostic_principal else "",
                "nb_das": len(d.diagnostics_associes) if d else 0,
                "has_cpam": bool(d and d.controles_cpam),
                "statut": annot.get("statut", "non_commence"),
                "validateur": annot.get("validateur", ""),
                "date_validation": annot.get("date_validation", ""),
            })
        total = len(items)
        valides = sum(1 for i in items if i["statut"] == "valide")
        en_cours = sum(1 for i in items if i["statut"] == "en_cours")
        return render_template(
            "validation_list.html",
            items=items,
            total=total,
            valides=valides,
            en_cours=en_cours,
            groups=groups,
        )

    @app.route("/validation/<path:dossier_id>")
    def validation_detail(dossier_id: str):
        """Validation view for one dossier, with prev/next navigation."""
        groups = scan_dossiers()
        # Load the annotation; 404 when the dossier was never selected.
        annotation = val_manager.load_annotation(dossier_id)
        if not annotation:
            abort(404)
        # Load the pipeline data (prefer the merged file of the group).
        parts = dossier_id.split("/")
        group_name = parts[0] if parts else ""
        group_items = groups.get(group_name, [])
        pipeline = None
        for gi in group_items:
            if "fusionne" in gi["name"]:
                pipeline = gi
                break
        if not pipeline and group_items:
            pipeline = group_items[0]
        dossier = pipeline["dossier"] if pipeline else None
        # Navigation: previous / next dossier in the selection order.
        selection = val_manager.load_selection()
        current_idx = selection.index(dossier_id) if dossier_id in selection else -1
        prev_id = selection[current_idx - 1] if current_idx > 0 else None
        next_id = selection[current_idx + 1] if current_idx < len(selection) - 1 else None
        return render_template(
            "validation_detail.html",
            annotation=annotation,
            dossier=dossier,
            dossier_id=dossier_id,
            group_name=group_name,
            prev_id=prev_id,
            next_id=next_id,
            groups=groups,
        )

    @app.route("/api/validation/save", methods=["POST"])
    def api_validation_save():
        """Persist a validation annotation (JSON body with "dossier_id")."""
        data = request.get_json(silent=True)
        if not data or "dossier_id" not in data:
            return jsonify({"error": "dossier_id requis"}), 400
        dossier_id = data["dossier_id"]
        # Only dossiers in the validation selection may be annotated.
        selection = val_manager.load_selection()
        if selection and dossier_id not in selection:
            return jsonify({"error": "Dossier non sélectionné pour validation"}), 403
        try:
            val_manager.save_annotation(dossier_id, data)
            return jsonify({"ok": True})
        except Exception as e:
            logger.exception("Erreur sauvegarde annotation %s", dossier_id)
            return jsonify({"error": str(e)}), 500

    @app.route("/api/cim10/search")
    def api_cim10_search():
        """Search CIM-10 codes by code prefix first, then by normalized label.

        Query param "q" (min 2 chars); at most 20 results.
        """
        from ..medical.cim10_dict import load_dict, normalize_text
        q = request.args.get("q", "").strip()
        if len(q) < 2:
            return jsonify({"results": []})
        cim10 = load_dict()
        q_norm = normalize_text(q)
        q_upper = q.upper().strip()
        results = []
        # Exact code-prefix matches first.
        for code, label in cim10.items():
            if code.upper().startswith(q_upper):
                results.append({"code": code, "label": label})
                if len(results) >= 20:
                    break
        # Then normalized full-text matches, skipping codes already found.
        if len(results) < 20:
            seen = {r["code"] for r in results}
            for code, label in cim10.items():
                if code in seen:
                    continue
                if q_norm in normalize_text(label):
                    results.append({"code": code, "label": label})
                    if len(results) >= 20:
                        break
        return jsonify({"results": results})

    @app.route("/validation/metrics")
    def validation_metrics():
        """Aggregated validation metrics over the annotated selection."""
        groups = scan_dossiers()
        metrics = val_manager.compute_metrics(groups)
        selection = val_manager.load_selection()
        return render_template(
            "validation_metrics.html",
            metrics=metrics,
            total_selection=len(selection),
            groups=groups,
        )

    return app