"""Extraction des résultats biologiques depuis le texte médical.""" from __future__ import annotations import re import unicodedata import logging import numpy as np from ..config import BiologieCle, DossierMedical, load_lab_value_sanity from .bio_normals import BIO_NORMALS, _is_abnormal logger = logging.getLogger(__name__) def _norm_key(s: str) -> str: """Normalise une clé (minuscules, sans accents) pour index YAML.""" s = (s or "").strip().lower() s = unicodedata.normalize("NFKD", s) s = "".join(ch for ch in s if not unicodedata.combining(ch)) return re.sub(r"\s+", " ", s) def _parse_float_and_token(raw: str) -> tuple[float | None, str | None]: """Parse un float et renvoie aussi le token numérique normalisé (avec '.').""" if raw is None: return None, None s = str(raw).strip() m = re.search(r"(-?\d+(?:[\.,]\d+)?)", s) if not m: return None, None token = m.group(1).replace(",", ".") try: return float(token), token except ValueError: return None, None def _sanitize_bio_value(test_name: str, raw_value: str, sanity_cfg: dict) -> tuple[str, float, str, str | None] | None: """Applique des garde-fous anti-artefacts (OCR/PDF). Retour: (token, value_float, quality, reason) ou None si non parsable. 
    quality: ok | suspect | discarded
    """
    val, token = _parse_float_and_token(raw_value)
    if val is None or token is None:
        return None

    key = _norm_key(test_name)
    tests_cfg = (sanity_cfg or {}).get("tests") or {}
    cfg = tests_cfg.get(key) or {}

    hard_min = cfg.get("hard_min")
    hard_max = cfg.get("hard_max")
    if hard_min is not None and val < float(hard_min):
        return token, val, "discarded", f"Valeur hors bornes plausibles (<{hard_min})"
    if hard_max is not None and val > float(hard_max):
        return token, val, "discarded", f"Valeur hors bornes plausibles (>{hard_max})"

    quality = "ok"
    reason: str | None = None
    suspect_cfg = cfg.get("suspect") or {}
    single_digit_over = suspect_cfg.get("single_digit_over")
    if single_digit_over is not None:
        # e.g. potassium '8' instead of '4.8' (lost decimal)
        if re.fullmatch(r"\d", str(raw_value).strip()) and val >= float(single_digit_over):
            quality = "suspect"
            reason = "Valeur à 1 chiffre (possible décimale perdue) : vérifier dans le CR"

    return token, val, quality, reason


def _extract_biologie_faiss(text: str, dossier: DossierMedical) -> None:
    """Laboratory extraction via FAISS vector search over synonyms.

    Complements the regexes for unanticipated terms or complex variations.
    """
    from .rag_index import get_index
    from .rag_search import _get_embed_model

    res = get_index(kind="bio")
    if not res:
        return
    faiss_index, metadata = res

    try:
        model = _get_embed_model()
    except Exception as e:
        logger.warning("FAISS Bio: modèle d'embedding indisponible (%s)", e)
        return

    # 1. Split the text into sliding segments (sentences or word groups)
    lines = [ln.strip() for ln in text.split("\n") if len(ln.strip()) > 5]
    if not lines:
        return

    segments = []
    for line in lines:
        words = line.split()
        if len(words) > 15:
            # 12-word windows advancing by 10 words (2-word overlap)
            for i in range(0, len(words), 10):
                segments.append(" ".join(words[i:i + 12]))
        else:
            segments.append(line)

    if not segments:
        return

    # 2.
    # Encode the segments
    try:
        embeddings = model.encode(segments, normalize_embeddings=True, show_progress_bar=False)
        embeddings = np.array(embeddings, dtype=np.float32)
    except Exception as e:
        logger.warning("FAISS Bio: erreur encodage segments (%s)", e)
        return

    # 3. Search the bio index
    MIN_SCORE_BIO = 0.82
    scores, indices = faiss_index.search(embeddings, 1)

    sanity_cfg = load_lab_value_sanity()
    seen_faiss = set()

    for i, (score, idx) in enumerate(zip(scores, indices)):
        s = float(score[0])
        if s < MIN_SCORE_BIO or idx[0] < 0:
            continue

        meta = metadata[idx[0]]
        concept_name = meta.get("code")
        synonym_matched = meta.get("extrait")
        segment = segments[i]

        # 4. Capture the numeric value
        val_match = re.search(
            r"(?:[=àa:]\s*)?(\d+(?:[.,]\d+)?)\s*(?:[a-zA-Z/%/µ/mm3/G/L/U/I]+)?",
            segment,
        )
        if not val_match:
            continue

        raw_value = val_match.group(1)
        entry_key = (concept_name, raw_value)
        if entry_key in seen_faiss:
            continue
        seen_faiss.add(entry_key)

        sanitized = _sanitize_bio_value(concept_name, raw_value, sanity_cfg)
        if sanitized:
            token, val_num, quality, reason = sanitized

            # Skip values already present. The regex pass stores the normalized
            # token ('.' decimal), so compare against both forms.
            is_dup = any(
                b.test == concept_name and b.valeur in (raw_value, token)
                for b in dossier.biologie_cle
            )
            if is_dup:
                continue

            anomalie = _is_abnormal(
                concept_name,
                token,
                dossier.sejour.age if dossier.sejour else None,
                dossier.sejour.sexe if dossier.sejour else None,
            )
            dossier.biologie_cle.append(
                BiologieCle(
                    test=concept_name,
                    valeur=raw_value,
                    valeur_num=val_num,
                    anomalie=anomalie,
                    quality=quality,
                    discard_reason=reason,
                )
            )
            logger.debug(
                "FAISS Bio match: %s (%s) = %s dans '%s'",
                concept_name,
                synonym_matched,
                raw_value,
                segment,
            )


def _extract_biologie(text: str, dossier: DossierMedical) -> None:
    """Extract key laboratory results.

    Notes:
    - Supports aliases (TGO/TGP, Hb, Na/K…)
    - Captures multiple occurrences (useful to confirm or rule out diagnoses)
    - Deliberately kept *simple* (regex over extracted text): a value that
      appears only in a poorly extracted PDF table may be missed.
""" # (pattern, test_name) bio_patterns: list[tuple[str, str]] = [ (r"[Ll]ipas[ée]mie\s*(?:[àa=:])?\s*(\d+)\s*(?:UI/L|U/L)?", "Lipasémie"), (r"\bCRP\b\s*[=:àa]?\s*(\d+(?:[.,]\d+)?)\s*(?:mg/[Ll])?", "CRP"), (r"(?:\bASAT\b|\bTGO\b)\s*[=:àa]?\s*([\d.,]+)\s*(?:N|U(?:I)?/L)?", "ASAT"), (r"(?:\bALAT\b|\bTGP\b)\s*[=:àa]?\s*([\d.,]+)\s*(?:N|U(?:I)?/L)?", "ALAT"), (r"\bGGT\b\s*[=:àa]?\s*(\d+)\s*(?:U(?:I)?/L)?", "GGT"), (r"\bPAL\b\s*[=:àa]?\s*(\d+)\s*(?:U(?:I)?/L)?", "PAL"), (r"[Bb]ilirubine\s+(?:totale\s+)?[àa=:]\s*(\d+(?:[.,]\d+)?)\s*(?:µmol/L|mg/dL)?", "Bilirubine totale"), # Ionogramme / électrolytes (r"(?:[Ss]odium|[Nn]atr[ée]mie|(? max_per_test: break anomalie = _is_abnormal(test_name, raw_value, _patient_age, _patient_sexe) dossier.biologie_cle.append( BiologieCle( test=test_name, valeur=raw_value, valeur_num=None, anomalie=anomalie, quality="ok", discard_reason=None, ) ) continue sanitized = _sanitize_bio_value(test_name, raw_value, sanity_cfg) if sanitized is None: continue token, val_num, quality, reason = sanitized if quality == "suspect" and not keep_suspect: quality = "discarded" reason = reason or "Valeur suspecte (policy keep_suspect=false)" # Déduplication sur la valeur normalisée key = (test_name, token) if key in seen: continue seen.add(key) counts[test_name] = counts.get(test_name, 0) + 1 if counts[test_name] > max_per_test: break if quality == "discarded": # On garde la trace pour audit, sans polluer les règles qualité. dossier.biologie_discarded.append( { "test": test_name, "raw": raw_value, "valeur": token, "valeur_num": val_num, "reason": reason, } ) if drop_out_of_range: continue anomalie = _is_abnormal(test_name, token, _patient_age, _patient_sexe) dossier.biologie_cle.append( BiologieCle( test=test_name, valeur=token, valeur_num=val_num, anomalie=anomalie, quality=quality, discard_reason=reason, ) ) # --- Complément par recherche vectorielle (Synonymes) --- _extract_biologie_faiss(text, dossier)