Files
t2a/src/medical/rag_index.py
dom aa397d5360 feat: configuration externalisée via .env + audit requirements
- Externalise 13 variables de config via python-dotenv (chemins PDF,
  modèles Ollama/embedding/NER, FINESS, seuils) avec défauts identiques
- Centralise EMBEDDING_MODEL dans config.py (était hardcodé en 3 endroits)
- Ajoute .env.example documenté et .env au .gitignore
- Ajoute openpyxl et pandas manquants au requirements.txt
- Ajoute data/referentiels au mkdir de run.sh

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 19:46:33 +01:00

665 lines
24 KiB
Python

"""Indexation FAISS des documents de référence CIM-10 / Guide métho / CCAM."""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
import pdfplumber
from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CCAM_DICT_PATH, REFERENTIELS_DIR, EMBEDDING_MODEL
logger = logging.getLogger(__name__)
# Singleton pour l'index chargé en mémoire
_faiss_index = None
_metadata: list[dict] = []
@dataclass
class Chunk:
text: str
document: str # "cim10", "guide_methodo", "ccam"
page: Optional[int] = None
code: Optional[str] = None
# ---------------------------------------------------------------------------
# Chunking CIM-10
# ---------------------------------------------------------------------------
def _chunk_cim10(pdf_path: Path) -> list[Chunk]:
"""Découpe le PDF CIM-10 en double chunking : sous-codes individuels + parents 3-char."""
chunks: list[Chunk] = []
current_code3: str | None = None
current_code3_text: list[str] = []
current_code3_page: int | None = None
# Sous-codes en cours d'accumulation
current_subcode: str | None = None
current_subcode_text: list[str] = []
current_subcode_page: int | None = None
code3_pattern = re.compile(r"^([A-Z]\d{2})\s+(.+)")
subcode_pattern = re.compile(r"^([A-Z]\d{2}\.\d+)\s+(.+)")
logger.info("Extraction des chunks CIM-10 (double chunking) depuis %s", pdf_path.name)
def _flush_subcode():
"""Sauvegarde le chunk sous-code en cours."""
if current_subcode and current_subcode_text:
chunk_text = "\n".join(current_subcode_text)
if len(chunk_text.split()) >= 3:
chunks.append(Chunk(
text=chunk_text,
document="cim10",
page=current_subcode_page,
code=current_subcode,
))
def _flush_code3():
"""Sauvegarde le chunk parent 3-char en cours."""
_flush_subcode()
if current_code3 and current_code3_text:
chunk_text = "\n".join(current_code3_text)
if len(chunk_text.split()) >= 5:
chunks.append(Chunk(
text=chunk_text,
document="cim10",
page=current_code3_page,
code=current_code3,
))
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
for line in text.split("\n"):
line = line.strip()
if not line:
continue
m_sub = subcode_pattern.match(line)
m3 = code3_pattern.match(line)
if m_sub:
# Nouveau sous-code → flush le sous-code précédent
_flush_subcode()
current_subcode = m_sub.group(1)
current_subcode_text = [line]
current_subcode_page = page_num
# Ajouter aussi au chunk parent
if current_code3:
current_code3_text.append(line)
elif m3 and not m_sub:
# Nouveau code 3-char → flush tout le bloc précédent
_flush_code3()
current_code3 = m3.group(1)
current_code3_text = [line]
current_code3_page = page_num
current_subcode = None
current_subcode_text = []
current_subcode_page = None
else:
# Ligne de continuation
if current_subcode:
current_subcode_text.append(line)
if current_code3:
current_code3_text.append(line)
# Flush final
_flush_code3()
logger.info("CIM-10 : %d chunks extraits (double chunking sous-codes + parents)", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Chunking Guide Méthodologique MCO
# ---------------------------------------------------------------------------
def _chunk_guide_methodo(pdf_path: Path) -> list[Chunk]:
"""Découpe le Guide Méthodologique MCO par sections/titres."""
chunks: list[Chunk] = []
current_title: str | None = None
current_text: list[str] = []
current_page: int | None = None
# Patterns de titres de sections (chapitres, sous-chapitres)
title_patterns = [
re.compile(r"^((?:CHAPITRE|TITRE|PARTIE)\s+[IVXLCDM0-9]+.*)$", re.IGNORECASE),
re.compile(r"^(\d+\.\d*\s+[A-ZÉÈÊÀÂÔÙÛÜ].{5,})$"),
re.compile(r"^([A-ZÉÈÊÀÂÔÙÛÜ][A-ZÉÈÊÀÂÔÙÛÜ\s]{10,})$"),
]
logger.info("Extraction des chunks Guide Métho depuis %s", pdf_path.name)
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
for line in text.split("\n"):
line = line.strip()
if not line:
continue
is_title = False
for pat in title_patterns:
if pat.match(line):
is_title = True
break
if is_title and len(line) > 8:
# Sauvegarder le chunk précédent
if current_title and current_text:
chunk_text = current_title + "\n" + "\n".join(current_text)
if len(chunk_text.split()) >= 20:
chunks.append(Chunk(
text=chunk_text,
document="guide_methodo",
page=current_page,
))
current_title = line
current_text = []
current_page = page_num
else:
current_text.append(line)
# Dernier chunk
if current_title and current_text:
chunk_text = current_title + "\n" + "\n".join(current_text)
if len(chunk_text.split()) >= 20:
chunks.append(Chunk(
text=chunk_text,
document="guide_methodo",
page=current_page,
))
# Si trop peu de chunks (le PDF ne suit pas les patterns de titre),
# fallback : découper par pages groupées par 3
if len(chunks) < 10:
logger.info("Guide Métho : fallback découpe par pages (peu de titres détectés)")
chunks = []
with pdfplumber.open(pdf_path) as pdf:
page_texts: list[str] = []
start_page = 1
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text:
page_texts.append(text)
if len(page_texts) >= 3:
combined = "\n".join(page_texts)
if len(combined.split()) >= 20:
chunks.append(Chunk(
text=combined,
document="guide_methodo",
page=start_page,
))
page_texts = []
start_page = page_num + 1
if page_texts:
combined = "\n".join(page_texts)
if len(combined.split()) >= 20:
chunks.append(Chunk(
text=combined,
document="guide_methodo",
page=start_page,
))
logger.info("Guide Métho : %d chunks extraits", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Chunking CCAM
# ---------------------------------------------------------------------------
def _chunk_ccam(pdf_path: Path) -> list[Chunk]:
"""Découpe le PDF CCAM en chunks par code d'acte."""
chunks: list[Chunk] = []
ccam_pattern = re.compile(r"([A-Z]{4}\d{3})\s+(.*)")
logger.info("Extraction des chunks CCAM depuis %s", pdf_path.name)
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
current_code: str | None = None
current_lines: list[str] = []
for line in text.split("\n"):
line = line.strip()
if not line:
continue
m = ccam_pattern.match(line)
if m:
if current_code and current_lines:
chunks.append(Chunk(
text="\n".join(current_lines),
document="ccam",
page=page_num,
code=current_code,
))
current_code = m.group(1)
current_lines = [line]
elif current_code:
current_lines.append(line)
if current_code and current_lines:
chunks.append(Chunk(
text="\n".join(current_lines),
document="ccam",
page=page_num,
code=current_code,
))
# Fallback : si aucun code CCAM détecté, indexer par page
if not chunks:
logger.info("CCAM : aucun code détecté, fallback par page")
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text and len(text.split()) >= 10:
chunks.append(Chunk(
text=text,
document="ccam",
page=page_num,
))
logger.info("CCAM : %d chunks extraits", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Chunking CCAM depuis le dictionnaire JSON
# ---------------------------------------------------------------------------
def _chunk_ccam_from_dict() -> list[Chunk]:
"""Génère des chunks CCAM depuis ccam_dict.json (un chunk par code+description).
Prioritaire sur les chunks PDF si le dictionnaire existe.
"""
if not CCAM_DICT_PATH.exists():
return []
import json as _json
with open(CCAM_DICT_PATH, encoding="utf-8") as f:
ccam_dict = _json.load(f)
chunks: list[Chunk] = []
for code, info in ccam_dict.items():
desc = info.get("description", "") if isinstance(info, dict) else str(info)
if not desc:
continue
regroupement = info.get("regroupement", "") if isinstance(info, dict) else ""
tarif = info.get("tarif_s1") if isinstance(info, dict) else None
text_parts = [f"{code} {desc}"]
if regroupement:
text_parts.append(f"Regroupement: {regroupement}")
if tarif is not None:
text_parts.append(f"Tarif S1: {tarif}")
chunks.append(Chunk(
text="\n".join(text_parts),
document="ccam",
code=code,
))
logger.info("CCAM dict : %d chunks générés depuis %s", len(chunks), CCAM_DICT_PATH)
return chunks
# ---------------------------------------------------------------------------
# Chunking CIM-10 Index Alphabétique
# ---------------------------------------------------------------------------
def _chunk_cim10_alpha(pdf_path: Path) -> list[Chunk]:
"""Parse la section INDEX ALPHABÉTIQUE du PDF CIM-10.
Détecte les entrées de type "terme → code" et génère des chunks
avec document="cim10_alpha".
"""
chunks: list[Chunk] = []
# Pattern : ligne avec un terme suivi d'un code CIM-10 en fin de ligne
entry_pattern = re.compile(r"^(.+?)\s+([A-Z]\d{2}(?:\.\d+)?)\s*$")
logger.info("Extraction de l'index alphabétique CIM-10 depuis %s", pdf_path.name)
in_alpha_section = False
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
# Détecter le début de la section index alphabétique
text_upper = text.upper()
if "INDEX ALPHAB" in text_upper:
in_alpha_section = True
# Certaines pages avant l'index : ne pas parser
if not in_alpha_section:
continue
for line in text.split("\n"):
line = line.strip()
if not line:
continue
m = entry_pattern.match(line)
if m:
terme = m.group(1).strip()
code = m.group(2)
if len(terme) >= 3:
chunks.append(Chunk(
text=f"{terme}{code}",
document="cim10_alpha",
page=page_num,
code=code,
))
logger.info("CIM-10 index alphabétique : %d entrées extraites", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Construction de l'index FAISS
# ---------------------------------------------------------------------------
def build_index(force: bool = False) -> None:
"""Construit l'index FAISS à partir des 3 PDFs de référence.
Args:
force: Si True, reconstruit même si l'index existe déjà.
"""
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
index_path = RAG_INDEX_DIR / "faiss.index"
meta_path = RAG_INDEX_DIR / "metadata.json"
if not force and index_path.exists() and meta_path.exists():
logger.info("Index FAISS déjà existant dans %s (use force=True pour reconstruire)", RAG_INDEX_DIR)
return
# Collecter tous les chunks
all_chunks: list[Chunk] = []
for pdf_path, chunk_fn in [
(CIM10_PDF, _chunk_cim10),
(GUIDE_METHODO_PDF, _chunk_guide_methodo),
]:
if pdf_path.exists():
all_chunks.extend(chunk_fn(pdf_path))
else:
logger.warning("PDF non trouvé : %s", pdf_path)
# CCAM : priorité au dictionnaire JSON sur le PDF
ccam_dict_chunks = _chunk_ccam_from_dict()
if ccam_dict_chunks:
all_chunks.extend(ccam_dict_chunks)
elif CCAM_PDF.exists():
all_chunks.extend(_chunk_ccam(CCAM_PDF))
else:
logger.warning("Ni dictionnaire CCAM ni PDF CCAM trouvé")
# CIM-10 index alphabétique (source additionnelle)
if CIM10_PDF.exists():
all_chunks.extend(_chunk_cim10_alpha(CIM10_PDF))
if not all_chunks:
logger.error("Aucun chunk extrait — vérifiez les chemins des PDFs")
return
logger.info("Total : %d chunks à indexer", len(all_chunks))
# Embeddings — GPU si disponible
import torch
_device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info("Chargement du modèle d'embedding %s (%s)...", EMBEDDING_MODEL, _device)
model = SentenceTransformer(EMBEDDING_MODEL, device=_device)
model.max_seq_length = 512 # CamemBERT max position embeddings
texts = [c.text[:2000] for c in all_chunks] # Tronquer les chunks trop longs
logger.info("Calcul des embeddings pour %d chunks...", len(texts))
embeddings = model.encode(
texts, show_progress_bar=True, normalize_embeddings=True, batch_size=64,
)
embeddings = np.array(embeddings, dtype=np.float32)
# Index FAISS (IndexFlatIP = cosine similarity avec vecteurs normalisés)
dim = embeddings.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(embeddings)
# Sauvegarder
RAG_INDEX_DIR.mkdir(parents=True, exist_ok=True)
faiss.write_index(index, str(index_path))
metadata = [asdict(c) for c in all_chunks]
# Ne pas sauvegarder le texte complet dans metadata (trop lourd),
# garder un extrait de 800 chars (les sous-codes sont courts, besoin du contexte)
for m in metadata:
m["extrait"] = m.pop("text")[:800]
meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
logger.info("Index FAISS sauvegardé : %s (%d vecteurs, dim=%d)", index_path, len(all_chunks), dim)
def get_index() -> tuple | None:
"""Charge l'index FAISS et les métadonnées (singleton lazy-loaded).
Returns:
Tuple (faiss_index, metadata_list) ou None si l'index n'existe pas.
"""
global _faiss_index, _metadata
if _faiss_index is not None:
return _faiss_index, _metadata
index_path = RAG_INDEX_DIR / "faiss.index"
meta_path = RAG_INDEX_DIR / "metadata.json"
if not index_path.exists() or not meta_path.exists():
logger.warning("Index FAISS non trouvé dans %s — lancez build_index() d'abord", RAG_INDEX_DIR)
return None
import faiss
_faiss_index = faiss.read_index(str(index_path))
_metadata = json.loads(meta_path.read_text(encoding="utf-8"))
logger.info("Index FAISS chargé : %d vecteurs", _faiss_index.ntotal)
return _faiss_index, _metadata
# ---------------------------------------------------------------------------
# Chunking générique pour fichiers utilisateur (référentiels)
# ---------------------------------------------------------------------------
def chunk_user_file(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un fichier utilisateur en chunks pour indexation FAISS.
Dispatch selon l'extension :
- PDF : pages groupées par 2
- CSV/Excel : une ligne = un chunk
- TXT : paragraphes (blocs séparés par lignes vides)
Args:
file_path: Chemin du fichier.
doc_name: Nom du document (utilisé comme identifiant dans les métadonnées).
Returns:
Liste de Chunk prêts pour l'indexation.
"""
suffix = file_path.suffix.lower()
if suffix == ".pdf":
return _chunk_user_pdf(file_path, doc_name)
elif suffix in (".csv", ".xlsx", ".xls"):
return _chunk_user_tabular(file_path, doc_name)
elif suffix == ".txt":
return _chunk_user_txt(file_path, doc_name)
else:
logger.warning("Extension non supportée pour chunking : %s", suffix)
return []
def _chunk_user_pdf(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un PDF utilisateur en chunks de 2 pages."""
chunks: list[Chunk] = []
try:
with pdfplumber.open(file_path) as pdf:
page_texts: list[str] = []
start_page = 1
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text:
page_texts.append(text)
if len(page_texts) >= 2:
combined = "\n".join(page_texts)
if len(combined.split()) >= 10:
chunks.append(Chunk(
text=combined,
document=doc_name,
page=start_page,
))
page_texts = []
start_page = page_num + 1
if page_texts:
combined = "\n".join(page_texts)
if len(combined.split()) >= 10:
chunks.append(Chunk(
text=combined,
document=doc_name,
page=start_page,
))
except Exception:
logger.warning("Erreur lors du chunking PDF %s", file_path, exc_info=True)
logger.info("Référentiel PDF %s : %d chunks", doc_name, len(chunks))
return chunks
def _chunk_user_tabular(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un CSV/Excel : une ligne = un chunk."""
chunks: list[Chunk] = []
try:
import pandas as pd
suffix = file_path.suffix.lower()
if suffix == ".csv":
df = pd.read_csv(file_path, encoding="utf-8", on_bad_lines="skip")
else:
df = pd.read_excel(file_path)
for idx, row in df.iterrows():
text = " | ".join(str(v) for v in row.values if pd.notna(v))
if len(text.split()) >= 3:
chunks.append(Chunk(
text=text,
document=doc_name,
page=int(idx) + 1,
))
except Exception:
logger.warning("Erreur lors du chunking tabular %s", file_path, exc_info=True)
logger.info("Référentiel tabular %s : %d chunks", doc_name, len(chunks))
return chunks
def _chunk_user_txt(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un fichier TXT en paragraphes (blocs séparés par lignes vides)."""
chunks: list[Chunk] = []
try:
text = file_path.read_text(encoding="utf-8")
paragraphs = re.split(r"\n\s*\n", text)
for i, para in enumerate(paragraphs):
para = para.strip()
if len(para.split()) >= 5:
chunks.append(Chunk(
text=para,
document=doc_name,
page=i + 1,
))
except Exception:
logger.warning("Erreur lors du chunking TXT %s", file_path, exc_info=True)
logger.info("Référentiel TXT %s : %d chunks", doc_name, len(chunks))
return chunks
def add_chunks_to_index(chunks: list[Chunk]) -> int:
"""Ajoute des chunks à l'index FAISS existant (incrémental).
Charge l'index si nécessaire, encode les chunks, ajoute les vecteurs,
et sauvegarde le tout.
Args:
chunks: Liste de Chunk à ajouter.
Returns:
Nombre de chunks effectivement ajoutés.
"""
if not chunks:
return 0
import faiss
import numpy as np
from .rag_search import _get_embed_model
index_path = RAG_INDEX_DIR / "faiss.index"
meta_path = RAG_INDEX_DIR / "metadata.json"
# Charger l'index existant ou en créer un nouveau
if index_path.exists() and meta_path.exists():
faiss_idx = faiss.read_index(str(index_path))
metadata = json.loads(meta_path.read_text(encoding="utf-8"))
else:
model = _get_embed_model()
# Obtenir la dimension via un encodage test
test_vec = model.encode(["test"], normalize_embeddings=True)
dim = test_vec.shape[1]
faiss_idx = faiss.IndexFlatIP(dim)
metadata = []
# Encoder les nouveaux chunks
model = _get_embed_model()
texts = [c.text[:2000] for c in chunks]
embeddings = model.encode(texts, normalize_embeddings=True, batch_size=64)
embeddings = np.array(embeddings, dtype=np.float32)
# Ajouter à l'index
faiss_idx.add(embeddings)
# Ajouter les métadonnées
from dataclasses import asdict
for chunk in chunks:
meta = asdict(chunk)
meta["extrait"] = meta.pop("text")[:800]
metadata.append(meta)
# Sauvegarder
RAG_INDEX_DIR.mkdir(parents=True, exist_ok=True)
faiss.write_index(faiss_idx, str(index_path))
meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
# Invalider le singleton pour forcer le rechargement
reset_index()
logger.info("Index FAISS : %d chunks ajoutés (total : %d)", len(chunks), faiss_idx.ntotal)
return len(chunks)
def reset_index() -> None:
"""Invalide le singleton FAISS pour forcer le rechargement au prochain accès."""
global _faiss_index, _metadata
_faiss_index = None
_metadata = []