Implémente un système RAG (Retrieval Augmented Generation) qui indexe les documents de référence ATIH (CIM-10 FR 2026, Guide Métho MCO, CCAM PMSI) et utilise Ollama (mistral-small3.2:24b) pour justifier et valider le codage CIM-10 des diagnostics. - Nouveaux modèles Pydantic : RAGSource, Diagnostic étendu (confidence, justification, sources_rag) — rétrocompatible - Module rag_index.py : chunking des 3 PDFs, embedding sentence-camembert-large, index FAISS IndexFlatIP (3630 vecteurs) - Module rag_search.py : recherche FAISS + appel Ollama avec fallback double - Flag CLI --no-rag pour désactiver l'enrichissement RAG - 18 nouveaux tests (88/88 passent) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
353 lines
12 KiB
Python
353 lines
12 KiB
Python
"""Indexation FAISS des documents de référence CIM-10 / Guide métho / CCAM."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import pdfplumber
|
|
|
|
from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton pour l'index chargé en mémoire
|
|
_faiss_index = None
|
|
_metadata: list[dict] = []
|
|
|
|
|
|
@dataclass
|
|
class Chunk:
|
|
text: str
|
|
document: str # "cim10", "guide_methodo", "ccam"
|
|
page: Optional[int] = None
|
|
code: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chunking CIM-10
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _chunk_cim10(pdf_path: Path) -> list[Chunk]:
|
|
"""Découpe le PDF CIM-10 en chunks par code 3 caractères (ex: K80, K85)."""
|
|
chunks: list[Chunk] = []
|
|
current_code: str | None = None
|
|
current_text: list[str] = []
|
|
current_page: int | None = None
|
|
|
|
# Pattern pour détecter un code CIM-10 à 3 caractères en début de ligne
|
|
code3_pattern = re.compile(r"^([A-Z]\d{2})\s+(.+)")
|
|
# Pattern pour les sous-codes (ex: K80.0, K80.1)
|
|
subcode_pattern = re.compile(r"^([A-Z]\d{2}\.\d+)\s+(.+)")
|
|
|
|
logger.info("Extraction des chunks CIM-10 depuis %s", pdf_path.name)
|
|
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
for page_num, page in enumerate(pdf.pages, start=1):
|
|
text = page.extract_text()
|
|
if not text:
|
|
continue
|
|
|
|
for line in text.split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
m = code3_pattern.match(line)
|
|
if m and not subcode_pattern.match(line):
|
|
# Nouveau code 3-char → sauvegarder le chunk précédent
|
|
if current_code and current_text:
|
|
chunk_text = "\n".join(current_text)
|
|
if len(chunk_text.split()) >= 5:
|
|
chunks.append(Chunk(
|
|
text=chunk_text,
|
|
document="cim10",
|
|
page=current_page,
|
|
code=current_code,
|
|
))
|
|
current_code = m.group(1)
|
|
current_text = [line]
|
|
current_page = page_num
|
|
else:
|
|
if current_code:
|
|
current_text.append(line)
|
|
|
|
# Dernier chunk
|
|
if current_code and current_text:
|
|
chunk_text = "\n".join(current_text)
|
|
if len(chunk_text.split()) >= 5:
|
|
chunks.append(Chunk(
|
|
text=chunk_text,
|
|
document="cim10",
|
|
page=current_page,
|
|
code=current_code,
|
|
))
|
|
|
|
logger.info("CIM-10 : %d chunks extraits", len(chunks))
|
|
return chunks
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chunking Guide Méthodologique MCO
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _chunk_guide_methodo(pdf_path: Path) -> list[Chunk]:
|
|
"""Découpe le Guide Méthodologique MCO par sections/titres."""
|
|
chunks: list[Chunk] = []
|
|
current_title: str | None = None
|
|
current_text: list[str] = []
|
|
current_page: int | None = None
|
|
|
|
# Patterns de titres de sections (chapitres, sous-chapitres)
|
|
title_patterns = [
|
|
re.compile(r"^((?:CHAPITRE|TITRE|PARTIE)\s+[IVXLCDM0-9]+.*)$", re.IGNORECASE),
|
|
re.compile(r"^(\d+\.\d*\s+[A-ZÉÈÊÀÂÔÙÛÜ].{5,})$"),
|
|
re.compile(r"^([A-ZÉÈÊÀÂÔÙÛÜ][A-ZÉÈÊÀÂÔÙÛÜ\s]{10,})$"),
|
|
]
|
|
|
|
logger.info("Extraction des chunks Guide Métho depuis %s", pdf_path.name)
|
|
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
for page_num, page in enumerate(pdf.pages, start=1):
|
|
text = page.extract_text()
|
|
if not text:
|
|
continue
|
|
|
|
for line in text.split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
is_title = False
|
|
for pat in title_patterns:
|
|
if pat.match(line):
|
|
is_title = True
|
|
break
|
|
|
|
if is_title and len(line) > 8:
|
|
# Sauvegarder le chunk précédent
|
|
if current_title and current_text:
|
|
chunk_text = current_title + "\n" + "\n".join(current_text)
|
|
if len(chunk_text.split()) >= 20:
|
|
chunks.append(Chunk(
|
|
text=chunk_text,
|
|
document="guide_methodo",
|
|
page=current_page,
|
|
))
|
|
current_title = line
|
|
current_text = []
|
|
current_page = page_num
|
|
else:
|
|
current_text.append(line)
|
|
|
|
# Dernier chunk
|
|
if current_title and current_text:
|
|
chunk_text = current_title + "\n" + "\n".join(current_text)
|
|
if len(chunk_text.split()) >= 20:
|
|
chunks.append(Chunk(
|
|
text=chunk_text,
|
|
document="guide_methodo",
|
|
page=current_page,
|
|
))
|
|
|
|
# Si trop peu de chunks (le PDF ne suit pas les patterns de titre),
|
|
# fallback : découper par pages groupées par 3
|
|
if len(chunks) < 10:
|
|
logger.info("Guide Métho : fallback découpe par pages (peu de titres détectés)")
|
|
chunks = []
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
page_texts: list[str] = []
|
|
start_page = 1
|
|
for page_num, page in enumerate(pdf.pages, start=1):
|
|
text = page.extract_text()
|
|
if text:
|
|
page_texts.append(text)
|
|
if len(page_texts) >= 3:
|
|
combined = "\n".join(page_texts)
|
|
if len(combined.split()) >= 20:
|
|
chunks.append(Chunk(
|
|
text=combined,
|
|
document="guide_methodo",
|
|
page=start_page,
|
|
))
|
|
page_texts = []
|
|
start_page = page_num + 1
|
|
if page_texts:
|
|
combined = "\n".join(page_texts)
|
|
if len(combined.split()) >= 20:
|
|
chunks.append(Chunk(
|
|
text=combined,
|
|
document="guide_methodo",
|
|
page=start_page,
|
|
))
|
|
|
|
logger.info("Guide Métho : %d chunks extraits", len(chunks))
|
|
return chunks
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chunking CCAM
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _chunk_ccam(pdf_path: Path) -> list[Chunk]:
|
|
"""Découpe le PDF CCAM en chunks par code d'acte."""
|
|
chunks: list[Chunk] = []
|
|
ccam_pattern = re.compile(r"([A-Z]{4}\d{3})\s+(.*)")
|
|
|
|
logger.info("Extraction des chunks CCAM depuis %s", pdf_path.name)
|
|
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
for page_num, page in enumerate(pdf.pages, start=1):
|
|
text = page.extract_text()
|
|
if not text:
|
|
continue
|
|
|
|
current_code: str | None = None
|
|
current_lines: list[str] = []
|
|
|
|
for line in text.split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
m = ccam_pattern.match(line)
|
|
if m:
|
|
if current_code and current_lines:
|
|
chunks.append(Chunk(
|
|
text="\n".join(current_lines),
|
|
document="ccam",
|
|
page=page_num,
|
|
code=current_code,
|
|
))
|
|
current_code = m.group(1)
|
|
current_lines = [line]
|
|
elif current_code:
|
|
current_lines.append(line)
|
|
|
|
if current_code and current_lines:
|
|
chunks.append(Chunk(
|
|
text="\n".join(current_lines),
|
|
document="ccam",
|
|
page=page_num,
|
|
code=current_code,
|
|
))
|
|
|
|
# Fallback : si aucun code CCAM détecté, indexer par page
|
|
if not chunks:
|
|
logger.info("CCAM : aucun code détecté, fallback par page")
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
for page_num, page in enumerate(pdf.pages, start=1):
|
|
text = page.extract_text()
|
|
if text and len(text.split()) >= 10:
|
|
chunks.append(Chunk(
|
|
text=text,
|
|
document="ccam",
|
|
page=page_num,
|
|
))
|
|
|
|
logger.info("CCAM : %d chunks extraits", len(chunks))
|
|
return chunks
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Construction de l'index FAISS
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_index(force: bool = False) -> None:
|
|
"""Construit l'index FAISS à partir des 3 PDFs de référence.
|
|
|
|
Args:
|
|
force: Si True, reconstruit même si l'index existe déjà.
|
|
"""
|
|
import faiss
|
|
import numpy as np
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
index_path = RAG_INDEX_DIR / "faiss.index"
|
|
meta_path = RAG_INDEX_DIR / "metadata.json"
|
|
|
|
if not force and index_path.exists() and meta_path.exists():
|
|
logger.info("Index FAISS déjà existant dans %s (use force=True pour reconstruire)", RAG_INDEX_DIR)
|
|
return
|
|
|
|
# Collecter tous les chunks
|
|
all_chunks: list[Chunk] = []
|
|
|
|
for pdf_path, chunk_fn in [
|
|
(CIM10_PDF, _chunk_cim10),
|
|
(GUIDE_METHODO_PDF, _chunk_guide_methodo),
|
|
(CCAM_PDF, _chunk_ccam),
|
|
]:
|
|
if pdf_path.exists():
|
|
all_chunks.extend(chunk_fn(pdf_path))
|
|
else:
|
|
logger.warning("PDF non trouvé : %s", pdf_path)
|
|
|
|
if not all_chunks:
|
|
logger.error("Aucun chunk extrait — vérifiez les chemins des PDFs")
|
|
return
|
|
|
|
logger.info("Total : %d chunks à indexer", len(all_chunks))
|
|
|
|
# Embeddings — forcer CPU pour éviter les bugs CUDA avec ce modèle
|
|
logger.info("Chargement du modèle d'embedding dangvantuan/sentence-camembert-large (CPU)...")
|
|
model = SentenceTransformer("dangvantuan/sentence-camembert-large", device="cpu")
|
|
model.max_seq_length = 512 # CamemBERT max position embeddings
|
|
|
|
texts = [c.text[:2000] for c in all_chunks] # Tronquer les chunks trop longs
|
|
logger.info("Calcul des embeddings pour %d chunks...", len(texts))
|
|
embeddings = model.encode(
|
|
texts, show_progress_bar=True, normalize_embeddings=True, batch_size=64,
|
|
)
|
|
embeddings = np.array(embeddings, dtype=np.float32)
|
|
|
|
# Index FAISS (IndexFlatIP = cosine similarity avec vecteurs normalisés)
|
|
dim = embeddings.shape[1]
|
|
index = faiss.IndexFlatIP(dim)
|
|
index.add(embeddings)
|
|
|
|
# Sauvegarder
|
|
RAG_INDEX_DIR.mkdir(parents=True, exist_ok=True)
|
|
faiss.write_index(index, str(index_path))
|
|
|
|
metadata = [asdict(c) for c in all_chunks]
|
|
# Ne pas sauvegarder le texte complet dans metadata (trop lourd),
|
|
# garder un extrait de 500 chars
|
|
for m in metadata:
|
|
m["extrait"] = m.pop("text")[:500]
|
|
|
|
meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
logger.info("Index FAISS sauvegardé : %s (%d vecteurs, dim=%d)", index_path, len(all_chunks), dim)
|
|
|
|
|
|
def get_index() -> tuple | None:
|
|
"""Charge l'index FAISS et les métadonnées (singleton lazy-loaded).
|
|
|
|
Returns:
|
|
Tuple (faiss_index, metadata_list) ou None si l'index n'existe pas.
|
|
"""
|
|
global _faiss_index, _metadata
|
|
|
|
if _faiss_index is not None:
|
|
return _faiss_index, _metadata
|
|
|
|
index_path = RAG_INDEX_DIR / "faiss.index"
|
|
meta_path = RAG_INDEX_DIR / "metadata.json"
|
|
|
|
if not index_path.exists() or not meta_path.exists():
|
|
logger.warning("Index FAISS non trouvé dans %s — lancez build_index() d'abord", RAG_INDEX_DIR)
|
|
return None
|
|
|
|
import faiss
|
|
|
|
_faiss_index = faiss.read_index(str(index_path))
|
|
_metadata = json.loads(meta_path.read_text(encoding="utf-8"))
|
|
|
|
logger.info("Index FAISS chargé : %d vecteurs", _faiss_index.ntotal)
|
|
return _faiss_index, _metadata
|