feat: découpage PDFs multi-dossiers (Trackare multi-épisodes, CRH concaténés)

Ajoute une étape de splitting entre extraction texte et parsing. Chaque chunk
est traité indépendamment par le pipeline existant, avec suffixe _partN en sortie.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-12 09:08:37 +01:00
parent 86d7ec5ea4
commit a00e5f1147
3 changed files with 397 additions and 35 deletions

View File

@@ -0,0 +1,124 @@
"""Découpage de PDFs multi-dossiers en chunks indépendants.
Certains PDFs contiennent plusieurs séjours/épisodes :
- Trackare : plusieurs Episode No dans un même export
- CRH : plusieurs lettres de sortie concaténées
Ce module insère une étape de splitting entre l'extraction texte et le parsing.
Chaque chunk est ensuite traité indépendamment par le pipeline existant.
"""
from __future__ import annotations
import re
import logging
logger = logging.getLogger(__name__)
def split_documents(text: str, doc_type: str) -> list[str]:
"""Point d'entrée : découpe un texte en chunks selon le type de document.
Retourne toujours au moins [text] (pas de split si un seul dossier).
"""
if doc_type == "trackare":
return _split_trackare(text)
elif doc_type == "crh":
return _split_crh(text)
return [text]
def _split_trackare(text: str) -> list[str]:
"""Découpe un export Trackare multi-épisodes.
Stratégie :
1. Compter les occurrences de "Episode No:"
2. Si une seule → pas de split
3. Si plusieurs → couper sur "Détails épisode" (ou second "Episode No:")
4. Préfixer le bloc patient à chaque chunk
"""
episodes = list(re.finditer(r"Episode No:\s*\d+", text))
if len(episodes) <= 1:
return [text]
logger.info(" Trackare multi-épisodes détecté : %d épisodes", len(episodes))
# Identifier le bloc patient (avant le premier épisode/détails épisode)
# Le bloc patient va du début jusqu'à "Détails épisode" ou le premier "Episode No:"
first_episode_start = episodes[0].start()
# Chercher "Détails épisode" qui précède chaque bloc épisode
details_markers = list(re.finditer(r"Détails épisode", text))
if len(details_markers) >= 2:
# Couper sur "Détails épisode"
split_points = [m.start() for m in details_markers]
# Le bloc patient = tout avant le premier "Détails épisode"
patient_block = text[:split_points[0]].rstrip()
else:
# Fallback : couper sur "Episode No:"
split_points = [m.start() for m in episodes]
# Le bloc patient = tout avant le premier "Episode No:"
# Remonter pour inclure "Détails épisode" s'il existe avant
if details_markers:
patient_block = text[:details_markers[0].start()].rstrip()
else:
patient_block = text[:split_points[0]].rstrip()
chunks: list[str] = []
for i, start in enumerate(split_points):
end = split_points[i + 1] if i + 1 < len(split_points) else len(text)
episode_text = text[start:end].rstrip()
# Préfixer le bloc patient pour que le parser ait les infos complètes
chunk = patient_block + "\n\n" + episode_text
chunks.append(chunk)
return chunks
def _split_crh(text: str) -> list[str]:
"""Découpe un PDF contenant plusieurs CRH concaténés.
Stratégie :
1. Détecter les frontières par headers patient (MME|M\\.|MR) suivis de
patterns CRH (dates séjour, "Mon cher confrère")
2. Si une seule occurrence → pas de split
3. Si plusieurs → couper sur chaque header patient
"""
# Chercher les headers patient typiques d'un début de CRH
# On cherche le pattern complet : titre + nom en majuscules
headers = list(re.finditer(
r"(?:^|\n)(?=\s*(?:MME|M\.|MR)\s+[A-ZÉÈÊËÀÂ]{2,})",
text,
))
if len(headers) <= 1:
return [text]
# Filtrer : ne garder que les headers qui sont vraiment des débuts de CRH
# (suivis dans les 2000 chars par un pattern CRH typique)
crh_starts: list[int] = []
for h in headers:
pos = h.start()
# Sauter le \n initial si présent
if text[pos:pos + 1] == "\n":
pos += 1
lookahead = text[pos:pos + 2000].lower()
if (re.search(r"du\s+\d{2}/\d{2}/\d{4}\s+au\s+\d{2}/\d{2}/\d{4}", lookahead)
or "mon cher confrère" in lookahead
or "cher confrère" in lookahead
or "chère consœur" in lookahead
or "compte rendu" in lookahead):
crh_starts.append(pos)
if len(crh_starts) <= 1:
return [text]
logger.info(" CRH multi-documents détecté : %d CRH", len(crh_starts))
chunks: list[str] = []
for i, start in enumerate(crh_starts):
end = crh_starts[i + 1] if i + 1 < len(crh_starts) else len(text)
chunks.append(text[start:end].rstrip())
return chunks

View File

@@ -13,6 +13,7 @@ from .anonymization.anonymizer import Anonymizer
from .config import ANONYMIZED_DIR, REPORTS_DIR, STRUCTURED_DIR, AnonymizationReport, DossierMedical
from .extraction.document_classifier import classify
from .extraction.crh_parser import parse_crh
from .extraction.document_splitter import split_documents
from .extraction.pdf_extractor import extract_text
from .extraction.trackare_parser import parse_trackare
from .medical.cim10_extractor import extract_medical_info
@@ -28,8 +29,11 @@ _use_edsnlp = True
_use_rag = True
def process_pdf(pdf_path: Path) -> tuple[str, DossierMedical, AnonymizationReport]:
"""Traite un PDF : extraction → parsing → anonymisation → extraction CIM-10."""
def process_pdf(pdf_path: Path) -> list[tuple[str, DossierMedical, AnonymizationReport]]:
"""Traite un PDF : extraction → splitting → parsing → anonymisation → extraction CIM-10.
Retourne une liste de (texte_anonymisé, dossier, rapport) — un par dossier détecté.
"""
t0 = time.time()
logger.info("Traitement de %s", pdf_path.name)
@@ -41,40 +45,53 @@ def process_pdf(pdf_path: Path) -> tuple[str, DossierMedical, AnonymizationRepor
doc_type = classify(raw_text)
logger.info(" Type de document : %s", doc_type)
# 3. Parsing
if doc_type == "trackare":
parsed = parse_trackare(raw_text)
else:
parsed = parse_crh(raw_text)
# 3. Splitting multi-dossiers
chunks = split_documents(raw_text, doc_type)
if len(chunks) > 1:
logger.info(" Découpage : %d dossiers détectés dans %s", len(chunks), pdf_path.name)
# 4. Anonymisation
anonymizer = Anonymizer(parsed_data=parsed)
anonymized_text = anonymizer.anonymize(raw_text)
report = anonymizer.report
report.source_file = pdf_path.name
logger.info(
" Anonymisation : %d remplacements (regex=%d, ner=%d, sweep=%d)",
report.total_replacements,
report.regex_replacements,
report.ner_replacements,
report.sweep_replacements,
)
results: list[tuple[str, DossierMedical, AnonymizationReport]] = []
for i, chunk_text in enumerate(chunks):
part_label = f" [part {i+1}/{len(chunks)}]" if len(chunks) > 1 else ""
logger.info(" Traitement%s...", part_label)
# 5. Analyse edsnlp (optionnelle)
edsnlp_result = None
if _use_edsnlp:
edsnlp_result = _run_edsnlp(anonymized_text)
# 4. Parsing
if doc_type == "trackare":
parsed = parse_trackare(chunk_text)
else:
parsed = parse_crh(chunk_text)
# 6. Extraction médicale CIM-10
dossier = extract_medical_info(parsed, anonymized_text, edsnlp_result, use_rag=_use_rag)
dossier.source_file = pdf_path.name
dossier.document_type = doc_type
dossier.processing_time_s = round(time.time() - t0, 2)
logger.info(" DP : %s", dossier.diagnostic_principal)
logger.info(" DAS : %d, Actes : %d", len(dossier.diagnostics_associes), len(dossier.actes_ccam))
logger.info(" Temps de traitement : %.2fs", dossier.processing_time_s)
# 5. Anonymisation
anonymizer = Anonymizer(parsed_data=parsed)
anonymized_text = anonymizer.anonymize(chunk_text)
report = anonymizer.report
report.source_file = pdf_path.name
logger.info(
" Anonymisation%s : %d remplacements (regex=%d, ner=%d, sweep=%d)",
part_label,
report.total_replacements,
report.regex_replacements,
report.ner_replacements,
report.sweep_replacements,
)
return anonymized_text, dossier, report
# 6. Analyse edsnlp (optionnelle)
edsnlp_result = None
if _use_edsnlp:
edsnlp_result = _run_edsnlp(anonymized_text)
# 7. Extraction médicale CIM-10
dossier = extract_medical_info(parsed, anonymized_text, edsnlp_result, use_rag=_use_rag)
dossier.source_file = pdf_path.name
dossier.document_type = doc_type
dossier.processing_time_s = round(time.time() - t0, 2)
logger.info(" DP%s : %s", part_label, dossier.diagnostic_principal)
logger.info(" DAS : %d, Actes : %d", len(dossier.diagnostics_associes), len(dossier.actes_ccam))
results.append((anonymized_text, dossier, report))
logger.info(" Temps total : %.2fs", time.time() - t0)
return results
def _run_edsnlp(text: str):
@@ -252,10 +269,13 @@ def main(input_path: str | None = None) -> None:
group_dossiers: list[DossierMedical] = []
for pdf_path in pdfs:
try:
anonymized_text, dossier, report = process_pdf(pdf_path)
pdf_results = process_pdf(pdf_path)
stem = pdf_path.stem.replace(" ", "_")
write_outputs(stem, anonymized_text, dossier, report, subdir=subdir)
group_dossiers.append(dossier)
multi = len(pdf_results) > 1
for part_idx, (anonymized_text, dossier, report) in enumerate(pdf_results):
part_stem = f"{stem}_part{part_idx + 1}" if multi else stem
write_outputs(part_stem, anonymized_text, dossier, report, subdir=subdir)
group_dossiers.append(dossier)
except Exception:
logger.exception("Erreur lors du traitement de %s", pdf_path.name)