feat: pass LLM hybride pour DAS + interface admin référentiels RAG

Chantier 1 — Extraction DAS par LLM :
- Nouveau prompt expert DIM dans rag_search.py (extract_das_llm)
- Phase 4 dans cim10_extractor.py : détection DAS supplémentaires avant enrichissement RAG
- Cache persistant (clé hash du texte), validation CIM-10, déduplication
- Activé uniquement avec use_rag=True (--no-rag le désactive)

Chantier 2 — Admin référentiels :
- Config : REFERENTIELS_DIR, UPLOAD_MAX_SIZE_MB, ALLOWED_EXTENSIONS
- Chunking générique (PDF/CSV/Excel/TXT) + ajout incrémental FAISS dans rag_index.py
- ReferentielManager CRUD dans viewer/referentiels.py
- 5 routes Flask (listing, upload, indexation, suppression, rebuild)
- Template admin avec tableau interactif + lien sidebar

Fix : if cache → if cache is not None (OllamaCache vide évaluait à False)

410 tests passent (27 nouveaux, 0 régression).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-12 23:12:39 +01:00
parent bf92a0ce3e
commit f44216b95b
10 changed files with 1197 additions and 6 deletions

View File

@@ -46,6 +46,9 @@ NUM_UM = "0000"
# --- Configuration RAG --- # --- Configuration RAG ---
RAG_INDEX_DIR = BASE_DIR / "data" / "rag_index" RAG_INDEX_DIR = BASE_DIR / "data" / "rag_index"
REFERENTIELS_DIR = BASE_DIR / "data" / "referentiels"
UPLOAD_MAX_SIZE_MB = 50
ALLOWED_EXTENSIONS = {".pdf", ".csv", ".xlsx", ".xls", ".txt"}
CIM10_DICT_PATH = BASE_DIR / "data" / "cim10_dict.json" CIM10_DICT_PATH = BASE_DIR / "data" / "cim10_dict.json"
CCAM_DICT_PATH = BASE_DIR / "data" / "ccam_dict.json" CCAM_DICT_PATH = BASE_DIR / "data" / "ccam_dict.json"
CIM10_PDF = Path("/home/dom/ai/aivanov_CIM/cim-10-fr_2026_a_usage_pmsi_version_provisoire_111225.pdf") CIM10_PDF = Path("/home/dom/ai/aivanov_CIM/cim-10-fr_2026_a_usage_pmsi_version_provisoire_111225.pdf")

View File

@@ -112,6 +112,10 @@ def extract_medical_info(
_extract_imagerie(anonymized_text, dossier) _extract_imagerie(anonymized_text, dossier)
_extract_complications(anonymized_text, dossier, edsnlp_result) _extract_complications(anonymized_text, dossier, edsnlp_result)
# Phase 4 : pass LLM pour détecter des DAS supplémentaires
if use_rag:
_extract_das_llm(anonymized_text, dossier)
if use_rag: if use_rag:
_enrich_with_rag(dossier) _enrich_with_rag(dossier)
@@ -133,6 +137,79 @@ def extract_medical_info(
return dossier return dossier
def _extract_das_llm(text: str, dossier: DossierMedical) -> None:
"""Extrait des DAS supplémentaires via un pass LLM (avant enrichissement RAG)."""
try:
from .rag_search import extract_das_llm
from .ollama_cache import OllamaCache
from ..config import OLLAMA_CACHE_PATH, OLLAMA_MODEL
except ImportError:
logger.warning("Module RAG non disponible pour l'extraction DAS LLM")
return
try:
cache = OllamaCache(OLLAMA_CACHE_PATH, OLLAMA_MODEL)
# Construire le contexte
contexte = {
"sexe": dossier.sejour.sexe,
"age": dossier.sejour.age,
"duree_sejour": dossier.sejour.duree_sejour,
"imc": dossier.sejour.imc,
"antecedents": dossier.antecedents[:5],
"biologie_cle": [(b.test, b.valeur, b.anomalie) for b in dossier.biologie_cle],
"imagerie": [(i.type, (i.conclusion or "")[:200]) for i in dossier.imagerie],
"complications": dossier.complications,
}
# DAS existants (texte + code)
existing_das = []
existing_codes = set()
if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
label = d.texte
if d.cim10_suggestion:
label += f" ({d.cim10_suggestion})"
existing_codes.add(d.cim10_suggestion)
existing_das.append(label)
dp_texte = dossier.diagnostic_principal.texte if dossier.diagnostic_principal else ""
das_results = extract_das_llm(text, contexte, existing_das, dp_texte, cache=cache)
added = 0
for das in das_results:
texte = clean_diagnostic_text(das.get("texte", ""))
if not texte or not is_valid_diagnostic_text(texte):
continue
code = das.get("code_cim10")
if code:
code = normalize_code(code)
is_valid, _ = cim10_validate(code)
if not is_valid:
logger.info("DAS LLM : code %s invalide pour « %s », ignoré", code, texte)
continue
if code in existing_codes:
continue
existing_codes.add(code)
dossier.diagnostics_associes.append(Diagnostic(
texte=texte,
cim10_suggestion=code,
justification=das.get("justification"),
))
added += 1
if added:
logger.info("DAS LLM : %d diagnostics supplémentaires ajoutés", added)
cache.save()
except Exception:
logger.warning("Erreur lors de l'extraction DAS LLM", exc_info=True)
def _enrich_with_rag(dossier: DossierMedical) -> None: def _enrich_with_rag(dossier: DossierMedical) -> None:
"""Enrichit les diagnostics via le RAG (FAISS + Ollama).""" """Enrichit les diagnostics via le RAG (FAISS + Ollama)."""
try: try:

View File

@@ -11,7 +11,7 @@ from typing import Optional
import pdfplumber import pdfplumber
from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CCAM_DICT_PATH from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CCAM_DICT_PATH, REFERENTIELS_DIR
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -482,3 +482,183 @@ def get_index() -> tuple | None:
logger.info("Index FAISS chargé : %d vecteurs", _faiss_index.ntotal) logger.info("Index FAISS chargé : %d vecteurs", _faiss_index.ntotal)
return _faiss_index, _metadata return _faiss_index, _metadata
# ---------------------------------------------------------------------------
# Chunking générique pour fichiers utilisateur (référentiels)
# ---------------------------------------------------------------------------
def chunk_user_file(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un fichier utilisateur en chunks pour indexation FAISS.
Dispatch selon l'extension :
- PDF : pages groupées par 2
- CSV/Excel : une ligne = un chunk
- TXT : paragraphes (blocs séparés par lignes vides)
Args:
file_path: Chemin du fichier.
doc_name: Nom du document (utilisé comme identifiant dans les métadonnées).
Returns:
Liste de Chunk prêts pour l'indexation.
"""
suffix = file_path.suffix.lower()
if suffix == ".pdf":
return _chunk_user_pdf(file_path, doc_name)
elif suffix in (".csv", ".xlsx", ".xls"):
return _chunk_user_tabular(file_path, doc_name)
elif suffix == ".txt":
return _chunk_user_txt(file_path, doc_name)
else:
logger.warning("Extension non supportée pour chunking : %s", suffix)
return []
def _chunk_user_pdf(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un PDF utilisateur en chunks de 2 pages."""
chunks: list[Chunk] = []
try:
with pdfplumber.open(file_path) as pdf:
page_texts: list[str] = []
start_page = 1
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text:
page_texts.append(text)
if len(page_texts) >= 2:
combined = "\n".join(page_texts)
if len(combined.split()) >= 10:
chunks.append(Chunk(
text=combined,
document=doc_name,
page=start_page,
))
page_texts = []
start_page = page_num + 1
if page_texts:
combined = "\n".join(page_texts)
if len(combined.split()) >= 10:
chunks.append(Chunk(
text=combined,
document=doc_name,
page=start_page,
))
except Exception:
logger.warning("Erreur lors du chunking PDF %s", file_path, exc_info=True)
logger.info("Référentiel PDF %s : %d chunks", doc_name, len(chunks))
return chunks
def _chunk_user_tabular(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un CSV/Excel : une ligne = un chunk."""
chunks: list[Chunk] = []
try:
import pandas as pd
suffix = file_path.suffix.lower()
if suffix == ".csv":
df = pd.read_csv(file_path, encoding="utf-8", on_bad_lines="skip")
else:
df = pd.read_excel(file_path)
for idx, row in df.iterrows():
text = " | ".join(str(v) for v in row.values if pd.notna(v))
if len(text.split()) >= 3:
chunks.append(Chunk(
text=text,
document=doc_name,
page=int(idx) + 1,
))
except Exception:
logger.warning("Erreur lors du chunking tabular %s", file_path, exc_info=True)
logger.info("Référentiel tabular %s : %d chunks", doc_name, len(chunks))
return chunks
def _chunk_user_txt(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un fichier TXT en paragraphes (blocs séparés par lignes vides)."""
chunks: list[Chunk] = []
try:
text = file_path.read_text(encoding="utf-8")
paragraphs = re.split(r"\n\s*\n", text)
for i, para in enumerate(paragraphs):
para = para.strip()
if len(para.split()) >= 5:
chunks.append(Chunk(
text=para,
document=doc_name,
page=i + 1,
))
except Exception:
logger.warning("Erreur lors du chunking TXT %s", file_path, exc_info=True)
logger.info("Référentiel TXT %s : %d chunks", doc_name, len(chunks))
return chunks
def add_chunks_to_index(chunks: list[Chunk]) -> int:
"""Ajoute des chunks à l'index FAISS existant (incrémental).
Charge l'index si nécessaire, encode les chunks, ajoute les vecteurs,
et sauvegarde le tout.
Args:
chunks: Liste de Chunk à ajouter.
Returns:
Nombre de chunks effectivement ajoutés.
"""
if not chunks:
return 0
import faiss
import numpy as np
from .rag_search import _get_embed_model
index_path = RAG_INDEX_DIR / "faiss.index"
meta_path = RAG_INDEX_DIR / "metadata.json"
# Charger l'index existant ou en créer un nouveau
if index_path.exists() and meta_path.exists():
faiss_idx = faiss.read_index(str(index_path))
metadata = json.loads(meta_path.read_text(encoding="utf-8"))
else:
model = _get_embed_model()
# Obtenir la dimension via un encodage test
test_vec = model.encode(["test"], normalize_embeddings=True)
dim = test_vec.shape[1]
faiss_idx = faiss.IndexFlatIP(dim)
metadata = []
# Encoder les nouveaux chunks
model = _get_embed_model()
texts = [c.text[:2000] for c in chunks]
embeddings = model.encode(texts, normalize_embeddings=True, batch_size=64)
embeddings = np.array(embeddings, dtype=np.float32)
# Ajouter à l'index
faiss_idx.add(embeddings)
# Ajouter les métadonnées
from dataclasses import asdict
for chunk in chunks:
meta = asdict(chunk)
meta["extrait"] = meta.pop("text")[:800]
metadata.append(meta)
# Sauvegarder
RAG_INDEX_DIR.mkdir(parents=True, exist_ok=True)
faiss.write_index(faiss_idx, str(index_path))
meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
# Invalider le singleton pour forcer le rechargement
reset_index()
logger.info("Index FAISS : %d chunks ajoutés (total : %d)", len(chunks), faiss_idx.ntotal)
return len(chunks)
def reset_index() -> None:
"""Invalide le singleton FAISS pour forcer le rechargement au prochain accès."""
global _faiss_index, _metadata
_faiss_index = None
_metadata = []

View File

@@ -473,6 +473,101 @@ def enrich_acte(acte: ActeCCAM, contexte: dict, cache: OllamaCache | None = None
logger.info("Ollama non disponible — sources FAISS CCAM conservées sans justification LLM") logger.info("Ollama non disponible — sources FAISS CCAM conservées sans justification LLM")
def _build_prompt_das_extraction(text: str, contexte: dict, existing_das: list[str], dp_texte: str) -> str:
"""Construit le prompt pour l'extraction LLM de DAS supplémentaires."""
ctx_str = _format_contexte(contexte)
existing_str = "\n".join(f"- {d}" for d in existing_das) if existing_das else "Aucun"
return f"""Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI.
Analyse le texte médical suivant et identifie les diagnostics associés significatifs (DAS) qui n'ont PAS encore été codés.
RÈGLES IMPÉRATIVES :
- Un DAS doit avoir mobilisé des ressources supplémentaires pendant le séjour
- Ne PAS proposer de doublons avec les DAS déjà codés ci-dessous
- Ne PAS proposer le diagnostic principal comme DAS
- Ne PAS coder les symptômes (R00-R99) si un diagnostic précis les explique
- Ne PAS coder les antécédents non pertinents pour le séjour
- Privilégie les codes CIM-10 les plus SPÉCIFIQUES (4e ou 5e caractère)
- Ne propose que des diagnostics CLAIREMENT mentionnés dans le texte
DIAGNOSTIC PRINCIPAL : {dp_texte or "Non identifié"}
DAS DÉJÀ CODÉS :
{existing_str}
CONTEXTE CLINIQUE :
{ctx_str}
TEXTE MÉDICAL :
{text[:4000]}
Réponds UNIQUEMENT avec un objet JSON au format suivant, sans aucun texte avant ou après :
{{
"diagnostics_supplementaires": [
{{
"texte": "description du diagnostic",
"code_cim10": "X99.9",
"justification": "pourquoi ce DAS est pertinent pour le séjour"
}}
]
}}
Si aucun DAS supplémentaire n'est pertinent, retourne : {{"diagnostics_supplementaires": []}}"""
def extract_das_llm(
text: str,
contexte: dict,
existing_das: list[str],
dp_texte: str,
cache: OllamaCache | None = None,
) -> list[dict]:
"""Extrait des DAS supplémentaires via un pass LLM.
Args:
text: Texte médical complet.
contexte: Contexte patient (sexe, age, etc.).
existing_das: Liste des DAS déjà codés (texte + code).
dp_texte: Texte du diagnostic principal.
cache: Cache Ollama optionnel.
Returns:
Liste de dicts {texte, code_cim10, justification} pour les DAS détectés.
"""
import hashlib
# Clé de cache basée sur le hash du texte
text_hash = hashlib.md5(text[:4000].encode()).hexdigest()[:16]
cache_key_text = f"das_extract::{text_hash}"
# Vérifier le cache
if cache is not None:
cached = cache.get(cache_key_text, "das_llm")
if cached is not None:
logger.info("Cache hit pour extraction DAS LLM")
return cached.get("diagnostics_supplementaires", [])
# Construire le prompt et appeler Ollama
prompt = _build_prompt_das_extraction(text, contexte, existing_das, dp_texte)
result = call_ollama(prompt, temperature=0.1, max_tokens=2000)
if result is None:
logger.warning("Extraction DAS LLM : Ollama non disponible")
return []
das_list = result.get("diagnostics_supplementaires", [])
if not isinstance(das_list, list):
logger.warning("Extraction DAS LLM : format inattendu")
return []
# Stocker dans le cache
if cache is not None:
cache.put(cache_key_text, "das_llm", result)
logger.info("Extraction DAS LLM : %d diagnostics supplémentaires détectés", len(das_list))
return das_list
def enrich_dossier(dossier: DossierMedical) -> None: def enrich_dossier(dossier: DossierMedical) -> None:
"""Enrichit le DP et tous les DAS d'un dossier via le RAG. """Enrichit le DP et tous les DAS d'un dossier via le RAG.

View File

@@ -11,8 +11,11 @@ import requests
from flask import Flask, abort, render_template, request, jsonify from flask import Flask, abort, render_template, request, jsonify
from markupsafe import Markup from markupsafe import Markup
from ..config import STRUCTURED_DIR, OLLAMA_URL, CCAM_DICT_PATH, DossierMedical from werkzeug.utils import secure_filename
from ..config import STRUCTURED_DIR, OLLAMA_URL, CCAM_DICT_PATH, DossierMedical, ALLOWED_EXTENSIONS, UPLOAD_MAX_SIZE_MB
from .. import config as cfg from .. import config as cfg
from .referentiels import ReferentielManager
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -271,12 +274,12 @@ def create_app() -> Flask:
def reprocess(filepath: str): def reprocess(filepath: str):
"""Relance le traitement d'un dossier.""" """Relance le traitement d'un dossier."""
from ..main import process_pdf, write_outputs from ..main import process_pdf, write_outputs
dossier = load_dossier(filepath) dossier = load_dossier(filepath)
source_file = dossier.source_file source_file = dossier.source_file
if not source_file: if not source_file:
return jsonify({"error": "Fichier source introuvable"}), 400 return jsonify({"error": "Fichier source introuvable"}), 400
# Chercher le PDF source dans input/ # Chercher le PDF source dans input/
input_dir = Path(__file__).parent.parent.parent / "input" input_dir = Path(__file__).parent.parent.parent / "input"
pdf_path = None pdf_path = None
@@ -284,10 +287,10 @@ def create_app() -> Flask:
if p.is_file(): if p.is_file():
pdf_path = p pdf_path = p
break break
if not pdf_path: if not pdf_path:
return jsonify({"error": f"PDF source '{source_file}' introuvable"}), 404 return jsonify({"error": f"PDF source '{source_file}' introuvable"}), 404
try: try:
anonymized_text, new_dossier, report = process_pdf(pdf_path) anonymized_text, new_dossier, report = process_pdf(pdf_path)
stem = pdf_path.stem.replace(" ", "_") stem = pdf_path.stem.replace(" ", "_")
@@ -300,4 +303,64 @@ def create_app() -> Flask:
logger.exception("Erreur lors du retraitement") logger.exception("Erreur lors du retraitement")
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
# ------------------------------------------------------------------
# Routes admin référentiels
# ------------------------------------------------------------------
ref_manager = ReferentielManager()
@app.route("/admin/referentiels")
def admin_referentiels():
refs = ref_manager.list_all()
return render_template("admin_referentiels.html", referentiels=refs, max_size=UPLOAD_MAX_SIZE_MB)
@app.route("/admin/referentiels/upload", methods=["POST"])
def upload_referentiel():
if "file" not in request.files:
return jsonify({"error": "Aucun fichier envoyé"}), 400
f = request.files["file"]
if not f.filename:
return jsonify({"error": "Nom de fichier vide"}), 400
filename = secure_filename(f.filename)
try:
file_data = f.read()
ref = ref_manager.add_file(filename, file_data)
return jsonify({"ok": True, "referentiel": ref})
except ValueError as e:
return jsonify({"error": str(e)}), 400
@app.route("/admin/referentiels/<ref_id>/index", methods=["POST"])
def index_referentiel(ref_id: str):
try:
count = ref_manager.index_referentiel(ref_id)
return jsonify({"ok": True, "chunks": count})
except ValueError as e:
return jsonify({"error": str(e)}), 404
except Exception as e:
logger.exception("Erreur lors de l'indexation du référentiel %s", ref_id)
return jsonify({"error": str(e)}), 500
@app.route("/admin/referentiels/<ref_id>", methods=["DELETE"])
def delete_referentiel(ref_id: str):
if ref_manager.remove(ref_id):
return jsonify({"ok": True})
return jsonify({"error": "Référentiel introuvable"}), 404
@app.route("/admin/referentiels/rebuild-index", methods=["POST"])
def rebuild_index():
try:
from ..medical.rag_index import build_index
build_index(force=True)
# Réindexer tous les référentiels actifs
reindexed = 0
for ref in ref_manager.list_all():
if ref["status"] == "indexed":
ref_manager.index_referentiel(ref["id"])
reindexed += 1
return jsonify({"ok": True, "reindexed": reindexed})
except Exception as e:
logger.exception("Erreur lors du rebuild de l'index")
return jsonify({"error": str(e)}), 500
return app return app

155
src/viewer/referentiels.py Normal file
View File

@@ -0,0 +1,155 @@
"""Gestionnaire de référentiels utilisateur pour le RAG."""
from __future__ import annotations
import json
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path
from ..config import REFERENTIELS_DIR, ALLOWED_EXTENSIONS, UPLOAD_MAX_SIZE_MB
logger = logging.getLogger(__name__)
class ReferentielManager:
"""CRUD pour les fichiers de référentiels utilisateur.
Stocke les fichiers dans REFERENTIELS_DIR avec un index.json
pour les métadonnées.
"""
def __init__(self, referentiels_dir: Path | None = None):
self._dir = referentiels_dir or REFERENTIELS_DIR
self._dir.mkdir(parents=True, exist_ok=True)
self._index_path = self._dir / "index.json"
self._index: list[dict] = self._load_index()
def _load_index(self) -> list[dict]:
if self._index_path.exists():
try:
return json.loads(self._index_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, KeyError):
logger.warning("Index référentiels corrompu, réinitialisé")
return []
def _save_index(self) -> None:
self._index_path.write_text(
json.dumps(self._index, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def list_all(self) -> list[dict]:
"""Retourne la liste de tous les référentiels."""
return list(self._index)
def get(self, ref_id: str) -> dict | None:
"""Retourne un référentiel par son ID."""
for ref in self._index:
if ref["id"] == ref_id:
return ref
return None
def add_file(self, filename: str, file_data: bytes) -> dict:
"""Ajoute un fichier de référentiel.
Args:
filename: Nom original du fichier.
file_data: Contenu binaire du fichier.
Returns:
Métadonnées du référentiel créé.
Raises:
ValueError: Extension non autorisée ou taille dépassée.
"""
ext = Path(filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise ValueError(f"Extension '{ext}' non autorisée. Extensions valides : {ALLOWED_EXTENSIONS}")
size_mb = len(file_data) / (1024 * 1024)
if size_mb > UPLOAD_MAX_SIZE_MB:
raise ValueError(f"Fichier trop volumineux ({size_mb:.1f} Mo > {UPLOAD_MAX_SIZE_MB} Mo)")
ref_id = uuid.uuid4().hex[:12]
safe_name = f"{ref_id}_{Path(filename).stem}{ext}"
file_path = self._dir / safe_name
file_path.write_bytes(file_data)
ref = {
"id": ref_id,
"filename": filename,
"stored_name": safe_name,
"extension": ext,
"size_bytes": len(file_data),
"date_added": datetime.now().isoformat(),
"status": "uploaded",
"chunks_count": 0,
}
self._index.append(ref)
self._save_index()
logger.info("Référentiel ajouté : %s (%s)", filename, ref_id)
return ref
def remove(self, ref_id: str) -> bool:
"""Supprime un référentiel (fichier + métadonnées).
Returns:
True si trouvé et supprimé, False sinon.
"""
ref = self.get(ref_id)
if not ref:
return False
file_path = self._dir / ref["stored_name"]
if file_path.exists():
file_path.unlink()
self._index = [r for r in self._index if r["id"] != ref_id]
self._save_index()
logger.info("Référentiel supprimé : %s (%s)", ref["filename"], ref_id)
return True
def index_referentiel(self, ref_id: str) -> int:
"""Indexe un référentiel dans FAISS.
Args:
ref_id: ID du référentiel à indexer.
Returns:
Nombre de chunks indexés.
Raises:
ValueError: Référentiel introuvable.
"""
ref = self.get(ref_id)
if not ref:
raise ValueError(f"Référentiel {ref_id} introuvable")
file_path = self._dir / ref["stored_name"]
if not file_path.exists():
raise ValueError(f"Fichier {ref['stored_name']} introuvable")
from ..medical.rag_index import chunk_user_file, add_chunks_to_index
doc_name = f"ref:{ref['filename']}"
chunks = chunk_user_file(file_path, doc_name)
if not chunks:
ref["status"] = "empty"
ref["chunks_count"] = 0
self._save_index()
return 0
count = add_chunks_to_index(chunks)
ref["status"] = "indexed"
ref["chunks_count"] = count
self._save_index()
logger.info("Référentiel indexé : %s%d chunks", ref["filename"], count)
return count

View File

@@ -0,0 +1,220 @@
{% extends "base.html" %}
{% block title %}Référentiels RAG{% endblock %}
{% block sidebar %}
<div class="group-title">Admin</div>
<a href="/admin/referentiels" style="color:#60a5fa;font-weight:600;border-left-color:#3b82f6;">Référentiels RAG</a>
<a href="/">Retour aux dossiers</a>
{% endblock %}
{% block content %}
<h2>Référentiels RAG</h2>
<p style="font-size:0.85rem;color:#64748b;margin-bottom:1.5rem;">
Ajoutez des documents de référence (PDF, CSV, Excel, TXT) pour enrichir la base de connaissances du RAG.
</p>
<!-- Zone upload -->
<div class="card" style="margin-bottom:1.5rem;">
<h3>Ajouter un référentiel</h3>
<form id="upload-form" style="display:flex;gap:0.75rem;align-items:end;flex-wrap:wrap;margin-top:0.75rem;">
<div>
<label style="display:block;font-size:0.7rem;color:#64748b;text-transform:uppercase;letter-spacing:0.05em;font-weight:600;margin-bottom:0.25rem;">Fichier</label>
<input type="file" id="file-input" name="file" accept=".pdf,.csv,.xlsx,.xls,.txt"
style="font-size:0.85rem;padding:0.35rem;">
</div>
<button type="submit" id="upload-btn"
style="padding:0.5rem 1.25rem;border-radius:6px;border:none;background:#3b82f6;color:#fff;font-size:0.85rem;font-weight:600;cursor:pointer;">
Uploader
</button>
<span id="upload-status" style="font-size:0.8rem;"></span>
</form>
<p style="font-size:0.7rem;color:#94a3b8;margin-top:0.5rem;">
Extensions : .pdf, .csv, .xlsx, .xls, .txt — Max {{ max_size }} Mo
</p>
</div>
<!-- Tableau référentiels -->
<div class="card">
<div style="display:flex;justify-content:space-between;align-items:center;margin-bottom:0.75rem;">
<h3>Référentiels indexés</h3>
<button id="rebuild-btn"
style="padding:0.35rem 0.75rem;border-radius:6px;border:1px solid #e2e8f0;background:#fff;font-size:0.75rem;cursor:pointer;">
Rebuild complet
</button>
</div>
<table>
<thead>
<tr>
<th>Nom</th>
<th>Type</th>
<th>Taille</th>
<th>Date</th>
<th>Chunks</th>
<th>Statut</th>
<th>Actions</th>
</tr>
</thead>
<tbody id="ref-table">
{% for ref in referentiels %}
<tr id="row-{{ ref.id }}">
<td>{{ ref.filename }}</td>
<td><span class="badge" style="background:#f1f5f9;color:#334155;">{{ ref.extension }}</span></td>
<td>{{ "%.1f"|format(ref.size_bytes / 1024 / 1024) }} Mo</td>
<td style="font-size:0.8rem;">{{ ref.date_added[:10] }}</td>
<td>{{ ref.chunks_count }}</td>
<td>
{% if ref.status == 'indexed' %}
<span class="badge" style="background:#dcfce7;color:#16a34a;">Indexé</span>
{% elif ref.status == 'empty' %}
<span class="badge" style="background:#fef9c3;color:#ca8a04;">Vide</span>
{% else %}
<span class="badge" style="background:#f1f5f9;color:#64748b;">Uploadé</span>
{% endif %}
</td>
<td>
<button onclick="indexRef('{{ ref.id }}')" class="action-btn"
style="padding:2px 8px;border-radius:4px;border:1px solid #3b82f6;background:#eff6ff;color:#2563eb;font-size:0.75rem;cursor:pointer;margin-right:4px;">
Indexer
</button>
<button onclick="deleteRef('{{ ref.id }}')" class="action-btn"
style="padding:2px 8px;border-radius:4px;border:1px solid #fca5a5;background:#fef2f2;color:#dc2626;font-size:0.75rem;cursor:pointer;">
Supprimer
</button>
</td>
</tr>
{% endfor %}
{% if not referentiels %}
<tr id="empty-row">
<td colspan="7" style="text-align:center;color:#94a3b8;padding:2rem;">Aucun référentiel</td>
</tr>
{% endif %}
</tbody>
</table>
</div>
<div id="global-status" style="margin-top:1rem;font-size:0.8rem;"></div>
{% endblock %}
{% block scripts %}
<script>
(function() {
const uploadForm = document.getElementById('upload-form');
const fileInput = document.getElementById('file-input');
const uploadBtn = document.getElementById('upload-btn');
const uploadStatus = document.getElementById('upload-status');
const globalStatus = document.getElementById('global-status');
const rebuildBtn = document.getElementById('rebuild-btn');
uploadForm.addEventListener('submit', function(e) {
e.preventDefault();
const file = fileInput.files[0];
if (!file) { uploadStatus.textContent = 'Sélectionnez un fichier'; return; }
const fd = new FormData();
fd.append('file', file);
uploadBtn.disabled = true;
uploadBtn.innerHTML = '<span class="spinner"></span>';
uploadStatus.textContent = '';
fetch('/admin/referentiels/upload', { method: 'POST', body: fd })
.then(r => r.json())
.then(d => {
uploadBtn.disabled = false;
uploadBtn.textContent = 'Uploader';
if (d.ok) {
uploadStatus.style.color = '#16a34a';
uploadStatus.textContent = 'Uploadé';
setTimeout(() => location.reload(), 800);
} else {
uploadStatus.style.color = '#dc2626';
uploadStatus.textContent = d.error || 'Erreur';
}
})
.catch(() => {
uploadBtn.disabled = false;
uploadBtn.textContent = 'Uploader';
uploadStatus.style.color = '#dc2626';
uploadStatus.textContent = 'Erreur réseau';
});
});
window.indexRef = function(id) {
const btn = event.target;
btn.disabled = true;
btn.innerHTML = '<span class="spinner" style="border-color:rgba(37,99,235,0.3);border-top-color:#2563eb;width:10px;height:10px;"></span>';
fetch('/admin/referentiels/' + id + '/index', { method: 'POST' })
.then(r => r.json())
.then(d => {
if (d.ok) {
globalStatus.style.color = '#16a34a';
globalStatus.textContent = d.chunks + ' chunks indexés';
setTimeout(() => location.reload(), 800);
} else {
btn.disabled = false;
btn.textContent = 'Indexer';
globalStatus.style.color = '#dc2626';
globalStatus.textContent = d.error || 'Erreur';
}
})
.catch(() => {
btn.disabled = false;
btn.textContent = 'Indexer';
globalStatus.style.color = '#dc2626';
globalStatus.textContent = 'Erreur réseau';
});
};
window.deleteRef = function(id) {
if (!confirm('Supprimer ce référentiel ?')) return;
fetch('/admin/referentiels/' + id, { method: 'DELETE' })
.then(r => r.json())
.then(d => {
if (d.ok) {
const row = document.getElementById('row-' + id);
if (row) row.remove();
globalStatus.style.color = '#16a34a';
globalStatus.textContent = 'Supprimé';
} else {
globalStatus.style.color = '#dc2626';
globalStatus.textContent = d.error || 'Erreur';
}
})
.catch(() => {
globalStatus.style.color = '#dc2626';
globalStatus.textContent = 'Erreur réseau';
});
};
rebuildBtn.addEventListener('click', function() {
if (!confirm('Reconstruire l\'index FAISS complet ? Cela peut prendre plusieurs minutes.')) return;
rebuildBtn.disabled = true;
rebuildBtn.innerHTML = '<span class="spinner" style="border-color:rgba(0,0,0,0.2);border-top-color:#333;width:10px;height:10px;"></span> Rebuild…';
fetch('/admin/referentiels/rebuild-index', { method: 'POST' })
.then(r => r.json())
.then(d => {
rebuildBtn.disabled = false;
rebuildBtn.textContent = 'Rebuild complet';
if (d.ok) {
globalStatus.style.color = '#16a34a';
globalStatus.textContent = 'Index reconstruit (' + d.reindexed + ' référentiels réindexés)';
} else {
globalStatus.style.color = '#dc2626';
globalStatus.textContent = d.error || 'Erreur';
}
})
.catch(() => {
rebuildBtn.disabled = false;
rebuildBtn.textContent = 'Rebuild complet';
globalStatus.style.color = '#dc2626';
globalStatus.textContent = 'Erreur réseau';
});
});
})();
</script>
{% endblock %}

View File

@@ -227,6 +227,12 @@
<nav class="sidebar-nav" id="sidebar-nav"> <nav class="sidebar-nav" id="sidebar-nav">
{% block sidebar %}{% endblock %} {% block sidebar %}{% endblock %}
</nav> </nav>
<div class="sidebar-admin" style="border-top:1px solid #1e293b;padding:0.5rem 1rem;">
<a href="/admin/referentiels" style="display:block;color:#94a3b8;text-decoration:none;font-size:0.8rem;padding:0.35rem 0;transition:color 0.15s;"
onmouseover="this.style.color='#e2e8f0'" onmouseout="this.style.color='#94a3b8'">
Référentiels RAG
</a>
</div>
<div class="sidebar-admin"> <div class="sidebar-admin">
<label for="model-select">Modèle Ollama</label> <label for="model-select">Modèle Ollama</label>
<select id="model-select"><option>Chargement…</option></select> <select id="model-select"><option>Chargement…</option></select>

213
tests/test_das_llm.py Normal file
View File

@@ -0,0 +1,213 @@
"""Tests pour le pass LLM d'extraction de DAS supplémentaires."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from src.config import Diagnostic, DossierMedical, Sejour
from src.medical.ollama_cache import OllamaCache
class TestExtractDasLlm:
"""Tests pour extract_das_llm() dans rag_search.py."""
def test_returns_das_from_llm(self):
"""Le pass LLM retourne des DAS supplémentaires."""
from src.medical.rag_search import extract_das_llm
mock_result = {
"diagnostics_supplementaires": [
{
"texte": "Hypertension artérielle",
"code_cim10": "I10",
"justification": "HTA mentionnée dans le texte",
},
]
}
with patch("src.medical.rag_search.call_ollama", return_value=mock_result):
result = extract_das_llm(
text="Patient hypertendu sous traitement",
contexte={"sexe": "M", "age": 65},
existing_das=["Diabète de type 2 (E11.9)"],
dp_texte="Pancréatite aiguë biliaire",
)
assert len(result) == 1
assert result[0]["code_cim10"] == "I10"
assert result[0]["texte"] == "Hypertension artérielle"
def test_returns_empty_when_ollama_unavailable(self):
"""Retourne une liste vide si Ollama est indisponible."""
from src.medical.rag_search import extract_das_llm
with patch("src.medical.rag_search.call_ollama", return_value=None):
result = extract_das_llm(
text="Texte médical",
contexte={},
existing_das=[],
dp_texte="",
)
assert result == []
def test_returns_empty_on_bad_format(self):
"""Retourne une liste vide si le format de réponse est inattendu."""
from src.medical.rag_search import extract_das_llm
with patch("src.medical.rag_search.call_ollama", return_value={"other_key": "value"}):
result = extract_das_llm(
text="Texte médical",
contexte={},
existing_das=[],
dp_texte="",
)
assert result == []
def test_cache_hit(self, tmp_path):
"""Le cache est utilisé quand disponible."""
from src.medical.rag_search import extract_das_llm
cache = OllamaCache(tmp_path / "cache.json", "test-model")
mock_result = {
"diagnostics_supplementaires": [
{"texte": "Anémie", "code_cim10": "D64.9", "justification": "test"},
]
}
# Premier appel : cache miss, appelle Ollama
with patch("src.medical.rag_search.call_ollama", return_value=mock_result) as mock_call:
result1 = extract_das_llm(
text="Patient anémique Hb basse",
contexte={},
existing_das=[],
dp_texte="",
cache=cache,
)
assert mock_call.call_count == 1
assert len(result1) == 1
# Vérifier que le cache contient bien l'entrée
assert len(cache) > 0
# Deuxième appel : cache hit, pas d'appel Ollama
with patch("src.medical.rag_search.call_ollama") as mock_call:
result2 = extract_das_llm(
text="Patient anémique Hb basse",
contexte={},
existing_das=[],
dp_texte="",
cache=cache,
)
mock_call.assert_not_called()
assert len(result2) == 1
assert result2[0]["code_cim10"] == "D64.9"
def test_prompt_includes_context(self):
"""Le prompt contient le contexte patient et les DAS existants."""
from src.medical.rag_search import _build_prompt_das_extraction
prompt = _build_prompt_das_extraction(
text="Patient hypertendu diabétique",
contexte={"sexe": "F", "age": 72, "duree_sejour": 5},
existing_das=["Diabète de type 2 (E11.9)", "Obésité (E66.0)"],
dp_texte="Pancréatite aiguë biliaire",
)
assert "Pancréatite aiguë biliaire" in prompt
assert "Diabète de type 2 (E11.9)" in prompt
assert "Obésité (E66.0)" in prompt
assert "Patient hypertendu diabétique" in prompt
class TestExtractDasLlmIntegration:
"""Tests d'intégration pour le pass LLM DAS dans cim10_extractor.py."""
def test_das_llm_called_when_use_rag_true(self):
"""Le pass LLM DAS est appelé quand use_rag=True."""
from src.medical.cim10_extractor import extract_medical_info
parsed = {
"type": "CRH",
"patient": {"sexe": "M"},
"sejour": {},
"diagnostics": [
{"libelle": "Pancréatite aiguë biliaire", "code_cim10": "K85.1", "type": "principal"},
],
}
with patch("src.medical.cim10_extractor._extract_das_llm") as mock_llm, \
patch("src.medical.cim10_extractor._enrich_with_rag"):
extract_medical_info(parsed, "texte médical", use_rag=True)
mock_llm.assert_called_once()
def test_das_llm_not_called_when_use_rag_false(self):
"""Le pass LLM DAS n'est PAS appelé quand use_rag=False."""
from src.medical.cim10_extractor import extract_medical_info
parsed = {
"type": "CRH",
"patient": {"sexe": "M"},
"sejour": {},
"diagnostics": [
{"libelle": "Pancréatite aiguë biliaire", "code_cim10": "K85.1", "type": "principal"},
],
}
with patch("src.medical.cim10_extractor._extract_das_llm") as mock_llm:
extract_medical_info(parsed, "texte médical", use_rag=False)
mock_llm.assert_not_called()
def test_das_llm_filters_invalid_codes(self):
"""Les codes CIM-10 invalides sont filtrés lors de l'intégration."""
from src.medical.cim10_extractor import _extract_das_llm
dossier = DossierMedical()
dossier.sejour = Sejour(sexe="M", age=50)
dossier.diagnostic_principal = Diagnostic(
texte="Pancréatite aiguë", cim10_suggestion="K85.9",
)
mock_result = [
{"texte": "Hypertension artérielle", "code_cim10": "I10", "justification": "ok"},
{"texte": "Diagnostic bidon", "code_cim10": "ZZZ.99", "justification": "invalide"},
]
with patch("src.medical.rag_search.extract_das_llm", return_value=mock_result):
_extract_das_llm("texte médical", dossier)
# I10 est valide → ajouté ; ZZZ.99 est invalide → filtré
codes = [d.cim10_suggestion for d in dossier.diagnostics_associes]
assert "I10" in codes
assert "ZZZ.99" not in codes
def test_das_llm_deduplicates(self):
"""Les codes déjà présents dans les DAS ne sont pas dupliqués."""
from src.medical.cim10_extractor import _extract_das_llm
dossier = DossierMedical()
dossier.sejour = Sejour(sexe="M", age=50)
dossier.diagnostic_principal = Diagnostic(
texte="Pancréatite aiguë", cim10_suggestion="K85.9",
)
dossier.diagnostics_associes = [
Diagnostic(texte="Hypertension artérielle", cim10_suggestion="I10"),
]
mock_result = [
{"texte": "HTA essentielle", "code_cim10": "I10", "justification": "doublon"},
{"texte": "Obésité", "code_cim10": "E66.0", "justification": "nouveau"},
]
with patch("src.medical.rag_search.extract_das_llm", return_value=mock_result):
_extract_das_llm("texte médical", dossier)
codes = [d.cim10_suggestion for d in dossier.diagnostics_associes]
assert codes.count("I10") == 1 # Pas de doublon
assert "E66.0" in codes # Nouveau ajouté

179
tests/test_referentiels.py Normal file
View File

@@ -0,0 +1,179 @@
"""Tests pour le gestionnaire de référentiels et les routes Flask associées."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from src.viewer.referentiels import ReferentielManager
from src.config import ALLOWED_EXTENSIONS, UPLOAD_MAX_SIZE_MB
# ---------------------------------------------------------------------------
# Tests ReferentielManager
# ---------------------------------------------------------------------------
class TestReferentielManager:
@pytest.fixture
def manager(self, tmp_path):
return ReferentielManager(tmp_path / "refs")
def test_add_file(self, manager):
ref = manager.add_file("guide.pdf", b"fake pdf content")
assert ref["filename"] == "guide.pdf"
assert ref["extension"] == ".pdf"
assert ref["status"] == "uploaded"
assert ref["size_bytes"] == len(b"fake pdf content")
assert ref["chunks_count"] == 0
def test_list_all(self, manager):
manager.add_file("a.txt", b"hello")
manager.add_file("b.csv", b"col1,col2")
assert len(manager.list_all()) == 2
def test_get(self, manager):
ref = manager.add_file("guide.pdf", b"content")
found = manager.get(ref["id"])
assert found is not None
assert found["filename"] == "guide.pdf"
def test_get_not_found(self, manager):
assert manager.get("nonexistent") is None
def test_remove(self, manager):
ref = manager.add_file("guide.pdf", b"content")
assert manager.remove(ref["id"]) is True
assert len(manager.list_all()) == 0
assert manager.get(ref["id"]) is None
def test_remove_not_found(self, manager):
assert manager.remove("nonexistent") is False
def test_add_file_invalid_extension(self, manager):
with pytest.raises(ValueError, match="Extension"):
manager.add_file("malware.exe", b"evil")
def test_add_file_too_large(self, manager):
big_data = b"x" * (UPLOAD_MAX_SIZE_MB * 1024 * 1024 + 1)
with pytest.raises(ValueError, match="volumineux"):
manager.add_file("big.pdf", big_data)
def test_persistence(self, tmp_path):
"""L'index persiste entre les instances."""
dir_path = tmp_path / "refs"
m1 = ReferentielManager(dir_path)
m1.add_file("a.txt", b"hello")
m2 = ReferentielManager(dir_path)
assert len(m2.list_all()) == 1
assert m2.list_all()[0]["filename"] == "a.txt"
def test_file_stored_on_disk(self, manager, tmp_path):
ref = manager.add_file("test.txt", b"file content here")
stored_path = manager._dir / ref["stored_name"]
assert stored_path.exists()
assert stored_path.read_bytes() == b"file content here"
def test_remove_deletes_file(self, manager):
ref = manager.add_file("test.txt", b"content")
stored_path = manager._dir / ref["stored_name"]
assert stored_path.exists()
manager.remove(ref["id"])
assert not stored_path.exists()
# ---------------------------------------------------------------------------
# Tests chunking générique
# ---------------------------------------------------------------------------
class TestChunking:
def test_chunk_txt(self, tmp_path):
from src.medical.rag_index import chunk_user_file
txt_file = tmp_path / "test.txt"
txt_file.write_text(
"Premier paragraphe avec assez de mots pour le seuil.\n\n"
"Deuxième paragraphe avec encore plus de mots pour dépasser le minimum.\n\n"
"Court\n\n"
"Troisième paragraphe qui devrait aussi être un chunk valide.",
encoding="utf-8",
)
chunks = chunk_user_file(txt_file, "test_doc")
assert len(chunks) >= 2 # au moins 2 paragraphes assez longs
assert all(c.document == "test_doc" for c in chunks)
def test_chunk_csv(self, tmp_path):
from src.medical.rag_index import chunk_user_file
csv_file = tmp_path / "test.csv"
csv_file.write_text(
"code,description,note\n"
"K85.1,Pancréatite aiguë biliaire,diagnostic fréquent\n"
"I10,Hypertension essentielle,comorbidité courante\n",
encoding="utf-8",
)
chunks = chunk_user_file(csv_file, "csv_doc")
assert len(chunks) == 2
assert "K85.1" in chunks[0].text
assert "I10" in chunks[1].text
def test_chunk_unsupported_extension(self, tmp_path):
from src.medical.rag_index import chunk_user_file
bad_file = tmp_path / "test.xyz"
bad_file.write_text("content")
chunks = chunk_user_file(bad_file, "bad")
assert chunks == []
# ---------------------------------------------------------------------------
# Tests routes Flask
# ---------------------------------------------------------------------------
class TestReferentielRoutes:
@pytest.fixture
def app(self, tmp_path):
"""Crée une app Flask de test avec un manager temporaire."""
from src.viewer.app import create_app
app = create_app()
app.config["TESTING"] = True
return app
@pytest.fixture
def client(self, app):
return app.test_client()
def test_admin_page_loads(self, client):
resp = client.get("/admin/referentiels")
assert resp.status_code == 200
assert "Référentiels RAG" in resp.data.decode()
def test_upload_no_file(self, client):
resp = client.post("/admin/referentiels/upload")
assert resp.status_code == 400
data = resp.get_json()
assert "error" in data
def test_upload_valid_file(self, client):
from io import BytesIO
data = {
"file": (BytesIO(b"test content"), "doc.txt"),
}
resp = client.post("/admin/referentiels/upload", data=data, content_type="multipart/form-data")
result = resp.get_json()
assert resp.status_code == 200
assert result["ok"] is True
assert result["referentiel"]["filename"] == "doc.txt"
def test_delete_nonexistent(self, client):
resp = client.delete("/admin/referentiels/nonexistent")
assert resp.status_code == 404