"""edsnlp pipeline for medical extraction (CIM-10 codes, drugs, negation)."""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)

# Lazily built edsnlp pipeline, shared by every call to ``get_pipeline``.
_nlp = None
# Cached outcome of the edsnlp import probe; ``None`` means "not probed yet".
_available: Optional[bool] = None


@dataclass
class CIM10Entity:
    """A CIM-10 (ICD-10) code mention detected in a text."""

    texte: str  # surface text of the mention
    code: str  # CIM-10 code attached to the mention
    negation: bool = False  # True when the mention was flagged as negated
    hypothese: bool = False  # True when the mention was flagged as hypothetical


@dataclass
class DrugEntity:
    """A drug mention detected in a text."""

    texte: str  # surface text of the mention
    code_atc: Optional[str] = None  # code attached to the mention, if any
    negation: bool = False  # True when the mention was flagged as negated


@dataclass
class DateEntity:
    """A date mention detected in a text."""

    texte: str  # surface text of the mention
    value: Optional[str] = None  # string form of the parsed date, if any


@dataclass
class EdsnlpResult:
    """Container for everything ``analyze`` extracts from one text."""

    cim10_entities: list[CIM10Entity] = field(default_factory=list)
    drug_entities: list[DrugEntity] = field(default_factory=list)
    date_entities: list[DateEntity] = field(default_factory=list)


def is_available() -> bool:
    """Return whether the ``edsnlp`` package can be imported.

    The import is attempted once; the outcome is cached at module level
    until ``reset`` is called.
    """
    global _available
    if _available is not None:
        return _available
    try:
        import edsnlp  # noqa: F401

        _available = True
    except ImportError:
        _available = False
    return _available


def get_pipeline():
    """Return the edsnlp pipeline, building it on first use.

    The pipeline is stored in a module-level variable and reused by later
    calls until ``reset`` is called.

    Raises:
        RuntimeError: if ``edsnlp`` is not installed.
    """
    global _nlp
    if _nlp is not None:
        return _nlp

    if not is_available():
        raise RuntimeError("edsnlp n'est pas installé")

    import edsnlp

    logger.info("Initialisation du pipeline edsnlp...")

    nlp = edsnlp.blank("eds")
    nlp.add_pipe("eds.normalizer")
    nlp.add_pipe("eds.sentences")
    nlp.add_pipe("eds.cim10", config=dict(attr="NORM", term_matcher="simstring"))
    nlp.add_pipe("eds.drugs", config=dict(attr="NORM", term_matcher="exact"))
    nlp.add_pipe("eds.negation")
    nlp.add_pipe("eds.hypothesis")
    nlp.add_pipe("eds.dates")

    _nlp = nlp
    logger.info("Pipeline edsnlp initialisé avec succès")
    return _nlp


def _result_from_remote(payload: dict) -> EdsnlpResult:
    """Convert a remote NER payload into a fresh ``EdsnlpResult``.

    Reads the ``"cim10"``, ``"drugs"`` and ``"dates"`` lists of the payload.
    Raises ``KeyError`` when an entry lacks a mandatory key (``"text"``, or
    ``"code"`` for CIM-10 entries).
    """
    return EdsnlpResult(
        cim10_entities=[
            CIM10Entity(
                texte=ent["text"],
                code=ent["code"],
                negation=ent.get("negation", False),
                hypothese=ent.get("hypothesis", False),
            )
            for ent in payload.get("cim10", [])
        ],
        drug_entities=[
            DrugEntity(
                texte=ent["text"],
                code_atc=ent.get("code_atc"),
                negation=ent.get("negation", False),
            )
            for ent in payload.get("drugs", [])
        ],
        date_entities=[
            DateEntity(texte=ent["text"], value=ent.get("value"))
            for ent in payload.get("dates", [])
        ],
    )


def _result_from_doc(doc) -> EdsnlpResult:
    """Collect CIM-10, drug and date entities from a processed edsnlp doc."""
    result = EdsnlpResult()

    for ent in doc.ents:
        # A missing or ``None`` qualifier is normalised to ``False``.
        negation = getattr(ent._, "negation", False) or False
        hypothese = getattr(ent._, "hypothesis", False) or False

        if ent.label_ == "cim10":
            code = ent.kb_id_ or ""
            # CIM-10 matches without a code are dropped.
            if code:
                result.cim10_entities.append(
                    CIM10Entity(
                        texte=ent.text,
                        code=code,
                        negation=negation,
                        hypothese=hypothese,
                    )
                )
        elif ent.label_ == "drug":
            result.drug_entities.append(
                DrugEntity(
                    texte=ent.text,
                    code_atc=ent.kb_id_ or None,
                    negation=negation,
                )
            )

    # Dates are read from the "dates" span group, not from ``doc.ents``.
    for span in doc.spans.get("dates", []):
        date_obj = getattr(span._, "date", None)
        result.date_entities.append(
            DateEntity(
                texte=span.text,
                value=str(date_obj) if date_obj is not None else None,
            )
        )

    return result


def analyze(text: str) -> EdsnlpResult:
    """Analyze a medical text with edsnlp.

    The remote server is tried first. The local pipeline is used when the
    remote client cannot be imported, when the remote call or the decoding
    of its payload fails, or when the remote answer is ``None`` or contains
    an ``"error"`` key.

    Args:
        text: the text to analyze.

    Returns:
        The detected CIM-10, drug and date entities. The result is empty
        when neither the remote server nor a local edsnlp install could
        process the text.
    """
    try:
        from .remote_embed import ner_remote

        remote = ner_remote(text)
        if remote is not None and "error" not in remote:
            # Built in a fresh object so that a payload failing half-way
            # cannot leak partial entities into the local fallback.
            result = _result_from_remote(remote)
            logger.debug(
                "edsnlp distant: %d CIM-10, %d drugs, %.0fms",
                len(result.cim10_entities),
                len(result.drug_entities),
                remote.get("time_ms", 0),
            )
            return result
    except ImportError:
        # The remote client is optional: fall back silently.
        pass
    except Exception:
        # Mirror the local branch: log the failure, then degrade gracefully
        # to the local pipeline instead of propagating to the caller.
        logger.warning(
            "Erreur lors de l'appel edsnlp distant, fallback local",
            exc_info=True,
        )

    if not is_available():
        return EdsnlpResult()

    try:
        nlp = get_pipeline()
        doc = nlp(text)
    except Exception:
        logger.exception("Erreur lors de l'analyse edsnlp")
        return EdsnlpResult()

    return _result_from_doc(doc)


def reset():
    """Drop the cached pipeline and availability flag (useful for tests)."""
    global _nlp, _available
    _nlp = None
    _available = None