feat: pipeline T2A - anonymisation, extraction CIM-10 et intégration edsnlp

Pipeline complet de traitement de documents médicaux PDF :
- Extraction texte (pdfplumber) et classification (Trackare/CRH)
- Anonymisation multi-couche (regex + NER CamemBERT + sweep)
- Extraction médicale CIM-10 hybride : edsnlp (AP-HP) enrichit les
  diagnostics, médicaments (codes ATC via Romedi) et négation,
  avec fallback regex pour les patterns spécifiques
- Fix sentencepiece pinné à <0.2.0 pour compatibilité CamemBERT

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-10 15:24:12 +01:00
commit 4a12cd2676
25 changed files with 7592 additions and 0 deletions

0
src/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,529 @@
"""Pipeline d'anonymisation en 3 phases : regex → NER → balayage final."""
from __future__ import annotations
import logging
import re
from typing import Any
import regex as regex_mod
from ..config import KEEP_ESTABLISHMENT_NAME, AnonymizationReport
from . import regex_patterns as patterns
from .entity_registry import EntityRegistry
from .ner_anonymizer import extract_person_entities
logger = logging.getLogger(__name__)
# Termes médicaux à ne pas anonymiser même s'ils ressemblent à des noms
MEDICAL_TERMS_WHITELIST = {
"balthazar", "sris", "ras", "atg", "pca", "bcy", "bcr",
"nac", "nacl", "asat", "alat", "ggt", "pal", "crp", "imc",
"en", "pa", "fc", "vvp", "ide", "iao", "mco", "urg", "bh",
"kt", "vbp", "iv", "ap", "am", "ok", "apres", "sous",
"normal", "normaux", "stable", "absent", "absente",
"date", "heure", "type", "note", "etat", "code",
"orale", "intraveineuse", "signé", "arrêté", "réalisé",
# Termes médicaux fréquents à ne jamais anonymiser
"cholécystectomie", "cholecystectomie", "cholangiographie",
"pancréatite", "pancreatite", "lithiase", "lithiases",
"cœlioscopie", "coelioscopie", "cholédoque", "choledoque",
"angiocholite", "cholécystite", "cholecystite",
"morphine", "paracétamol", "paracetamol", "cétirizine", "cetirizine",
"tramadol", "contramal", "acupan", "nefopam",
"service", "médecin", "medecin", "docteur", "chirurgie",
"gastro", "entérologie", "enterologie", "oncologie",
"hépato", "hepato", "digestif", "digestive",
"proctologue", "nutritive", "pôle", "pole",
"fonct", "fonctionnelle", "fonctionnelles",
"praticiens", "hospitaliers", "interne", "clinique",
"desc", "chef",
"secrétariat", "infirmier", "infirmière",
"unité", "hospitalisation", "urgences",
"coordonnateur", "fédération", "federation",
"navarre", "institut", "cancérologie",
"bordeaux", "strasbourg", "reims", "limoges", "clermont", "ferrand",
"palais",
}
# Noms d'établissement à préserver si configuré
ESTABLISHMENT_NAMES = {
"centre hospitalier cote basque",
"centre hospitalier côte basque",
"ch-cotebasque",
"icance",
}
class Anonymizer:
"""Anonymiseur 3 phases pour documents médicaux."""
def __init__(self, parsed_data: dict | None = None):
self.registry = EntityRegistry(whitelist=MEDICAL_TERMS_WHITELIST)
self.report = AnonymizationReport(source_file="")
self._parsed = parsed_data or {}
# Pré-enregistrer les entités connues du parsing
self._register_parsed_entities()
def anonymize(self, text: str) -> str:
"""Exécute les 3 phases d'anonymisation."""
text = self._phase1_regex(text)
text = self._phase2_ner(text)
text = self._phase3_sweep(text)
self.report.total_replacements = (
self.report.regex_replacements
+ self.report.ner_replacements
+ self.report.sweep_replacements
)
return text
# --- Phase 1 : Regex ---
def _phase1_regex(self, text: str) -> str:
"""Anonymisation par patterns regex."""
count = 0
# CRH footer combiné (IPP + Episode sur la même ligne)
text, n = self._replace_crh_footer_ipp_episode(text)
count += n
# Identifiants
text, n = self._replace_pattern(
text, patterns.IPP_PATTERN, "ipp",
group_handler=self._handle_multi_group,
)
count += n
text, n = self._replace_pattern(
text, patterns.EPISODE_PATTERN, "episode",
group_handler=self._handle_multi_group,
)
count += n
text, n = self._replace_pattern(text, patterns.NIR_PATTERN, "nir")
count += n
text, n = self._replace_pattern(text, patterns.FINESS_PATTERN, "finess")
count += n
text, n = self._replace_pattern(text, patterns.RPPS_PATTERN, "rpps")
count += n
text, n = self._replace_pattern(text, patterns.BARCODE_PATTERN, "code_barre")
count += n
text, n = self._replace_pattern(text, patterns.BARCODE_REPEAT_PATTERN, "code_barre")
count += n
# Contact
text, n = self._replace_phone(text)
count += n
text, n = self._replace_pattern(
text, patterns.EMAIL_PATTERN, "email",
skip_establishment_check=True,
)
count += n
text, n = self._replace_fax(text)
count += n
# Adresses
text, n = self._replace_addresses(text)
count += n
# Scanner les patterns d'adresse inline (MAISON xxx, QUARTIER xxx...)
text, n = self._replace_inline_addresses(text)
count += n
# Dates de naissance
text, n = self._replace_date_naissance(text)
count += n
# Lieu de naissance
text, n = self._replace_pattern(
text, patterns.LIEU_NAISSANCE_PATTERN, "lieu_naissance",
)
count += n
# Noms structurés
text, n = self._replace_structured_names(text)
count += n
# Footers (Trackare et CRH)
text, n = self._replace_footer(text)
count += n
self.report.regex_replacements = count
return text
# --- Phase 2 : NER ---
def _phase2_ner(self, text: str) -> str:
"""Anonymisation par NER CamemBERT."""
try:
ner_entities = extract_person_entities(text)
except Exception as e:
logger.warning("NER indisponible (%s), phase 2 ignorée.", e)
return text
count = 0
# Trier par position décroissante pour remplacer de la fin au début
ner_entities.sort(key=lambda e: e["start"], reverse=True)
for ent in ner_entities:
word = ent["word"]
if self._is_whitelisted(word):
continue
if self._is_establishment(word):
continue
# Vérifier si déjà anonymisé (contient des crochets)
if "[" in word and "]" in word:
continue
pseudo = self.registry.get_replacement(word)
if pseudo is None:
pseudo = self.registry.register(word, "personne")
text = text[:ent["start"]] + pseudo + text[ent["end"]:]
count += 1
self.report.entities_found.append({
"original": word,
"replacement": pseudo,
"source": "ner",
"score": ent["score"],
})
self.report.ner_replacements = count
return text
# --- Phase 3 : Balayage final ---
def _phase3_sweep(self, text: str) -> str:
"""Balayage brute-force des entités connues restantes."""
count = 0
all_entities = self.registry.get_all_entities()
for original, replacement in sorted(
all_entities.items(), key=lambda x: len(x[0]), reverse=True
):
if len(original) < 3:
continue
if self._is_whitelisted(original):
continue
# Recherche insensible à la casse, avec frontières de mots
escaped = re.escape(original)
pattern = re.compile(r"\b" + escaped + r"\b", re.IGNORECASE)
matches = pattern.findall(text)
if matches:
text = pattern.sub(replacement, text)
count += len(matches)
self.report.sweep_replacements = count
return text
# --- Helpers ---
def _register_parsed_entities(self) -> None:
"""Pré-enregistre les entités extraites par les parsers."""
patient = self._parsed.get("patient", {})
# Noms patient
for key in ("nom_prenom", "nom_naissance", "nom_complet"):
if patient.get(key):
self.registry.register(patient[key], "patient")
# Adresse patient — enregistrer l'adresse complète et chaque mot significatif
if patient.get("adresse"):
self._register_address(patient["adresse"])
if patient.get("ville"):
self.registry.register(patient["ville"], "adresse")
if patient.get("code_postal"):
cp = patient["code_postal"]
if patient.get("ville"):
self.registry.register(f"{cp} {patient['ville']}", "adresse")
if patient.get("lieu_naissance"):
self.registry.register(patient["lieu_naissance"], "lieu_naissance")
# Médecins
for med in self._parsed.get("medecins", []):
self.registry.register(med, "medecin")
# Scanner le texte brut pour les lignes d'adresse non captées par le parser
raw_text = self._parsed.get("contenu_medical", "")
# Pas disponible ici, on le fera via les patterns dans phase 1
# Contacts
for contact in self._parsed.get("contacts", []):
# Extraire les noms des contacts
names = re.findall(
r"([A-ZÉÈÊËÀÂa-zéèêëàâ]{2,}(?:\s+[A-ZÉÈÊËÀÂa-zéèêëàâ]{2,})+)",
contact,
)
for name in names:
if not self._is_whitelisted(name):
self.registry.register(name, "contact")
def _replace_pattern(
self,
text: str,
pattern: regex_mod.Pattern,
category: str,
group_handler: Any = None,
skip_establishment_check: bool = False,
) -> tuple[str, int]:
"""Remplace les matches d'un pattern."""
count = 0
for m in reversed(list(pattern.finditer(text))):
if group_handler:
matched_text = group_handler(m)
else:
matched_text = m.group(1) if m.lastindex else m.group(0)
if not matched_text:
continue
if not skip_establishment_check and self._is_establishment(matched_text):
continue
pseudo = self.registry.register(matched_text, category)
# Trouver le bon span à remplacer
if group_handler:
# Pour les multi-group, trouver quel groupe a matché
for i in range(1, (m.lastindex or 0) + 1):
if m.group(i) == matched_text:
start, end = m.span(i)
break
else:
start, end = m.span()
elif m.lastindex:
start, end = m.span(1)
else:
start, end = m.span()
text = text[:start] + pseudo + text[end:]
count += 1
self.report.entities_found.append({
"original": matched_text,
"replacement": pseudo,
"source": "regex",
"category": category,
})
return text, count
def _handle_multi_group(self, m: regex_mod.Match) -> str | None:
"""Gère les patterns avec plusieurs groupes alternatifs."""
for i in range(1, (m.lastindex or 0) + 1):
if m.group(i):
return m.group(i)
return None
def _replace_crh_footer_ipp_episode(self, text: str) -> tuple[str, int]:
"""Remplace les IPP/épisode dans les footers CRH (format combiné)."""
count = 0
for m in reversed(list(patterns.CRH_FOOTER_IPP_EPISODE.finditer(text))):
ipp = m.group(1)
episode = m.group(2)
pseudo_ipp = self.registry.register(ipp, "ipp")
pseudo_ep = self.registry.register(episode, "episode")
replacement = f"IPP {pseudo_ipp} / N° Episode {pseudo_ep}"
text = text[:m.start()] + replacement + text[m.end():]
count += 2
return text, count
def _replace_phone(self, text: str) -> tuple[str, int]:
"""Remplace les numéros de téléphone."""
count = 0
for m in reversed(list(patterns.PHONE_PATTERN.finditer(text))):
phone = m.group(0)
# Ne pas anonymiser le standard de l'hôpital si configuré
normalized = phone.replace(".", " ").replace("-", " ")
if KEEP_ESTABLISHMENT_NAME and "05 59 44 35 35" in normalized:
continue
pseudo = self.registry.register(phone, "telephone")
text = text[:m.start()] + pseudo + text[m.end():]
count += 1
return text, count
def _replace_fax(self, text: str) -> tuple[str, int]:
"""Remplace les numéros de fax."""
count = 0
for m in reversed(list(patterns.FAX_PATTERN.finditer(text))):
fax_num = m.group(1)
pseudo = self.registry.register(fax_num, "telephone")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
return text, count
def _replace_addresses(self, text: str) -> tuple[str, int]:
"""Remplace les adresses."""
count = 0
# Lignes d'adresse
for m in reversed(list(patterns.ADDRESS_LINE_PATTERN.finditer(text))):
addr = m.group(1).strip()
if len(addr) > 5 and not self._is_establishment(addr):
pseudo = self.registry.register(addr, "adresse")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Code postal + ville (sauf l'hôpital / Bayonne)
for m in reversed(list(patterns.CP_VILLE_PATTERN.finditer(text))):
ville = m.group(2).strip()
cp = m.group(1)
full = f"{cp} {ville}"
if self._is_establishment(full) or "BAYONNE" in ville.upper():
if not KEEP_ESTABLISHMENT_NAME:
pseudo = self.registry.register(full, "adresse")
text = text[:m.start()] + pseudo + text[m.end():]
count += 1
else:
pseudo = self.registry.register(full, "adresse")
text = text[:m.start()] + pseudo + text[m.end():]
count += 1
return text, count
def _replace_inline_addresses(self, text: str) -> tuple[str, int]:
"""Capture les adresses inline (MAISON xxx, QUARTIER xxx, LOTISSEMENT xxx)."""
count = 0
# Pattern : MAISON/QUARTIER/LOTISSEMENT suivi de mots (noms propres de lieux)
inline_addr = re.compile(
r"((?:MAISON|QUARTIER|LOTISSEMENT|RESIDENCE|HAMEAU)\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s]+?)(?=\n|$|Dr|\d{5}|Chef|médical|coordonnateur)",
re.IGNORECASE,
)
for m in reversed(list(inline_addr.finditer(text))):
addr = m.group(1).strip()
if len(addr) > 5:
self._register_address(addr)
pseudo = self.registry.register(addr, "adresse")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
return text, count
def _replace_date_naissance(self, text: str) -> tuple[str, int]:
"""Remplace les dates de naissance."""
count = 0
for m in reversed(list(patterns.DATE_NAISSANCE_PATTERN.finditer(text))):
date_str = m.group(1)
pseudo = self.registry.register(date_str, "date_naissance")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
return text, count
def _replace_structured_names(self, text: str) -> tuple[str, int]:
"""Remplace les noms détectés par patterns structurels."""
count = 0
# CRH footer patient : "Patient(e) : NOM PRENOM Né(e)"
for m in reversed(list(patterns.CRH_FOOTER_PATIENT_PATTERN.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "patient")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Patient names
for pat in [patterns.PATIENT_NAME_PATTERN, patterns.CIVILITE_NAME_PATTERN]:
for m in reversed(list(pat.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "patient")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Doctor names (tous les patterns)
for pat in [patterns.DR_NAME_PATTERN, patterns.MEDECIN_COURANT_PATTERN,
patterns.MEDECIN_TRAITANT_PATTERN, patterns.MEDECIN_PEC_PATTERN]:
for m in reversed(list(pat.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "medecin")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Note authors (with date suffix)
for m in reversed(list(patterns.NOTE_AUTHOR_DATE_PATTERN.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "soignant")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Note authors (Prénom NOM pattern, sans date)
for m in reversed(list(patterns.NOTE_AUTHOR_PATTERN.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "soignant")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# IAO
for m in reversed(list(patterns.IAO_PATTERN.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "soignant")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Rédigé par
for m in reversed(list(patterns.REDIGE_PAR_PATTERN.finditer(text))):
name = m.group(1).strip()
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "soignant")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
# Staff names from header
for m in reversed(list(patterns.STAFF_NAME_PATTERN.finditer(text))):
name = m.group(1).strip() if m.group(1) else ""
if len(name) >= 3 and not self._is_whitelisted(name):
pseudo = self.registry.register(name, "soignant")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
self.report.regex_replacements += count
return text, count
def _replace_footer(self, text: str) -> tuple[str, int]:
"""Remplace les infos patient dans les footers (Trackare et CRH)."""
count = 0
for m in reversed(list(patterns.FOOTER_PATIENT_PATTERN.finditer(text))):
name = m.group(1).strip()
pseudo = self.registry.register(name, "patient")
text = text[:m.start(1)] + pseudo + text[m.end(1):]
count += 1
return text, count
def _register_address(self, addr: str) -> None:
"""Enregistre une adresse et ses mots significatifs."""
self.registry.register(addr, "adresse")
skip_words = {
"maison", "quartier", "lotissement", "rue", "avenue",
"boulevard", "chemin", "place", "route", "résidence",
"hameau", "lieu", "dit", "impasse", "allée", "batiment",
"bp", "cedex",
}
for word in addr.split():
word_clean = word.strip(",.")
if len(word_clean) >= 4 and word_clean.lower() not in skip_words:
self.registry.register(word_clean, "adresse")
def _is_whitelisted(self, text: str) -> bool:
"""Vérifie si un terme est dans la whitelist médicale."""
return text.lower().strip() in MEDICAL_TERMS_WHITELIST
def _is_establishment(self, text: str) -> bool:
"""Vérifie si le texte fait référence à l'établissement."""
if not KEEP_ESTABLISHMENT_NAME:
return False
text_lower = text.lower().strip()
return any(est in text_lower for est in ESTABLISHMENT_NAMES)

View File

@@ -0,0 +1,86 @@
"""Registre d'entités pour assurer la cohérence des remplacements."""
from __future__ import annotations
import re
from collections import defaultdict
class EntityRegistry:
"""Maintient un mapping cohérent entre entités réelles et pseudonymes."""
def __init__(self, whitelist: set[str] | None = None):
self._counters: dict[str, int] = defaultdict(int)
self._mappings: dict[str, str] = {}
self._category_map: dict[str, str] = {}
self._whitelist: set[str] = whitelist or set()
def register(self, entity: str, category: str) -> str:
"""Enregistre une entité et retourne son pseudonyme.
Si l'entité est déjà connue, retourne le même pseudonyme.
"""
key = self._normalize(entity)
if not key:
return entity
if key in self._mappings:
return self._mappings[key]
self._counters[category] += 1
count = self._counters[category]
pseudo = self._generate_pseudo(category, count)
self._mappings[key] = pseudo
self._category_map[key] = category
# Enregistrer aussi les sous-parties du nom (sauf termes médicaux)
parts = key.split()
if len(parts) > 1:
for part in parts:
if len(part) >= 3 and part not in self._whitelist:
part_key = part
if part_key not in self._mappings:
self._mappings[part_key] = f"[{category.upper()}]"
return pseudo
def get_replacement(self, entity: str) -> str | None:
"""Retourne le pseudonyme d'une entité connue, ou None."""
key = self._normalize(entity)
return self._mappings.get(key)
def get_all_entities(self) -> dict[str, str]:
"""Retourne tous les mappings entity → pseudo."""
return dict(self._mappings)
def get_all_original_names(self) -> list[str]:
"""Retourne toutes les entités originales (noms avant normalisation)."""
return list(self._mappings.keys())
def _normalize(self, text: str) -> str:
"""Normalise un nom pour lookup : minuscules, espaces simplifiés."""
text = text.strip()
text = re.sub(r"\s+", " ", text)
return text.lower()
def _generate_pseudo(self, category: str, count: int) -> str:
"""Génère un pseudonyme selon la catégorie."""
labels = {
"patient": f"[PATIENT_{count}]",
"medecin": f"[MEDECIN_{count}]",
"soignant": f"[SOIGNANT_{count}]",
"contact": f"[CONTACT_{count}]",
"personne": f"[PERSONNE_{count}]",
"ipp": f"[IPP_{count}]",
"episode": f"[EPISODE_{count}]",
"nir": f"[NIR_{count}]",
"telephone": f"[TEL_{count}]",
"email": f"[EMAIL_{count}]",
"adresse": f"[ADRESSE_{count}]",
"date_naissance": f"[DATE_NAISS_{count}]",
"lieu_naissance": f"[LIEU_NAISS_{count}]",
"finess": f"[FINESS]",
"code_barre": f"[CODE_BARRE_{count}]",
}
return labels.get(category, f"[{category.upper()}_{count}]")

View File

@@ -0,0 +1,95 @@
"""NER via CamemBERT pour détecter les noms en texte libre."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from ..config import NER_CONFIDENCE_THRESHOLD, NER_MODEL
if TYPE_CHECKING:
from transformers import Pipeline
logger = logging.getLogger(__name__)
_pipeline: Pipeline | None = None
def _get_pipeline() -> Pipeline:
"""Charge le modèle NER (lazy loading)."""
global _pipeline
if _pipeline is None:
logger.info("Chargement du modèle NER %s...", NER_MODEL)
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline
tokenizer = AutoTokenizer.from_pretrained(NER_MODEL)
model = AutoModelForTokenClassification.from_pretrained(NER_MODEL)
_pipeline = pipeline(
"ner",
model=model,
tokenizer=tokenizer,
aggregation_strategy="simple",
)
logger.info("Modèle NER chargé.")
return _pipeline
def extract_person_entities(text: str) -> list[dict]:
"""Extrait les entités de type PER (personnes) du texte.
Retourne une liste de dicts avec 'word', 'start', 'end', 'score'.
"""
pipe = _get_pipeline()
# CamemBERT a une limite de tokens — découper en chunks
chunks = _split_text(text, max_chars=500)
entities: list[dict] = []
offset = 0
for chunk in chunks:
results = pipe(chunk)
for ent in results:
if ent["entity_group"] == "PER" and ent["score"] >= NER_CONFIDENCE_THRESHOLD:
word = ent["word"].strip()
if len(word) >= 2:
entities.append({
"word": word,
"start": ent["start"] + offset,
"end": ent["end"] + offset,
"score": float(ent["score"]),
})
offset += len(chunk)
return _deduplicate(entities)
def _split_text(text: str, max_chars: int = 500) -> list[str]:
"""Découpe le texte en chunks de taille raisonnable aux limites de phrases."""
if len(text) <= max_chars:
return [text]
chunks: list[str] = []
start = 0
while start < len(text):
end = start + max_chars
if end < len(text):
# Chercher la fin de phrase la plus proche
for sep in ["\n", ". ", ", ", " "]:
pos = text.rfind(sep, start, end)
if pos > start:
end = pos + len(sep)
break
chunks.append(text[start:end])
start = end
return chunks
def _deduplicate(entities: list[dict]) -> list[dict]:
"""Déduplique les entités par mot (garde le score le plus élevé)."""
seen: dict[str, dict] = {}
for ent in entities:
key = ent["word"].lower()
if key not in seen or ent["score"] > seen[key]["score"]:
seen[key] = ent
return list(seen.values())

View File

@@ -0,0 +1,194 @@
"""Patterns regex pour la détection de données personnelles dans les documents médicaux FR."""
from __future__ import annotations
import regex
# --- Identifiants ---
# IPP : séquence de 6-10 chiffres après "IPP" (avec ou sans :)
IPP_PATTERN = regex.compile(
r"(?:IPP\s*[:=]?\s*)(\d{6,10})"
r"|"
r"\((\d{8})\s*\)", # Footer "(01306172 )"
)
# Numéro d'épisode (toutes les variantes)
EPISODE_PATTERN = regex.compile(
r"(?:Episode\s*(?:No|N°|N\.?)\s*[:=]?\s*)(\d{6,10})"
r"|"
r"(?:N°\s*Episode\s+)(\d{6,10})",
)
# NIR / Numéro de sécurité sociale (15 chiffres)
NIR_PATTERN = regex.compile(r"\b([12]\d{2}(?:0[1-9]|1[0-2])\d{2,3}\d{6}\s?\d{2})\b")
# FINESS (9 chiffres, souvent précédé de "Finess")
FINESS_PATTERN = regex.compile(r"(?:Finess|FINESS)\s*[:\s]*\*?(\d{9})\*?")
# RPPS (11 chiffres)
RPPS_PATTERN = regex.compile(r"RPPS\s*[:=]?\s*(\d{11})")
# Code-barres (nombre entre astérisques)
BARCODE_PATTERN = regex.compile(r"\*(\d{9,15})\*")
# Numéro isolé après code-barres (même numéro répété sans astérisques)
BARCODE_REPEAT_PATTERN = regex.compile(r"\*\d{9,15}\*\s*\n(\d{9,15})")
# --- Contact ---
# Téléphones FR : 10 chiffres avec séparateurs variés
PHONE_PATTERN = regex.compile(
r"\b(0[1-9])[\s.\-]?(\d{2})[\s.\-]?(\d{2})[\s.\-]?(\d{2})[\s.\-]?(\d{2})\b"
)
# Emails (y compris @ch-cotebasque.fr qui contiennent des initiales de soignants)
EMAIL_PATTERN = regex.compile(
r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b"
)
# Fax
FAX_PATTERN = regex.compile(
r"Fax\s*:\s*(0[1-9][\s.\-]?\d{2}[\s.\-]?\d{2}[\s.\-]?\d{2}[\s.\-]?\d{2})"
)
# --- Adresses ---
# Code postal + ville (uniquement les ALL_CAPS après 5 digits)
CP_VILLE_PATTERN = regex.compile(
r"\b(\d{5})\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\s\-]{2,})\b"
)
# Lignes d'adresse avec mots-clés (y compris noms propres basques/locaux)
ADDRESS_LINE_PATTERN = regex.compile(
r"^((?:(?:\d+\s*,?\s*)?(?:MAISON|LOTISSEMENT|QUARTIER|RUE|AVENUE|BOULEVARD|IMPASSE|CHEMIN|PLACE|ALLEE|ALLÉE|ROUTE|LIEU[\s-]DIT|RESIDENCE|RÉSIDENCE|BATIMENT|BÂTIMENT|HAMEAU)[\s\w\-''ÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]+))$",
regex.MULTILINE | regex.IGNORECASE,
)
# Adresse complète multi-ligne (après nom patient dans CRH/Trackare)
ADDRESS_BLOCK_PATTERN = regex.compile(
r"(?:Adresse\s*:\s*)(.+?)(?:\s+Ville|\n)",
)
# --- Dates de naissance ---
# Toutes les variantes : "né(e) le", "née le", "né le", "Né(e) le", "Date de naissance:"
DATE_NAISSANCE_PATTERN = regex.compile(
r"(?:[Nn][ée]+(?:\(e\))?\s+le\s+|Date de naissance\s*[:=]?\s*)(\d{2}/\d{2}/\d{4})"
)
# --- Noms structurés ---
# Footer CRH : "Patient(e) : NOM PRENOM Né(e) le"
CRH_FOOTER_PATIENT_PATTERN = regex.compile(
r"Patient(?:\(e\))?\s*:\s*([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s\-]+?)\s+(?:Né|né)"
)
# Footer CRH : "IPP NNNNNNNN / N° Episode NNNNNNNN"
CRH_FOOTER_IPP_EPISODE = regex.compile(
r"IPP\s+(\d{6,10})\s*/\s*N°\s*Episode\s+(\d{6,10})"
)
# Après "Nom de naissance:", "Nom et Prénom:", "Patient(e):"
PATIENT_NAME_PATTERN = regex.compile(
r"(?:Patient(?:\(e\))?\s*:\s*|Nom de naissance\s*:\s*|Nom et Prénom\s*:\s*)"
r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s\-]+)",
)
# "MME/Mme/M./MR/Madame/Monsieur" suivi du nom
CIVILITE_NAME_PATTERN = regex.compile(
r"(?:MME|Mme|Madame|M\.|Mr|MR|Monsieur)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s\.\-]+?)(?:\s+[Nn]é|\s+Date|\n|,)"
)
# "DR." / "Dr" / "Docteur" suivi du nom du médecin
DR_NAME_PATTERN = regex.compile(
r"(?:DR\.?|Dr\.?|Docteur)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+){0,2})"
)
# "Rédigé par" en pied de page CRH
REDIGE_PAR_PATTERN = regex.compile(
r"Rédigé par\s*:?\s*(.+?)(?:\n|$)"
)
# "Liste des destinataires:" suivi de noms
DESTINATAIRE_PATTERN = regex.compile(
r"(?:Madame|Monsieur|DR\.?|Dr\.?)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s\.\-]+?)(?:\n|$)"
)
# Noms d'auteurs dans Trackare : "Note d'évolution Prénom NOM DD/MM/YYYY"
NOTE_AUTHOR_DATE_PATTERN = regex.compile(
r"(?:Note d'évolution|Note IDE|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+"
r"(?:DR\.?\s+)?"
r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+)+)"
r"\s+\d{2}/\d{2}/\d{4}",
)
# Noms d'auteurs Trackare sans date immédiate : "Note IDE Prénom NOM texte..."
# Le nom est toujours un Prénom (Capitalized) suivi d'un NOM (ALL CAPS)
NOTE_AUTHOR_PATTERN = regex.compile(
r"(?:Note d'évolution|Note IDE|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+"
r"(?:DR\.?\s+)?"
r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][a-zéèêëàâäùûüôöîïç]+\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\-]{2,})"
)
# Footer Trackare : "Patient: NOM PRENOM - Date de naissance: ..."
FOOTER_PATIENT_PATTERN = regex.compile(
r"Patient\s*:\s*([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s\-]+?)\s*-\s*Date de naissance"
)
# "Médecin traitant" block
MEDECIN_TRAITANT_PATTERN = regex.compile(
r"Médecin traitant\s*\n\s*(?:Nom\s+Adresse\s+.*\n)?\s*(?:DR\.?\s+)?(.+?)(?:\s+(?:Lotissement|Rue|Avenue|\d{5}))",
regex.IGNORECASE,
)
# "Médecin courant:"
MEDECIN_COURANT_PATTERN = regex.compile(
r"Médecin courant\s*:\s*(?:DR\.?\s+)?([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+)*)"
)
# "Médecin de la prise en charge médicale NOM"
MEDECIN_PEC_PATTERN = regex.compile(
r"(?:Médecin de (?:la )?(?:prise en charge|décision)\s+médicale)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+)*)"
)
# IAO
IAO_PATTERN = regex.compile(
r"IAO\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+)*)"
)
# Cadre / personnel nommé dans l'en-tête CRH
STAFF_NAME_PATTERN = regex.compile(
r"(?:Mme|M\.)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\.\-]+\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\.\-]+)"
)
# Lieu de naissance
LIEU_NAISSANCE_PATTERN = regex.compile(
r"Lieu de naissance\s*:\s*(.+?)(?:\n|$)"
)
# Auteurs de prescription dans Trackare
PRESCRIPTION_AUTHOR_PATTERN = regex.compile(
r"(?:Presc\.\s*de\s*Sortie|Normal|Signé|Arrêté|Réalisé)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][a-zéèêëàâäùûüôöîïç]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\-]+)+)"
)
def get_all_name_patterns():
"""Retourne la liste des patterns qui capturent des noms de personnes."""
return [
PATIENT_NAME_PATTERN,
CIVILITE_NAME_PATTERN,
DR_NAME_PATTERN,
REDIGE_PAR_PATTERN,
NOTE_AUTHOR_DATE_PATTERN,
NOTE_AUTHOR_PATTERN,
FOOTER_PATIENT_PATTERN,
CRH_FOOTER_PATIENT_PATTERN,
MEDECIN_TRAITANT_PATTERN,
MEDECIN_COURANT_PATTERN,
MEDECIN_PEC_PATTERN,
IAO_PATTERN,
STAFF_NAME_PATTERN,
DESTINATAIRE_PATTERN,
PRESCRIPTION_AUTHOR_PATTERN,
]

99
src/config.py Normal file
View File

@@ -0,0 +1,99 @@
"""Configuration globale et modèles de données pour le pipeline T2A."""
from __future__ import annotations
from pathlib import Path
from typing import Optional
from pydantic import BaseModel, Field
# --- Chemins ---
BASE_DIR = Path(__file__).resolve().parent.parent
INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"
ANONYMIZED_DIR = OUTPUT_DIR / "anonymized"
STRUCTURED_DIR = OUTPUT_DIR / "structured"
REPORTS_DIR = OUTPUT_DIR / "reports"
for d in (INPUT_DIR, ANONYMIZED_DIR, STRUCTURED_DIR, REPORTS_DIR):
d.mkdir(parents=True, exist_ok=True)
# --- Configuration anonymisation ---
KEEP_ESTABLISHMENT_NAME = True
NER_MODEL = "Jean-Baptiste/camembert-ner"
NER_CONFIDENCE_THRESHOLD = 0.80
# --- Modèles de données CIM-10 ---
class Sejour(BaseModel):
sexe: Optional[str] = None
age: Optional[int] = None
date_entree: Optional[str] = None
date_sortie: Optional[str] = None
duree_sejour: Optional[int] = None
mode_entree: Optional[str] = None
mode_sortie: Optional[str] = None
imc: Optional[float] = None
poids: Optional[float] = None
taille: Optional[float] = None
class Diagnostic(BaseModel):
texte: str
cim10_suggestion: Optional[str] = None
class ActeCCAM(BaseModel):
texte: str
code_ccam_suggestion: Optional[str] = None
date: Optional[str] = None
class Traitement(BaseModel):
medicament: str
posologie: Optional[str] = None
code_atc: Optional[str] = None
class BiologieCle(BaseModel):
test: str
valeur: Optional[str] = None
anomalie: Optional[bool] = None
class Imagerie(BaseModel):
type: str
conclusion: Optional[str] = None
score: Optional[str] = None
class DossierMedical(BaseModel):
source_file: str = ""
document_type: str = ""
sejour: Sejour = Field(default_factory=Sejour)
diagnostic_principal: Optional[Diagnostic] = None
diagnostics_associes: list[Diagnostic] = Field(default_factory=list)
actes_ccam: list[ActeCCAM] = Field(default_factory=list)
antecedents: list[str] = Field(default_factory=list)
traitements_sortie: list[Traitement] = Field(default_factory=list)
biologie_cle: list[BiologieCle] = Field(default_factory=list)
imagerie: list[Imagerie] = Field(default_factory=list)
complications: list[str] = Field(default_factory=list)
# --- Rapport d'anonymisation ---
class AnonymizationReport(BaseModel):
source_file: str
total_replacements: int = 0
regex_replacements: int = 0
ner_replacements: int = 0
sweep_replacements: int = 0
entities_found: list[dict] = Field(default_factory=list)

View File

View File

@@ -0,0 +1,129 @@
"""Parsing des Comptes Rendus d'Hospitalisation (CRH)."""
from __future__ import annotations
import re
def parse_crh(text: str) -> dict:
"""Parse un CRH et retourne les sections structurées."""
result: dict = {
"type": "crh",
"patient": {},
"sejour": {},
"medecins": [],
"contenu_medical": "",
"sections": {},
}
_extract_patient_info(text, result)
_extract_sejour_info(text, result)
_extract_medecins(text, result)
_extract_medical_content(text, result)
return result
def _extract_patient_info(text: str, result: dict) -> None:
"""Extrait les informations patient du CRH."""
# "MME NARBAIS AUDREY" ou "M. NOM PRENOM"
m = re.search(
r"(?:MME|M\.|MR)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\- ]+)",
text[:2000],
)
if m:
result["patient"]["nom_complet"] = m.group(1).strip()
# Adresse sous le nom patient — capturer les lignes entre le nom et le CP+Ville
addr_match = re.search(
r"(?:MME|M\.|MR|Madame|Monsieur)\s+[A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\s\-]+\n((?:.*\n){1,4}?\d{5}\s+[A-Z][A-Z\s\-]+)",
text[:3000],
)
if addr_match:
result["patient"]["adresse"] = addr_match.group(1).strip()
# "née le DD/MM/YYYY" ou "né le DD/MM/YYYY"
m = re.search(r"n[ée]+\s+le\s+(\d{2}/\d{2}/\d{4})", text)
if m:
result["patient"]["date_naissance"] = m.group(1)
# Sexe depuis le titre
if re.search(r"\bMME\b", text[:2000]):
result["patient"]["sexe"] = "F"
elif re.search(r"\b(?:M\.|MR)\b", text[:2000]):
result["patient"]["sexe"] = "M"
# "Votre patiente" / "Votre patient"
if "patiente" in text[:3000].lower():
result["patient"]["sexe"] = "F"
elif "patient" in text[:3000].lower():
result["patient"].setdefault("sexe", "M")
def _extract_sejour_info(text: str, result: dict) -> None:
"""Extrait les dates et motif de séjour."""
# "du DD/MM/YYYY au DD/MM/YYYY"
m = re.search(
r"du\s+(\d{2}/\d{2}/\d{4})\s+au\s+(\d{2}/\d{2}/\d{4})", text
)
if m:
result["sejour"]["date_entree"] = m.group(1)
result["sejour"]["date_sortie"] = m.group(2)
# "pour le motif suivant:" ou "pour le motif suivant :\n..."
m = re.search(
r"pour\s+le\s+motif\s+suivant\s*[:\s]*\n?(.*?)(?:\n\n|\.\s+[A-Z])",
text,
re.DOTALL,
)
if m:
result["sejour"]["motif"] = m.group(1).strip()
def _extract_medecins(text: str, result: dict) -> None:
"""Extrait les noms de médecins mentionnés."""
# "Dr NOM" ou "DR NOM" ou "Dr. NOM" ou "Docteur NOM" ou "Dr F. NOM"
for m in re.finditer(
r"(?:Dr\.?|DR\.?|Docteur)\s+(?:[A-Z]\.\s+)?([A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\-]+(?:\s+[A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\-]+)?)",
text,
):
name = m.group(1).strip()
if name not in result["medecins"] and len(name) > 2:
result["medecins"].append(name)
def _extract_medical_content(text: str, result: dict) -> None:
"""Extrait le contenu médical principal."""
# Chercher après "Mon cher confrère," et les infos d'hospitalisation
m = re.search(
r"(?:motif\s+suivant\s*[:\s]*\n?)(.*?)(?:Rédigé par|Cordialement|Confraternellement|Dr\s+\w+\s*$)",
text,
re.DOTALL,
)
if m:
result["contenu_medical"] = m.group(1).strip()
else:
# Fallback : prendre tout après "Mon cher confrère"
m = re.search(
r"Mon cher confrère,?\s*\n(.*?)(?:Rédigé par|$)",
text,
re.DOTALL,
)
if m:
result["contenu_medical"] = m.group(1).strip()
# Sections spécifiques
section_patterns = [
("motif_hospitalisation", r"(?:motif\s+(?:d'hospitalisation|suivant))\s*[:\s]*\n?(.*?)(?=\n\s*(?:Antécédents|Histoire|Examen|Au total|Devenir|TTT)|$)"),
("antecedents", r"(?:Antécédents?)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Histoire|Examen|Traitement|Au total|Devenir)|$)"),
("histoire_maladie", r"(?:Histoire de la maladie)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Examen|Biologie|Au total|Devenir)|$)"),
("examen_clinique", r"(?:Examen clinique)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Biologie|Imagerie|Au total|Devenir)|$)"),
("conclusion", r"(?:Au total|Conclusion)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|TTT|Traitement)|$)"),
("traitement_sortie", r"(?:TTT de sortie|Traitement de sortie)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|Rédigé|Cordialement)|$)"),
("devenir", r"(?:Devenir)\s*[:\s]*\n?(.*?)(?=\n\s*(?:TTT|Traitement|Rédigé|Cordialement)|$)"),
]
for key, pattern in section_patterns:
m = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if m:
result["sections"][key] = m.group(1).strip()

View File

@@ -0,0 +1,45 @@
"""Détection du type de document : CRH vs Trackare."""
from __future__ import annotations
def classify(text: str) -> str:
"""Classifie un document extrait en CRH ou Trackare.
Retourne "crh" ou "trackare".
"""
text_lower = text[:3000].lower()
trackare_markers = [
"dossier patient",
"détails des patients",
"détails épisode",
"liste des contacts",
"notes paramédicales",
"signes vitaux",
"traitements médicamenteux",
"observations médicales",
]
trackare_score = sum(1 for m in trackare_markers if m in text_lower)
crh_markers = [
"mon cher confrère",
"cher confrère",
"compte rendu d'hospitalisation",
"compte-rendu",
"service de gastro",
"pôle spécialités",
"votre patient",
]
crh_score = sum(1 for m in crh_markers if m in text_lower)
if trackare_score >= 2:
return "trackare"
if crh_score >= 2:
return "crh"
# Heuristique : Trackare contient des tableaux avec IPP
if "ipp:" in text_lower or "episode no:" in text_lower:
return "trackare"
return "crh"

View File

@@ -0,0 +1,36 @@
"""Extraction de texte et tableaux depuis les PDF via pdfplumber."""
from __future__ import annotations
from pathlib import Path
import pdfplumber
def extract_text(pdf_path: str | Path) -> str:
"""Extrait le texte de toutes les pages d'un PDF."""
pages_text: list[str] = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
text = page.extract_text() or ""
pages_text.append(text)
return "\n\n".join(pages_text)
def extract_pages(pdf_path: str | Path) -> list[str]:
"""Extrait le texte page par page."""
pages: list[str] = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
pages.append(page.extract_text() or "")
return pages
def extract_tables(pdf_path: str | Path) -> list[list[list[str | None]]]:
"""Extrait tous les tableaux détectés dans le PDF."""
all_tables: list[list[list[str | None]]] = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
tables = page.extract_tables() or []
all_tables.extend(tables)
return all_tables

View File

@@ -0,0 +1,419 @@
"""Parsing des exports Trackare (dossier patient complet)."""
from __future__ import annotations
import re
def parse_trackare(text: str) -> dict:
"""Parse un export Trackare et retourne les sections structurées."""
result: dict = {
"type": "trackare",
"patient": {},
"sejour": {},
"contacts": [],
"medecins": [],
"urgences": {},
"observations_medicales": [],
"notes_paramedicales": [],
"signes_vitaux": {},
"diagnostics": [],
"traitements": [],
"contenu_medical": "",
}
_extract_patient_info(text, result)
_extract_sejour_info(text, result)
_extract_contacts(text, result)
_extract_medecins(text, result)
_extract_urgences(text, result)
_extract_observations(text, result)
_extract_notes_param(text, result)
_extract_diagnostics(text, result)
_extract_traitements(text, result)
_extract_vitals(text, result)
_build_medical_content(result)
return result
def _extract_patient_info(text: str, result: dict) -> None:
"""Extrait les informations du bloc 'Détails des patients'."""
# Nom de naissance
m = re.search(r"Nom de naissance:\s*(\S+)", text)
if m:
result["patient"]["nom_naissance"] = m.group(1).strip()
# Nom et Prénom
m = re.search(r"Nom et Prénom:\s*(.+?)(?:\s+Date de naissance|\n)", text)
if m:
result["patient"]["nom_prenom"] = m.group(1).strip()
# IPP
m = re.search(r"IPP:\s*(\d+)", text)
if m:
result["patient"]["ipp"] = m.group(1)
# Date de naissance
m = re.search(r"Date de naissance:\s*(\d{2}/\d{2}/\d{4})", text)
if m:
result["patient"]["date_naissance"] = m.group(1)
# Sexe
m = re.search(r"Sexe:\s*(\S+)", text)
if m:
sexe_raw = m.group(1).strip().lower()
result["patient"]["sexe"] = "F" if "fém" in sexe_raw else "M"
# Lieu de naissance
m = re.search(r"Lieu de naissance:\s*(.+?)(?:\n|$)", text)
if m:
result["patient"]["lieu_naissance"] = m.group(1).strip()
# Adresse
m = re.search(r"Adresse:\s*(.+?)(?:\s+Ville de résidence|\n)", text)
if m:
result["patient"]["adresse"] = m.group(1).strip()
# Code postal et ville
m = re.search(r"Code Postal:\s*(\d{5})", text)
if m:
result["patient"]["code_postal"] = m.group(1)
m = re.search(r"Ville de résidence:\s*(.+?)(?:\n|$)", text)
if m:
result["patient"]["ville"] = m.group(1).strip()
# Taille, Poids, IMC (footer)
m = re.search(r"Taille:\s*(\d+)\s*cm", text)
if m:
result["patient"]["taille_cm"] = int(m.group(1))
m = re.search(r"Poids:\s*([\d.]+)\s*kg", text)
if m:
result["patient"]["poids_kg"] = float(m.group(1))
m = re.search(r"IMC:\s*([\d.]+)", text)
if m:
result["patient"]["imc"] = float(m.group(1))
def _extract_sejour_info(text: str, result: dict) -> None:
"""Extrait les détails de l'épisode."""
m = re.search(r"Episode No:\s*(\d+)", text)
if m:
result["sejour"]["episode"] = m.group(1)
m = re.search(r"Date d'admission:\s*(\d{2}/\d{2}/\d{4})", text)
if m:
result["sejour"]["date_entree"] = m.group(1)
m = re.search(r"Heure d'admission:\s*(\d{2}:\d{2})", text)
if m:
result["sejour"]["heure_entree"] = m.group(1)
m = re.search(r"Date de sortie:\s*(\d{2}/\d{2}/\d{4})", text)
if m:
result["sejour"]["date_sortie"] = m.group(1)
m = re.search(r"Heure de sortie:\s*(\d{2}:\d{2})", text)
if m:
result["sejour"]["heure_sortie"] = m.group(1)
m = re.search(r"Localisation:\s*(.+?)(?:\s+Médecin courant|\n)", text)
if m:
result["sejour"]["service"] = m.group(1).strip()
m = re.search(r"Médecin courant:\s*(.+?)(?:\n|$)", text)
if m:
result["sejour"]["medecin_courant"] = m.group(1).strip()
def _extract_contacts(text: str, result: dict) -> None:
"""Extrait la liste des contacts."""
# Bloc "Liste des contacts"
contact_block = re.search(
r"Liste des contacts\n(.*?)(?=Passage aux Urgences|Signes Vitaux|Observations médicales)",
text,
re.DOTALL,
)
if not contact_block:
return
block = contact_block.group(1)
# Chaque ligne de contact contient relation, nom, prénom, tél
for line in block.split("\n"):
line = line.strip()
if not line or line.startswith("Type de contact") or line.startswith("Tél"):
continue
# Chercher les noms et téléphones
tel_match = re.search(r"(\d{2}[.\-\s]\d{2}[.\-\s]\d{2}[.\-\s]\d{2}[.\-\s]\d{2})", line)
if tel_match or re.search(r"(?:Epoux|Époux|Épouse|Conjoint|Père|Mère|Fils|Fille|Frère|Soeur)", line, re.IGNORECASE):
result["contacts"].append(line)
def _extract_medecins(text: str, result: dict) -> None:
"""Extrait les noms de médecins/soignants."""
seen: set[str] = set()
def _add(name: str) -> None:
name = _clean_person_name(name)
if name and len(name) > 2 and name.lower() not in seen:
seen.add(name.lower())
result["medecins"].append(name)
# "DR. Prénom NOM" ou "Dr NOM" ou "Docteur NOM Prénom"
for m in re.finditer(
r"(?:DR\.?|Dr\.?|Docteur)\s+([A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+(?:\s+[A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\-]+){0,2})",
text,
):
_add(m.group(1))
# Auteurs d'observations : "Note d'évolution NOM Prénom DD/MM/YYYY"
# ou multi-ligne "Note IDE Prénom\nNOM DD/MM/YYYY"
for m in re.finditer(
r"(?:Note d'évolution|Note IDE|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+"
r"(?:DR\.?\s+)?"
r"([A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+(?:[\s\n]+[A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+)*?)"
r"\s+\d{2}/\d{2}/\d{4}",
text,
):
_add(m.group(1))
# Médecin de prise en charge / décision médicale
for m in re.finditer(
r"(?:Médecin de (?:la )?(?:prise en charge|décision)\s+médicale)\s+"
r"([A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+(?:\s+[A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+){0,2})",
text,
):
_add(m.group(1))
# IAO NOM Prénom
for m in re.finditer(
r"IAO\s+([A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+(?:\s+[A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+){0,2})",
text,
):
_add(m.group(1))
# Prénom seul sur la ligne avant "DD/MM/YYYY...Note IDE...\nNOM HH:MM"
# Ex: "Argitxu 02/03/2023\nNote IDE ...\nHIRIGOYEN 14:05"
# ou "Stephanie 27/02/2023 TDM fait et à voir\nNote IDE\nCONSTANTIN 08:54"
for m in re.finditer(
r"([A-ZÉÈÊËÀÂ][a-zéèêëàâäùûüôöîïç]+)\s+\d{2}/\d{2}/\d{4}[^\n]*\n\s*Note IDE[^\n]*\n\s*([A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂa-zéèêëàâ\-]+)\s+\d{2}:\d{2}",
text,
):
prenom = m.group(1)
nom = m.group(2)
_add(f"{prenom} {nom}")
# Mots qui ne sont pas des noms de personnes
_NOT_NAMES = {
"non", "pas", "une", "des", "les", "par", "sur", "pour", "dans",
"avec", "sans", "qui", "que", "est", "sont", "date", "heure",
"cholecystectomie", "cholécystectomie", "cholangiographie",
"complication", "vasculaire", "nécessaire", "donc", "note",
"douleurs", "absence", "douleur", "lotissement", "priorité",
"prescriptions", "technique", "alimentaire", "signé", "réalisé",
"selles", "covid", "devenir", "algique", "normal", "regime",
"reprise", "biprofenid", "orale", "gelule", "comprime",
"glyc", "inj", "lipase", "protéines", "ionogramme",
"créatinine", "glucose", "num", "crp", "ta", "bilirubine",
"tp", "tca", "bh", "bs", "sortie", "transfert",
}
def _clean_person_name(raw: str) -> str:
"""Nettoie un nom extrait en supprimant le texte parasite."""
name = re.sub(r"\n+", " ", raw).strip()
parts = name.split()
clean: list[str] = []
for part in parts:
p = part.strip(".-")
if not p:
continue
if p.lower() in _NOT_NAMES:
break
# Un mot-nom : commence par une majuscule
if re.match(r"^[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]", p):
clean.append(p)
else:
break
result = " ".join(clean).strip()
# Rejeter si un seul mot de 1-2 lettres (initiale)
if len(result) <= 2:
return ""
return result
def _extract_urgences(text: str, result: dict) -> None:
"""Extrait les données du passage aux urgences."""
urg_block = re.search(
r"Passage aux Urgences\n(.*?)(?=Signes Vitaux|Observations médicales|Antécédents)",
text,
re.DOTALL,
)
if not urg_block:
return
block = urg_block.group(1)
m = re.search(r"Mode de transport.*?:\s*(.+)", block)
if m:
result["urgences"]["mode_transport"] = m.group(1).strip()
m = re.search(r"Mode d'entrée\s+(.+)", block)
if m:
result["urgences"]["mode_entree"] = m.group(1).strip()
m = re.search(r"Priorité\s+(Priorité \d)", block)
if m:
result["urgences"]["priorite"] = m.group(1)
# Motifs de prise en charge
motifs = re.findall(
r"Motif de prise en charge\s+(.+?)(?=\n(?:Observ\.|Médecin|Date|IAO))",
block,
re.DOTALL,
)
if motifs:
result["urgences"]["motifs"] = [
line.strip()
for motif in motifs
for line in motif.split("\n")
if line.strip()
]
def _extract_observations(text: str, result: dict) -> None:
"""Extrait les observations médicales."""
obs_block = re.search(
r"Observations médicales\n(.*?)(?=Notes paramédicales|Surveillance Psychiatrie|Traitements médicamenteux|$)",
text,
re.DOTALL,
)
if not obs_block:
return
block = obs_block.group(1)
# Découper par type d'observation
entries = re.split(
r"(Note d'évolution|Conclusion Obs\.\s*médicales|Histoire de la maladie)",
block,
)
i = 1
while i < len(entries) - 1:
obs_type = entries[i].strip()
content = entries[i + 1].strip()
# Extraire auteur et date
m = re.match(
r"(?:DR\.?\s+)?([A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+(?:\s+[A-ZÉÈÊËÀÂa-zéèêëàâ\.\-]+)*)\s+(\d{2}/\d{2}/\d{4})\s+(\d{2}:\d{2})\s*(.*)",
content,
re.DOTALL,
)
if m:
result["observations_medicales"].append({
"type": obs_type,
"auteur": m.group(1).strip(),
"date": m.group(2),
"heure": m.group(3),
"contenu": m.group(4).strip(),
})
else:
result["observations_medicales"].append({
"type": obs_type,
"contenu": content,
})
i += 2
def _extract_notes_param(text: str, result: dict) -> None:
"""Extrait les notes paramédicales."""
notes_block = re.search(
r"Notes paramédicales\n(.*?)(?=Traitements médicamenteux|Surveillance|$)",
text,
re.DOTALL,
)
if not notes_block:
return
block = notes_block.group(1)
for m in re.finditer(
r"Note IDE\s+([A-Za-zéèêëàâäùûüôöîïçÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\.\-\s]+?)\s+(\d{2}/\d{2}/\d{4})\s+(\d{2}:\d{2})\s+(.*?)(?=Note IDE|$)",
block,
re.DOTALL,
):
result["notes_paramedicales"].append({
"auteur": m.group(1).strip(),
"date": m.group(2),
"heure": m.group(3),
"contenu": m.group(4).strip(),
})
def _extract_diagnostics(text: str, result: dict) -> None:
"""Extrait les diagnostics codés."""
# "Principal actif CODE DESCRIPTION"
for m in re.finditer(
r"(Principal|Associé|Significatif)\s+(actif|inactif)\s+([A-Z]\d{2}(?:\.\d{1,2})?)\s+(.+?)(?:\s+\[.*?\])?\s+\d{2}/\d{2}/\d{4}",
text,
):
result["diagnostics"].append({
"type": m.group(1),
"statut": m.group(2),
"code_cim10": m.group(3),
"libelle": m.group(4).strip(),
})
def _extract_traitements(text: str, result: dict) -> None:
"""Extrait les traitements médicamenteux."""
ttt_block = re.search(
r"Traitements médicamenteux\n(.*?)$",
text,
re.DOTALL,
)
if not ttt_block:
return
block = ttt_block.group(1)
# Chercher les noms de médicaments (en majuscules)
for m in re.finditer(
r"([A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂ0-9\s\-/%.,'`]+(?:MG|ML|SOL|INJ|CPR|GEL|AMP|POCHE)[A-ZÉÈÊËÀÂ0-9\s\-/%.,'`\(\)\[\]]*)\s+([\d\s]+\s*(?:mg|G|GEL|CPR|AMP|ML)?)\s*[-]\s*(.+?)(?=\n[A-Z]|\Z)",
block,
re.DOTALL,
):
result["traitements"].append({
"medicament": m.group(1).strip(),
"dose": m.group(2).strip(),
"frequence": m.group(3).strip().split("\n")[0],
})
def _extract_vitals(text: str, result: dict) -> None:
"""Extrait les données anthropométriques clés."""
m = re.search(r"Taille \[cm\]\s+([\d.]+)", text)
if m:
result["signes_vitaux"]["taille_cm"] = float(m.group(1))
m = re.search(r"Poids \[kg\]\s+([\d.]+)", text)
if m:
result["signes_vitaux"]["poids_kg"] = float(m.group(1))
m = re.search(r"Indice\s*\n?\s*de masse\s+([\d.]+)", text)
if m:
result["signes_vitaux"]["imc"] = float(m.group(1))
def _build_medical_content(result: dict) -> None:
"""Construit le texte médical complet à partir des observations."""
parts: list[str] = []
if result["urgences"].get("motifs"):
parts.append("Motifs: " + ", ".join(result["urgences"]["motifs"]))
for obs in result["observations_medicales"]:
parts.append(obs.get("contenu", ""))
for note in result["notes_paramedicales"]:
parts.append(note.get("contenu", ""))
result["contenu_medical"] = "\n\n".join(parts)

184
src/main.py Normal file
View File

@@ -0,0 +1,184 @@
"""CLI + orchestrateur du pipeline d'anonymisation et extraction CIM-10."""
from __future__ import annotations
import argparse
import json
import logging
import sys
from pathlib import Path
from .anonymization.anonymizer import Anonymizer
from .config import ANONYMIZED_DIR, REPORTS_DIR, STRUCTURED_DIR, AnonymizationReport, DossierMedical
from .extraction.document_classifier import classify
from .extraction.crh_parser import parse_crh
from .extraction.pdf_extractor import extract_text
from .extraction.trackare_parser import parse_trackare
from .medical.cim10_extractor import extract_medical_info
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
# Flag global pour désactiver edsnlp
_use_edsnlp = True
def process_pdf(pdf_path: Path) -> tuple[str, DossierMedical, AnonymizationReport]:
"""Traite un PDF : extraction → parsing → anonymisation → extraction CIM-10."""
logger.info("Traitement de %s", pdf_path.name)
# 1. Extraction texte
raw_text = extract_text(pdf_path)
logger.info(" Texte extrait : %d caractères", len(raw_text))
# 2. Classification
doc_type = classify(raw_text)
logger.info(" Type de document : %s", doc_type)
# 3. Parsing
if doc_type == "trackare":
parsed = parse_trackare(raw_text)
else:
parsed = parse_crh(raw_text)
# 4. Anonymisation
anonymizer = Anonymizer(parsed_data=parsed)
anonymized_text = anonymizer.anonymize(raw_text)
report = anonymizer.report
report.source_file = pdf_path.name
logger.info(
" Anonymisation : %d remplacements (regex=%d, ner=%d, sweep=%d)",
report.total_replacements,
report.regex_replacements,
report.ner_replacements,
report.sweep_replacements,
)
# 5. Analyse edsnlp (optionnelle)
edsnlp_result = None
if _use_edsnlp:
edsnlp_result = _run_edsnlp(anonymized_text)
# 6. Extraction médicale CIM-10
dossier = extract_medical_info(parsed, anonymized_text, edsnlp_result)
dossier.source_file = pdf_path.name
dossier.document_type = doc_type
logger.info(" DP : %s", dossier.diagnostic_principal)
logger.info(" DAS : %d, Actes : %d", len(dossier.diagnostics_associes), len(dossier.actes_ccam))
return anonymized_text, dossier, report
def _run_edsnlp(text: str):
"""Exécute l'analyse edsnlp avec fallback gracieux."""
try:
from .medical.edsnlp_pipeline import analyze, is_available
if not is_available():
logger.info(" edsnlp non disponible, utilisation du mode regex seul")
return None
result = analyze(text)
logger.info(
" edsnlp : %d CIM-10, %d médicaments, %d dates",
len(result.cim10_entities),
len(result.drug_entities),
len(result.date_entities),
)
return result
except Exception:
logger.warning(" edsnlp : erreur lors de l'analyse, fallback regex", exc_info=True)
return None
def write_outputs(
stem: str,
anonymized_text: str,
dossier: DossierMedical,
report: AnonymizationReport,
) -> None:
"""Écrit les fichiers de sortie."""
# Texte anonymisé
anon_path = ANONYMIZED_DIR / f"{stem}_anonymized.txt"
anon_path.write_text(anonymized_text, encoding="utf-8")
logger.info("%s", anon_path)
# JSON structuré
json_path = STRUCTURED_DIR / f"{stem}_cim10.json"
json_path.write_text(
dossier.model_dump_json(indent=2, exclude_none=True),
encoding="utf-8",
)
logger.info("%s", json_path)
# Rapport d'anonymisation
report_path = REPORTS_DIR / f"{stem}_report.json"
report_path.write_text(
report.model_dump_json(indent=2),
encoding="utf-8",
)
logger.info("%s", report_path)
def main(input_path: str | None = None) -> None:
"""Point d'entrée principal."""
global _use_edsnlp
parser = argparse.ArgumentParser(
description="Anonymisation de documents médicaux PDF et extraction CIM-10",
)
parser.add_argument(
"input",
nargs="?",
default=input_path or "input/",
help="Chemin vers un PDF ou un dossier de PDFs (défaut: input/)",
)
parser.add_argument(
"--no-ner",
action="store_true",
help="Désactiver la phase NER (plus rapide, moins précis)",
)
parser.add_argument(
"--no-edsnlp",
action="store_true",
help="Désactiver l'analyse edsnlp (mode regex seul)",
)
args = parser.parse_args()
if args.no_ner:
# Monkey-patch pour désactiver NER
from .anonymization import ner_anonymizer
ner_anonymizer.extract_person_entities = lambda text: []
if args.no_edsnlp:
_use_edsnlp = False
input_p = Path(args.input)
if input_p.is_file():
pdfs = [input_p]
elif input_p.is_dir():
pdfs = sorted(input_p.glob("*.pdf"))
else:
logger.error("Chemin introuvable : %s", input_p)
sys.exit(1)
if not pdfs:
logger.warning("Aucun PDF trouvé dans %s", input_p)
sys.exit(0)
logger.info("Traitement de %d PDF(s)...", len(pdfs))
for pdf_path in pdfs:
try:
anonymized_text, dossier, report = process_pdf(pdf_path)
stem = pdf_path.stem.replace(" ", "_")
write_outputs(stem, anonymized_text, dossier, report)
except Exception:
logger.exception("Erreur lors du traitement de %s", pdf_path.name)
logger.info("Terminé.")
if __name__ == "__main__":
main()

0
src/medical/__init__.py Normal file
View File

View File

@@ -0,0 +1,606 @@
"""Extraction d'informations médicales structurées pour le codage CIM-10."""
from __future__ import annotations
import re
from datetime import datetime
from typing import Optional
from ..config import (
ActeCCAM,
BiologieCle,
Diagnostic,
DossierMedical,
Imagerie,
Sejour,
Traitement,
)
try:
from .edsnlp_pipeline import EdsnlpResult
except ImportError:
EdsnlpResult = None # type: ignore[assignment,misc]
# Mapping diagnostics fréquents → codes CIM-10
CIM10_MAP: dict[str, str] = {
# Pancréatite
"pancréatite aiguë biliaire": "K85.1",
"pancréatite aigue biliaire": "K85.1",
"pancréatite aiguë lithiasique": "K85.1",
"pancréatite aigue lithiasique": "K85.1",
"pancréatite aiguë": "K85.9",
"pancréatite aigue": "K85.9",
"pancréatite": "K85.9",
# Lithiases biliaires
"lithiase cholédoque": "K80.5",
"lithiase du cholédoque": "K80.5",
"calcul des canaux biliaires": "K80.5",
"lithiase vésiculaire": "K80.2",
"lithiases vésiculaires": "K80.2",
"vésicule lithiasique": "K80.2",
"colique hépatique": "K80.2",
# Cholécystite
"cholécystite aiguë": "K81.0",
"cholecystite aigue": "K81.0",
"angiocholite": "K83.0",
# Obésité
"obésité": "E66.0",
"obesite": "E66.0",
"surpoids": "E66.0",
# Réactions médicamenteuses
"éruption médicamenteuse": "L27.0",
"eruption medicamenteuse": "L27.0",
"éruption cutanée médicamenteuse": "L27.0",
"toxidermie": "L27.0",
"réaction au tramadol": "L27.0",
"allergie médicamenteuse": "T88.7",
# Douleur
"douleur abdominale": "R10.4",
"douleur hypochondre droit": "R10.1",
# Ictère
"ictère": "R17",
"jaunisse": "R17",
# HTA
"hypertension artérielle": "I10",
"hta": "I10",
# Diabète
"diabète type 2": "E11.9",
"diabète de type 2": "E11.9",
"diabète type 1": "E10.9",
}
# Mapping actes → codes CCAM
CCAM_MAP: dict[str, str] = {
"cholécystectomie": "HMFC004",
"cholecystectomie": "HMFC004",
"cholécystectomie par cœlioscopie": "HMFC004",
"cholecystectomie par coelioscopie": "HMFC004",
"cholangiographie": "HHHE002",
"cholangiographie peropératoire": "HHHE002",
"cpre": "HHHE002",
"sphinctérotomie endoscopique": "HHHE003",
"scanner abdominal": "ZCQK002",
"tdm abdominal": "ZCQK002",
"échographie abdominale": "ZCQJ001",
"echo abdominale": "ZCQJ001",
"irm abdominale": "ZCQN001",
}
def extract_medical_info(
parsed_data: dict,
anonymized_text: str,
edsnlp_result: Optional[EdsnlpResult] = None,
) -> DossierMedical:
"""Extrait les informations médicales structurées depuis les données parsées et le texte."""
dossier = DossierMedical()
dossier.document_type = parsed_data.get("type", "")
_extract_sejour(parsed_data, dossier)
_extract_diagnostics(parsed_data, anonymized_text, dossier, edsnlp_result)
_extract_actes(anonymized_text, dossier)
_extract_antecedents(anonymized_text, dossier)
_extract_traitements(parsed_data, anonymized_text, dossier, edsnlp_result)
_extract_biologie(anonymized_text, dossier)
_extract_imagerie(anonymized_text, dossier)
_extract_complications(anonymized_text, dossier, edsnlp_result)
return dossier
def _extract_sejour(parsed: dict, dossier: DossierMedical) -> None:
"""Extrait les informations de séjour."""
patient = parsed.get("patient", {})
sejour_data = parsed.get("sejour", {})
dossier.sejour = Sejour(
sexe=patient.get("sexe"),
date_entree=sejour_data.get("date_entree"),
date_sortie=sejour_data.get("date_sortie"),
mode_entree=parsed.get("urgences", {}).get("mode_entree"),
)
# Calcul de l'âge à partir de la date de naissance et de la date d'entrée
dob = patient.get("date_naissance")
date_entree = sejour_data.get("date_entree")
if dob and date_entree:
try:
dob_dt = datetime.strptime(dob, "%d/%m/%Y")
entree_dt = datetime.strptime(date_entree, "%d/%m/%Y")
age = entree_dt.year - dob_dt.year
if (entree_dt.month, entree_dt.day) < (dob_dt.month, dob_dt.day):
age -= 1
dossier.sejour.age = age
except ValueError:
pass
# Durée de séjour
if sejour_data.get("date_entree") and sejour_data.get("date_sortie"):
try:
d1 = datetime.strptime(sejour_data["date_entree"], "%d/%m/%Y")
d2 = datetime.strptime(sejour_data["date_sortie"], "%d/%m/%Y")
dossier.sejour.duree_sejour = (d2 - d1).days
except ValueError:
pass
# IMC, poids, taille
vitals = parsed.get("signes_vitaux", {})
if vitals.get("imc"):
dossier.sejour.imc = vitals["imc"]
elif patient.get("imc"):
dossier.sejour.imc = patient["imc"]
if vitals.get("poids_kg"):
dossier.sejour.poids = vitals["poids_kg"]
elif patient.get("poids_kg"):
dossier.sejour.poids = patient["poids_kg"]
if vitals.get("taille_cm"):
dossier.sejour.taille = vitals["taille_cm"]
elif patient.get("taille_cm"):
dossier.sejour.taille = patient["taille_cm"]
def _extract_diagnostics(
parsed: dict,
text: str,
dossier: DossierMedical,
edsnlp_result: Optional[EdsnlpResult] = None,
) -> None:
"""Extrait le diagnostic principal et les diagnostics associés."""
text_lower = text.lower()
# Diagnostics codés depuis Trackare (prioritaires)
for diag in parsed.get("diagnostics", []):
d = Diagnostic(
texte=diag.get("libelle", ""),
cim10_suggestion=diag.get("code_cim10"),
)
if diag.get("type", "").lower() == "principal":
dossier.diagnostic_principal = d
else:
dossier.diagnostics_associes.append(d)
# Extraction du texte "Au total:" ou conclusion
conclusion = ""
m = re.search(
r"Au total\s*[:]?\s*(.*?)(?=\n\s*(?:Devenir|TTT|Sortie|$))",
text,
re.DOTALL | re.IGNORECASE,
)
if m:
conclusion = m.group(1).strip()
# Enrichissement via edsnlp (CIM-10)
edsnlp_codes: dict[str, str] = {}
if edsnlp_result:
for ent in edsnlp_result.cim10_entities:
if not ent.negation and not ent.hypothese:
edsnlp_codes[ent.code] = ent.texte
# Si pas de DP depuis le codage, chercher dans le texte
if not dossier.diagnostic_principal:
# D'abord essayer le fallback regex (plus précis pour les patterns spécifiques)
dp = _find_diagnostic_principal(text_lower, conclusion)
if dp:
dossier.diagnostic_principal = dp
elif edsnlp_codes:
# Utiliser la première entité CIM-10 edsnlp comme DP
code, texte = next(iter(edsnlp_codes.items()))
dossier.diagnostic_principal = Diagnostic(
texte=texte.capitalize(), cim10_suggestion=code,
)
# Diagnostics associés depuis le texte (regex)
das = _find_diagnostics_associes(text_lower, conclusion, dossier)
dossier.diagnostics_associes.extend(das)
# Enrichissement DAS depuis edsnlp
if edsnlp_result:
existing_codes = set()
if dossier.diagnostic_principal:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
existing_codes.add(d.cim10_suggestion)
for ent in edsnlp_result.cim10_entities:
if ent.negation or ent.hypothese:
continue
if ent.code not in existing_codes:
dossier.diagnostics_associes.append(Diagnostic(
texte=ent.texte.capitalize(),
cim10_suggestion=ent.code,
))
existing_codes.add(ent.code)
def _find_diagnostic_principal(text_lower: str, conclusion: str) -> Diagnostic | None:
"""Trouve le diagnostic principal dans le texte."""
conclusion_lower = conclusion.lower()
# Chercher dans la conclusion d'abord
for terme, code in CIM10_MAP.items():
if terme in conclusion_lower:
return Diagnostic(texte=terme.capitalize(), cim10_suggestion=code)
# Patterns courants pour le DP
dp_patterns = [
r"pancréatite\s+aigu[eë]\s+(?:d'origine\s+)?lithiasique",
r"pancréatite\s+aigu[eë]\s+biliaire",
r"pancréatite\s+aigu[eë]",
]
for pat in dp_patterns:
if re.search(pat, text_lower):
matched = re.search(pat, text_lower).group(0)
code = _lookup_cim10(matched)
return Diagnostic(texte=matched.capitalize(), cim10_suggestion=code)
return None
def _find_diagnostics_associes(
text_lower: str, conclusion: str, dossier: DossierMedical
) -> list[Diagnostic]:
"""Trouve les diagnostics associés."""
das: list[Diagnostic] = []
existing_codes = set()
if dossier.diagnostic_principal:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
existing_codes.add(d.cim10_suggestion)
# Lithiase cholédoque
if re.search(r"lithiase\s+(?:du\s+)?(?:bas\s+)?cholédoque", text_lower):
if "K80.5" not in existing_codes:
das.append(Diagnostic(texte="Lithiase du cholédoque", cim10_suggestion="K80.5"))
existing_codes.add("K80.5")
# Éruption médicamenteuse
if re.search(r"éruption\s+cutanée|eruption\s+cutanée|toxidermie|réaction\s+au\s+tramadol", text_lower):
if "L27.0" not in existing_codes:
das.append(Diagnostic(texte="Éruption cutanée médicamenteuse", cim10_suggestion="L27.0"))
existing_codes.add("L27.0")
# Obésité (IMC >= 30)
if re.search(r"imc\s*[:=]?\s*(\d{2,3}[.,]\d+)", text_lower):
m = re.search(r"imc\s*[:=]?\s*(\d{2,3}[.,]\d+)", text_lower)
if m:
imc_val = float(m.group(1).replace(",", "."))
if imc_val >= 30 and "E66.0" not in existing_codes:
das.append(Diagnostic(texte=f"Obésité (IMC {imc_val})", cim10_suggestion="E66.0"))
existing_codes.add("E66.0")
# Lithiases vésiculaires
if re.search(r"vésicule\s+lithiasique|lithiases?\s+vésiculaire", text_lower):
if "K80.2" not in existing_codes:
das.append(Diagnostic(texte="Lithiase vésiculaire", cim10_suggestion="K80.2"))
existing_codes.add("K80.2")
return das
def _extract_actes(text: str, dossier: DossierMedical) -> None:
"""Extrait les actes CCAM."""
text_lower = text.lower()
# Cholécystectomie par cœlioscopie
if re.search(r"chol[ée]cystectomie\s+par\s+c[oœ][ea]lioscopie", text_lower):
date = _find_act_date(text, r"chol[ée]cystectomie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholécystectomie par cœlioscopie",
code_ccam_suggestion="HMFC004",
date=date,
))
elif re.search(r"chol[ée]cystectomie|cholecystectomie", text_lower):
date = _find_act_date(text, r"chol[ée]cystectomie|cholecystectomie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholécystectomie",
code_ccam_suggestion="HMFC004",
date=date,
))
# Cholangiographie
if re.search(r"cholangiographie", text_lower):
date = _find_act_date(text, r"cholangiographie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholangiographie peropératoire",
code_ccam_suggestion="HHHE002",
date=date,
))
# TDM
if re.search(r"(?:tdm|scanner|tomodensitométrie)", text_lower):
date = _find_act_date(text, r"(?:TDM|scanner)")
dossier.actes_ccam.append(ActeCCAM(
texte="TDM abdominal",
code_ccam_suggestion="ZCQK002",
date=date,
))
def _extract_antecedents(text: str, dossier: DossierMedical) -> None:
"""Extrait les antécédents."""
m = re.search(
r"Antécédents?\s*[:]?\s*\n?(.*?)(?=\n\s*(?:Traitements?\s*[:]|Allergie|Histoire de la maladie|Examen clinique|\n\n))",
text,
re.DOTALL | re.IGNORECASE,
)
if m:
block = m.group(1).strip()
for line in block.split("\n"):
line = line.strip().lstrip("- •")
# Filtrer les lignes non pertinentes
if (line and len(line) > 5 and line != "0"
and not re.match(r"^\d", line)
and "Item de" not in line
and "surveillance" not in line.lower()
and "Température" not in line
and "Signes Vitaux" not in line
and "Pouls" not in line
and "Type de note" not in line
and "Aucune donnée" not in line
and "renseignée" not in line
and "habitudes de vie" not in line
and "Systolique" not in line
and "Diastolique" not in line
and "Saturation" not in line):
dossier.antecedents.append(line)
def _extract_traitements(
parsed: dict,
text: str,
dossier: DossierMedical,
edsnlp_result: Optional[EdsnlpResult] = None,
) -> None:
"""Extrait les traitements de sortie."""
# Construire un index des médicaments edsnlp avec codes ATC
drug_atc: dict[str, str] = {}
if edsnlp_result:
for drug in edsnlp_result.drug_entities:
if not drug.negation and drug.code_atc:
drug_atc[drug.texte.lower()] = drug.code_atc
# Depuis le texte — section "TTT de sortie" (limiter à quelques lignes)
m = re.search(
r"(?:TTT|Traitement)\s+de\s+sortie\s*[:]?\s*\n?(.*?)(?=\n\s*(?:Devenir|Rédigé|Cordialement|Patient:|Episode|Le \d{2}/\d{2}|\n\n)|$)",
text,
re.DOTALL | re.IGNORECASE,
)
if m:
block = m.group(1).strip()
lines = block.split("\n")
for line in lines[:10]: # Limiter à 10 lignes max
line = line.strip().lstrip("- •")
if not line or len(line) <= 2:
continue
# Ignorer les footers et lignes non-médicament
if re.match(r"^(Patient|Episode|Le \d|Page|V\d)", line):
break
med = line
poso = None
# Séparer médicament et posologie
poso_match = re.search(r"\s+(si besoin|matin|soir|midi|\d+\s*(?:mg|cp|gel).*)", line, re.IGNORECASE)
if poso_match:
med = line[:poso_match.start()].strip()
poso = poso_match.group(1).strip()
# Chercher le code ATC via edsnlp
code_atc = _match_drug_atc(med, drug_atc)
dossier.traitements_sortie.append(Traitement(
medicament=med,
posologie=poso,
code_atc=code_atc,
))
# Si rien trouvé, chercher les prescriptions "Presc. de Sortie"
if not dossier.traitements_sortie:
for m_presc in re.finditer(
r"([A-ZÉÈÊËÀÂ][A-ZÉÈÊËÀÂ0-9\s\-/%.]+?)(?:\s+\d+\s*(?:mg|G|CPR|GEL))?.*?Presc\.\s*de\s*Sortie",
text,
):
med = m_presc.group(1).strip()
if len(med) > 3:
code_atc = _match_drug_atc(med, drug_atc)
dossier.traitements_sortie.append(Traitement(
medicament=med, code_atc=code_atc,
))
def _match_drug_atc(med_name: str, drug_atc: dict[str, str]) -> Optional[str]:
"""Cherche un code ATC correspondant au médicament dans les résultats edsnlp."""
if not drug_atc:
return None
med_lower = med_name.lower().strip()
# Correspondance exacte
if med_lower in drug_atc:
return drug_atc[med_lower]
# Correspondance partielle : le nom edsnlp est contenu dans le nom du médicament
for drug_text, atc in drug_atc.items():
if drug_text in med_lower or med_lower in drug_text:
return atc
return None
def _extract_biologie(text: str, dossier: DossierMedical) -> None:
"""Extrait les résultats biologiques clés."""
bio_patterns = [
(r"[Ll]ipas[ée]mie\s*(?:[àa=:])?\s*(\d+)", "Lipasémie", None),
(r"CRP\s*[=:à]?\s*(\d+(?:[.,]\d+)?)", "CRP", None),
(r"ASAT\s*[=:à]?\s*([\d.,]+)\s*(?:N|U/L)?", "ASAT", None),
(r"ALAT\s*[=:à]?\s*([\d.,]+)\s*(?:N|U/L)?", "ALAT", None),
(r"GGT\s*[=:à]?\s*(\d+)\s*(?:U/L)?", "GGT", None),
(r"PAL\s*[=:à]?\s*(\d+)\s*(?:U/L)?", "PAL", None),
(r"[Bb]ilirubine\s+(?:totale\s+)?[àa=:]\s*(\d+)\s*(?:µmol/L)?", "Bilirubine totale", None),
(r"troponine\s+(négative|positive|normale)", "Troponine", None),
]
for pattern, test_name, _ in bio_patterns:
m = re.search(pattern, text)
if m:
value = m.group(1)
anomalie = _is_abnormal(test_name, value)
dossier.biologie_cle.append(BiologieCle(
test=test_name,
valeur=value,
anomalie=anomalie,
))
def _extract_imagerie(text: str, dossier: DossierMedical) -> None:
"""Extrait les résultats d'imagerie."""
# TDM
tdm_match = re.search(
r"(?:TDM|[Ss]canner|tomodensitométrie).*?(?:retrouve|montre|objective)\s*[:]?\s*(.*?)(?=\n\s*(?:Cholécystectomie|Au total|Devenir|\n\n))",
text,
re.DOTALL | re.IGNORECASE,
)
if tdm_match:
conclusion = tdm_match.group(1).strip()
# Score de Balthazar
score = None
m = re.search(r"[Bb]althazar\s*(?:[àa=:])?\s*(\d+|[A-E])", text)
if m:
score = f"Balthazar {m.group(1)}"
dossier.imagerie.append(Imagerie(
type="TDM abdominal",
conclusion=conclusion[:500],
score=score,
))
# Échographie
echo_match = re.search(
r"(?:[ée]cho(?:graphie)?)\s*.*?(?:retrouve|montre|objective)\s*[:]?\s*(.*?)(?=\n\n)",
text,
re.DOTALL | re.IGNORECASE,
)
if echo_match:
dossier.imagerie.append(Imagerie(
type="Échographie",
conclusion=echo_match.group(1).strip()[:500],
))
def _extract_complications(
text: str,
dossier: DossierMedical,
edsnlp_result: Optional[EdsnlpResult] = None,
) -> None:
"""Extrait les complications mentionnées."""
text_lower = text.lower()
# Termes de négation détectés par edsnlp pour chaque entité
edsnlp_negated_terms: set[str] = set()
if edsnlp_result:
for ent in edsnlp_result.cim10_entities:
if ent.negation:
edsnlp_negated_terms.add(ent.texte.lower())
complication_terms = [
"éruption cutanée",
"eruption cutanée",
"fièvre",
"infection",
"hémorragie",
"hématome",
"abcès",
"fistule",
"iléus",
"occlusion",
]
for term in complication_terms:
if term in text_lower:
# Vérifier la négation via edsnlp d'abord
if edsnlp_result and _is_negated_by_edsnlp(term, edsnlp_negated_terms):
continue
# Fallback regex pour la négation
pattern = rf"(?:pas de|sans|absence de|aucun[e]?)\s+{re.escape(term)}"
if not re.search(pattern, text_lower):
dossier.complications.append(term.capitalize())
def _is_negated_by_edsnlp(term: str, negated_terms: set[str]) -> bool:
"""Vérifie si un terme est nié selon edsnlp."""
term_lower = term.lower()
for neg_term in negated_terms:
if term_lower in neg_term or neg_term in term_lower:
return True
return False
def _find_act_date(text: str, act_pattern: str) -> str | None:
"""Trouve la date associée à un acte."""
# Chercher "acte le DD/MM" ou "acte le DD/MM/YYYY"
m = re.search(
rf"{act_pattern}.*?(?:le\s+)?(\d{{2}}/\d{{2}}(?:/\d{{4}})?)",
text,
re.IGNORECASE,
)
if m:
return m.group(1)
# Chercher dans la ligne d'observation juste avant
m = re.search(
rf"(\d{{2}}/\d{{2}}/\d{{4}}).*?{act_pattern}",
text,
re.IGNORECASE,
)
if m:
return m.group(1)
return None
def _lookup_cim10(text: str) -> str | None:
"""Cherche un code CIM-10 pour un texte donné."""
text_lower = text.lower().strip()
for terme, code in CIM10_MAP.items():
if terme in text_lower:
return code
return None
def _is_abnormal(test: str, value: str) -> bool | None:
"""Détermine si un résultat biologique est anormal."""
try:
val = float(value.replace(",", "."))
except (ValueError, AttributeError):
if value.lower() in ("négative", "negative", "normale", "normal"):
return False
if value.lower() in ("positive", "positif", "élevée", "elevee"):
return True
return None
normals: dict[str, tuple[float, float]] = {
"Lipasémie": (0, 60),
"CRP": (0, 5),
"ASAT": (0, 40),
"ALAT": (0, 40),
"GGT": (0, 60),
"PAL": (0, 150),
"Bilirubine totale": (0, 17),
}
if test in normals:
lo, hi = normals[test]
return val > hi or val < lo
return None

View File

@@ -0,0 +1,140 @@
"""Pipeline edsnlp pour l'extraction médicale (CIM-10, médicaments, négation)."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
_nlp = None
_available = None
@dataclass
class CIM10Entity:
texte: str
code: str
negation: bool = False
hypothese: bool = False
@dataclass
class DrugEntity:
texte: str
code_atc: Optional[str] = None
negation: bool = False
@dataclass
class DateEntity:
texte: str
value: Optional[str] = None
@dataclass
class EdsnlpResult:
cim10_entities: list[CIM10Entity] = field(default_factory=list)
drug_entities: list[DrugEntity] = field(default_factory=list)
date_entities: list[DateEntity] = field(default_factory=list)
def is_available() -> bool:
"""Vérifie si edsnlp est installé et utilisable."""
global _available
if _available is not None:
return _available
try:
import edsnlp # noqa: F401
_available = True
except ImportError:
_available = False
return _available
def get_pipeline():
"""Retourne le pipeline edsnlp (singleton lazy-loaded)."""
global _nlp
if _nlp is not None:
return _nlp
if not is_available():
raise RuntimeError("edsnlp n'est pas installé")
import edsnlp
logger.info("Initialisation du pipeline edsnlp...")
nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.normalizer")
nlp.add_pipe("eds.sentences")
nlp.add_pipe("eds.cim10", config=dict(attr="NORM", term_matcher="simstring"))
nlp.add_pipe("eds.drugs", config=dict(attr="NORM", term_matcher="exact"))
nlp.add_pipe("eds.negation")
nlp.add_pipe("eds.hypothesis")
nlp.add_pipe("eds.dates")
_nlp = nlp
logger.info("Pipeline edsnlp initialisé avec succès")
return _nlp
def analyze(text: str) -> EdsnlpResult:
"""Analyse un texte médical avec edsnlp.
Retourne les entités CIM-10, médicaments et dates détectées.
"""
result = EdsnlpResult()
if not is_available():
return result
try:
nlp = get_pipeline()
doc = nlp(text)
except Exception:
logger.exception("Erreur lors de l'analyse edsnlp")
return result
for ent in doc.ents:
negation = getattr(ent._, "negation", False) or False
hypothese = getattr(ent._, "hypothesis", False) or False
if ent.label_ == "cim10":
code = ent.kb_id_ or ""
if code:
result.cim10_entities.append(CIM10Entity(
texte=ent.text,
code=code,
negation=negation,
hypothese=hypothese,
))
elif ent.label_ == "drug":
code_atc = ent.kb_id_ or None
result.drug_entities.append(DrugEntity(
texte=ent.text,
code_atc=code_atc,
negation=negation,
))
# Dates
for span in doc.spans.get("dates", []):
date_value = None
if hasattr(span._, "date"):
date_obj = span._.date
if date_obj is not None:
date_value = str(date_obj)
result.date_entities.append(DateEntity(
texte=span.text,
value=date_value,
))
return result
def reset():
"""Réinitialise le pipeline (utile pour les tests)."""
global _nlp, _available
_nlp = None
_available = None