diff --git a/src/anonymization/anonymizer.py b/src/anonymization/anonymizer.py index a8540db..69f1b23 100644 --- a/src/anonymization/anonymizer.py +++ b/src/anonymization/anonymizer.py @@ -34,6 +34,24 @@ MEDICAL_TERMS_WHITELIST = { "mestinon", "augmentin", "doliprane", "spasfon", "lasilix", "lovenox", "kardegic", "inexium", "mopral", "gaviscon", "loxen", "perfalgan", "profenid", "voltarene", "toplexil", + # Médicaments courants qui ressemblent à des noms propres + "ondansetron", "lipruzet", "liptruzet", "telmisartan", + "phloroglucinol", "oxynormoro", "oxycodone", "oxynorm", + "enoxaparine", "pantoprazole", "omeprazole", "esomeprazole", + "amoxicilline", "metronidazole", "ceftriaxone", "ciprofloxacine", + "ketoprofene", "ibuprofene", "naproxene", "diclofenac", + "atorvastatine", "rosuvastatine", "simvastatine", + "metformine", "insuline", "levofloxacine", + "furosemide", "ramipril", "amlodipine", "bisoprolol", + "alprazolam", "bromazepam", "zopiclone", "zolpidem", + "hydroxyzine", "cetirizine", "desloratadine", + "dexamethasone", "prednisolone", "prednisone", + "heparine", "rivaroxaban", "apixaban", "dabigatran", + # Termes Trackare / systèmes hospitaliers + "regime", "repas", "autonome", "complete", "directe", + "toilette", "refection", "desinfection", "environnement", + "sommeil", "elimination", "surv", "contention", "isolement", + "buccale", "cutanee", "sous-cutanee", "injectable", "service", "médecin", "medecin", "docteur", "chirurgie", "gastro", "entérologie", "enterologie", "oncologie", "hépato", "hepato", "digestif", "digestive", @@ -44,8 +62,7 @@ MEDICAL_TERMS_WHITELIST = { "secrétariat", "infirmier", "infirmière", "unité", "hospitalisation", "urgences", "coordonnateur", "fédération", "federation", - "navarre", "institut", "cancérologie", - "bordeaux", "strasbourg", "reims", "limoges", "clermont", "ferrand", + "institut", "cancérologie", "palais", } @@ -124,6 +141,9 @@ class Anonymizer: text, n = self._replace_phone(text) count += n + text, n = self._replace_pattern(text, patterns.PHONE_INTL_PATTERN, "telephone") + count += n + text, n = self._replace_pattern( text, patterns.EMAIL_PATTERN, "email", skip_establishment_check=True, @@ -151,6 +171,20 @@ class Anonymizer: ) count += n + # Trackare / BACTERIO : zones structurées patient + # IMPORTANT : doit s'exécuter AVANT _replace_structured_names() car + # les patterns génériques (DR_NAME, PATIENT_NAME) consommeraient le + # préfixe "Nom" des lignes "Nom de naissance : EICHE", rendant + # NOM_NAISSANCE_TRACKARE_PATTERN inopérant. + text, n = self._replace_pattern(text, patterns.BACTERIO_NOM_HEADER_PATTERN, "patient") + count += n + text, n = self._replace_pattern(text, patterns.PERSONNE_PREVENIR_PATTERN, "contact") + count += n + text, n = self._replace_pattern(text, patterns.NOM_NAISSANCE_TRACKARE_PATTERN, "patient") + count += n + text, n = self._replace_pattern(text, patterns.PRENOM_NAISSANCE_PATTERN, "patient") + count += n + # Noms structurés text, n = self._replace_structured_names(text) count += n @@ -200,8 +234,28 @@ class Anonymizer: if self._is_establishment(word): continue - # Vérifier si déjà anonymisé (contient des crochets) - if "[" in word and "]" in word: + # Vérifier si déjà anonymisé ou fragment de tag existant + # Couvre : "[TAG]" complet, "[TAG" fragment, "TAG]" fragment, + # mots internes aux tags (MEDECIN, PATIENT, SOIGNANT, etc.) + if "[" in word or "]" in word: + continue + if word.upper().rstrip("_0123456789") in { + "PATIENT", "MEDECIN", "SOIGNANT", "CONTACT", "PERSONNE", + "IPP", "EPISODE", "TEL", "EMAIL", "ADRESSE", "DATE_NAISS", + "LIEU_NAISS", "FINESS", "CODE_BARRE", "NIR", "RPPS", + "IDENTIFIANT", + }: + continue + + # Vérifier si l'entité se trouve à l'intérieur d'un tag déjà posé + # (ex: le NER détecte "Martin" dans "[MEDECIN_3] Martin [ADRESSE_1]" + # alors que "Martin" est entre deux tags et fait partie du texte) + ctx_start = max(0, ent["start"] - 20) + ctx_end = min(len(text), ent["end"] + 20) + context = text[ctx_start:ctx_end] + # Si on est à l'intérieur d'un tag (entre [ et ] sans interruption) + before = text[ctx_start:ent["start"]] + if "[" in before and "]" not in before.split("[")[-1]: continue pseudo = self.registry.get_replacement(word) @@ -270,9 +324,18 @@ class Anonymizer: @staticmethod def _fix_brackets(text: str) -> str: - """Corrige les crochets malformés après anonymisation.""" - # Doubles crochets fermants + """Corrige les crochets malformés après anonymisation. + + Traite : [[ ouvert, ]] fermé, ]N] orphelin, tags collés ][, mot[TAG]. + """ + # Doubles crochets ouvrants : [[TAG → [TAG + text = re.sub(r"\[\[", "[", text) + # Chiffres/underscore orphelins entre ] et ] : [TAG]_2] ou [TAG]10] → [TAG] + text = re.sub(r"(\[[A-Z_]+\d*\])[_\d]+\]", r"\1", text) + # Doubles crochets fermants résiduels : ]] → ] text = re.sub(r"\]\]", "]", text) + # Tags collés dos-à-dos : ][ → ] [ + text = re.sub(r"\]\[", "] [", text) # Ajouter un espace avant un tag collé à un mot text = re.sub(r"([A-Za-zéèêëàâäùûüôöîïç])\[", r"\1 [", text) return text @@ -443,11 +506,11 @@ class Anonymizer: return text, count def _replace_inline_addresses(self, text: str) -> tuple[str, int]: - """Capture les adresses inline (MAISON xxx, QUARTIER xxx, LOTISSEMENT xxx).""" + """Capture les adresses inline (MAISON xxx, QUARTIER xxx, IMP xxx, etc.).""" count = 0 - # Pattern : MAISON/QUARTIER/LOTISSEMENT suivi de mots (noms propres de lieux) + # Pattern : MAISON/QUARTIER/LOTISSEMENT/IMP/IMPASSE/ALLEE suivi de mots inline_addr = re.compile( - r"((?:MAISON|QUARTIER|LOTISSEMENT|RESIDENCE|HAMEAU)\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s]+?)(?=\n|$|Dr|\d{5}|Chef|médical|coordonnateur)", + r"((?:MAISON|QUARTIER|LOTISSEMENT|RESIDENCE|HAMEAU|IMP|IMPASSE|ALLEE|ALLÉE)\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s]+?)(?=\n|$|Dr|\d{5}|Chef|médical|coordonnateur)", re.IGNORECASE, ) for m in reversed(list(inline_addr.finditer(text))): @@ -540,7 +603,101 @@ class Anonymizer: text = text[:m.start(1)] + pseudo + text[m.end(1):] count += 1 - self.report.regex_replacements += count + # Destinataires de courrier (Madame/DR suivi de nom) + for m in reversed(list(patterns.DESTINATAIRE_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Contacts familiaux (Concubine/Conjoint/Époux NOM Prénom) + for m in reversed(list(patterns.CONTACT_RELATION_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "contact") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Auteurs de prescriptions Trackare (Signé/Arrêté/Réalisé NOM Prénom) + for m in reversed(list(patterns.PRESCRIPTION_AUTHOR_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Trackare : noms avec timestamp (HH:MM NOM ou NOM HH:MM) + for m in reversed(list(patterns.TRACKARE_TIMESTAMP_NAME_PATTERN.finditer(text))): + name = m.group(2).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(2)] + pseudo + text[m.end(2):] + count += 1 + + for m in reversed(list(patterns.TRACKARE_NAME_TIMESTAMP_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Métadonnées document : "mod. le DD/MM/YY HH:MM par NOM Prénom, statut" + for m in reversed(list(patterns.MOD_PAR_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Aides opératoires : "Aide(s) Prénom NOM" + for m in reversed(list(patterns.AIDE_NAME_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Signature fin de document : "Prénom NOM" avant footer patient + for m in reversed(list(patterns.SIGNATURE_LINE_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # "validé électroniquement par NOM Prénom le DD/MM/YYYY" + for m in reversed(list(patterns.VALIDE_PAR_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # "NOM Prénom (interne)" — signature d'interne + for m in reversed(list(patterns.INTERNE_SIGNATURE_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Trackare : "fois NOM" — nom après administration + for m in reversed(list(patterns.FOIS_NAME_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + + # Trackare : "maladie NOM HH:MM" + for m in reversed(list(patterns.MALADIE_NAME_PATTERN.finditer(text))): + name = m.group(1).strip() + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "soignant") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 + return text, count def _replace_footer(self, text: str) -> tuple[str, int]: @@ -548,9 +705,10 @@ class Anonymizer: count = 0 for m in reversed(list(patterns.FOOTER_PATIENT_PATTERN.finditer(text))): name = m.group(1).strip() - pseudo = self.registry.register(name, "patient") - text = text[:m.start(1)] + pseudo + text[m.end(1):] - count += 1 + if len(name) >= 3 and not self._is_whitelisted(name): + pseudo = self.registry.register(name, "patient") + text = text[:m.start(1)] + pseudo + text[m.end(1):] + count += 1 return text, count def _register_address(self, addr: str) -> None: diff --git a/src/anonymization/entity_registry.py b/src/anonymization/entity_registry.py index 30cbda1..0913dc6 100644 --- a/src/anonymization/entity_registry.py +++ b/src/anonymization/entity_registry.py @@ -6,6 +6,7 @@ import re from collections import defaultdict # Mots français trop courants pour être des sous-parties fiables +# NB : PAS de prénoms ici — les prénoms sont des PHI et doivent être anonymisés FRENCH_STOP_WORDS = { # Articles, déterminants, prépositions "les", "des", "une", "uns", "aux", "ces", "ses", "mes", @@ -15,8 +16,6 @@ FRENCH_STOP_WORDS = { "peu", "très", "trop", "tout", "tous", "rien", "fait", "été", "sont", "ont", "qui", "que", "dont", "peut", "cette", "être", "avoir", "faire", "dire", "aussi", - # Prénoms courts très courants (trop de faux positifs) - "jean", "paul", "marc", "anne", "marie", } @@ -43,27 +42,35 @@ class EntityRegistry: return self._mappings[key] # Vérifier si une entité existante est un sur-ensemble ou sous-ensemble - existing = self._find_matching_entity(key) + existing = self._find_matching_entity(key, category) if existing is not None: self._mappings[key] = existing - return existing + pseudo = existing + else: + self._counters[category] += 1 + count = self._counters[category] + pseudo = self._generate_pseudo(category, count) + self._mappings[key] = pseudo + self._category_map[key] = category - self._counters[category] += 1 - count = self._counters[category] - - pseudo = self._generate_pseudo(category, count) - self._mappings[key] = pseudo - self._category_map[key] = category - - # Enregistrer les sous-parties du nom (seuil >= 5 chars, pas de stop words) - parts = key.split() + # Enregistrer les sous-parties du nom (seuil >= 4 chars, pas de stop words) + # Les sous-parties sont ajoutées à _mappings (avec le pseudo du parent) + # ET à _subparts (pour que le sweep les traite conditionnellement : + # remplacement uniquement si capitalisé, pour éviter la sur-anonymisation). + # On split sur espaces ET tirets pour capturer les noms composés (ex: LASSERRE-QUILLACQ). + # NB: cette étape s'exécute AUSSI quand _find_matching_entity matche, + # pour capturer les fragments du nom élargi (ex: "QUILLACQ" dans "RITZ-QUILLACQ"). + import re as _re + parts = _re.split(r"[\s\-]+", key) if len(parts) > 1: for part in parts: - if (len(part) >= 5 + if (len(part) >= 4 and part not in self._whitelist and part not in FRENCH_STOP_WORDS and part not in self._mappings): self._subparts.add(part) + self._mappings[part] = pseudo + self._category_map[part] = category return pseudo @@ -88,19 +95,38 @@ class EntityRegistry: """Retourne l'ensemble des sous-parties enregistrées.""" return set(self._subparts) - def _find_matching_entity(self, key: str) -> str | None: + def _find_matching_entity(self, key: str, category: str) -> str | None: """Cherche une entité existante qui est un sur- ou sous-ensemble de key. + Ne matche que si la catégorie est compatible (même catégorie ou + catégories de personnes compatibles entre elles). + Retourne le pseudo de l'entité existante si trouvée, None sinon. """ + # Catégories de personnes compatibles entre elles + person_categories = {"patient", "medecin", "soignant", "contact", "personne"} + key_parts = set(key.split()) + if not key_parts: + return None + for existing_key, pseudo in self._mappings.items(): + # Vérifier la compatibilité de catégorie + existing_cat = self._category_map.get(existing_key) + if existing_cat is not None: + if existing_cat != category: + # Autoriser le match entre catégories de personnes + if not (existing_cat in person_categories and category in person_categories): + continue + existing_parts = set(existing_key.split()) + if not existing_parts: + continue # key est contenu dans une entité existante - if key_parts and key_parts.issubset(existing_parts): + if key_parts.issubset(existing_parts): return pseudo # Une entité existante est contenue dans key - if existing_parts and existing_parts.issubset(key_parts) and len(existing_parts) > 0: + if existing_parts.issubset(key_parts): return pseudo return None diff --git a/src/anonymization/ner_anonymizer.py b/src/anonymization/ner_anonymizer.py index 0cf8793..e42bd61 100644 --- a/src/anonymization/ner_anonymizer.py +++ b/src/anonymization/ner_anonymizer.py @@ -86,10 +86,25 @@ def _split_text(text: str, max_chars: int = 500) -> list[str]: def _deduplicate(entities: list[dict]) -> list[dict]: - """Déduplique les entités par mot (garde le score le plus élevé).""" - seen: dict[str, dict] = {} + """Déduplique les entités par position (supprime les chevauchements). + + Garde toutes les occurrences d'un même mot à des positions différentes, + mais supprime les entités qui se chevauchent à la même position + (garde celle avec le meilleur score). + """ + if not entities: + return [] + + # Trier par position de début + entities.sort(key=lambda e: e["start"]) + + result: list[dict] = [] for ent in entities: - key = ent["word"].lower() - if key not in seen or ent["score"] > seen[key]["score"]: - seen[key] = ent - return list(seen.values()) + if result and ent["start"] < result[-1]["end"]: + # Chevauchement : garder celle avec le meilleur score + if ent["score"] > result[-1]["score"]: + result[-1] = ent + else: + result.append(ent) + + return result diff --git a/src/anonymization/regex_patterns.py b/src/anonymization/regex_patterns.py index e88fc6f..618cce3 100644 --- a/src/anonymization/regex_patterns.py +++ b/src/anonymization/regex_patterns.py @@ -42,6 +42,11 @@ PHONE_PATTERN = regex.compile( r"\b(0[1-9])[\s.\-]?(\d{2})[\s.\-]?(\d{2})[\s.\-]?(\d{2})[\s.\-]?(\d{2})\b" ) +# Téléphones internationaux FR : +33(0)XXXXXXXXX ou +33 X XX XX XX XX +PHONE_INTL_PATTERN = regex.compile( + r"\+33\s*\(?\s*0?\s*\)?\s*\d[\s.\-]?\d{2}[\s.\-]?\d{2}[\s.\-]?\d{2}[\s.\-]?\d{2}" +) + # Emails (y compris @ch-cotebasque.fr qui contiennent des initiales de soignants) EMAIL_PATTERN = regex.compile( r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b" @@ -54,9 +59,9 @@ FAX_PATTERN = regex.compile( # --- Adresses --- -# Code postal + ville (uniquement les ALL_CAPS après 5 digits) +# Code postal + ville (vrais CP français : 01-95, 971-976, 984-989) CP_VILLE_PATTERN = regex.compile( - r"\b(\d{5})\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\s\-]{2,})\b" + r"\b((?:0[1-9]|[1-8]\d|9[0-5]|97[1-6]|98[4-9])\d{3})\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\s\-]{2,})\b" ) # Lignes d'adresse avec mots-clés (y compris noms propres basques/locaux) @@ -73,8 +78,9 @@ ADDRESS_BLOCK_PATTERN = regex.compile( # --- Dates de naissance --- # Toutes les variantes : "né(e) le", "née le", "né le", "Né(e) le", "Date de naissance:" +# Accepte les séparateurs / et - (DD/MM/YYYY ou DD-MM-YYYY) DATE_NAISSANCE_PATTERN = regex.compile( - r"(?:[Nn][ée]+(?:\(e\))?\s+le\s+|Date de naissance\s*[:=]?\s*)(\d{2}/\d{2}/\d{4})" + r"(?:[Nn][ée]+(?:\(e\))?\s+le\s+|Date de naissance\s*[:=]?\s*)(\d{2}[/\-]\d{2}[/\-]\d{4})" ) # --- Noms structurés --- @@ -101,8 +107,13 @@ CIVILITE_NAME_PATTERN = regex.compile( ) # "DR." / "Dr" / "Docteur" suivi du nom du médecin +# Chaque mot doit commencer par une majuscule (nom propre). +# Le séparateur de mots est espace/tab/dash/apostrophe (PAS newline) +# pour éviter de capturer "FARBOS\nDr" comme un seul nom. +# Negative lookahead empêche de capturer "Dr" comme partie du nom +# (cas multi-docteurs sur même ligne : "Dr LEYSSENE David Dr BENARD Yohan"). DR_NAME_PATTERN = regex.compile( - r"(?:DR\.?|Dr\.?|Docteur)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+){0,2})" + r"(?:DR\.?|Dr\.?|Docteur)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-']+(?:[ \t\-'](?!DR\.?\s|Dr\.?\s|Docteur\s)[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\.\-']+){0,2})" ) # "Rédigé par" en pied de page CRH @@ -117,7 +128,7 @@ DESTINATAIRE_PATTERN = regex.compile( # Noms d'auteurs dans Trackare : "Note d'évolution Prénom NOM DD/MM/YYYY" NOTE_AUTHOR_DATE_PATTERN = regex.compile( - r"(?:Note d'évolution|Note IDE|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+" + r"(?:Note d'évolution|Note IDE|Note Diététicienne|Note (?:Kiné|Ergo|Psy|AS|Ortho)|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+" r"(?:DR\.?\s+)?" r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\.\-]+)+)" r"\s+\d{2}/\d{2}/\d{4}", @@ -126,7 +137,7 @@ NOTE_AUTHOR_DATE_PATTERN = regex.compile( # Noms d'auteurs Trackare sans date immédiate : "Note IDE Prénom NOM texte..." # Le nom est toujours un Prénom (Capitalized) suivi d'un NOM (ALL CAPS) NOTE_AUTHOR_PATTERN = regex.compile( - r"(?:Note d'évolution|Note IDE|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+" + r"(?:Note d'évolution|Note IDE|Note Diététicienne|Note (?:Kiné|Ergo|Psy|AS|Ortho)|Histoire de la maladie|Conclusion Obs\.?\s*médicales?)\s+" r"(?:DR\.?\s+)?" r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][a-zéèêëàâäùûüôöîïç]+\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\-]{2,})" ) @@ -199,9 +210,99 @@ CONSULT_ADRESSE_PATTERN = regex.compile( r"Adresse\s*:\s*(.+?)(?:\n|$)" ) +# BACTERIO : nom patient en en-tête (NOM Prénom sur ligne avant "Nom usuel :") +# Tolère une ligne vide entre le nom et "Nom usuel" (artefact OCR) +BACTERIO_NOM_HEADER_PATTERN = regex.compile( + r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{2,}\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)*)\s*\n(?:\s*\n)?\s*Nom\s+(?:usuel|de\s+naissance|utilisé)", +) + +# --- Trackare : zones structurées patient --- + +# "Personne à prévenir NOM PRENOM 06 XX XX XX XX" +PERSONNE_PREVENIR_PATTERN = regex.compile( + r"Personne\s+[àa]\s+pr[ée]venir\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\s\-]+?)(?:\s+\d{2}[\s.]|\s*$|\s*\n)", +) + +# Contacts : "Concubine/Conjoint/Époux/Épouse NOM Prénom" +CONTACT_RELATION_PATTERN = regex.compile( + r"(?:Concubin[e]?|Conjoint[e]?|[ÉE]poux|[ÉE]pouse|Compagnon|Compagne|Fils|Fille|Père|Mère|Frère|Sœur|Soeur)\s+" + r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{2,}(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)*)", +) + +# "Nom de naissance : DUPONT" ou "Nom utilisé : DUPONT" +NOM_NAISSANCE_TRACKARE_PATTERN = regex.compile( + r"(?:Nom\s+(?:de\s+naissance|utilisé|usuel))\s*:?\s*([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\s\-]+?)(?:\s*$|\s*\n)", + regex.MULTILINE, +) + +# "Prénom de naissance : MARIE" ou "Prénom utilisé : MARIE" ou "1er prénom de naissance: MARIE" +PRENOM_NAISSANCE_PATTERN = regex.compile( + r"(?:(?:1er\s+)?[Pp]rénom\s+(?:de\s+naissance|utilisé))\s*:?\s*([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\-]+)", +) + # Auteurs de prescription dans Trackare +# Matche "Normal VERGEZ", "Signé DUPONT", "Réalisé POMMIES Héloise", +# "Révisé/Traité ETCHECHOURY", "variable. ETCHECHOURY" PRESCRIPTION_AUTHOR_PATTERN = regex.compile( - r"(?:Presc\.\s*de\s*Sortie|Normal|Signé|Arrêté|Réalisé)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][a-zéèêëàâäùûüôöîïç]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\-]+)+)" + r"(?:Presc\.\s*de\s*Sortie|Normal|Signé|Arrêté|Réalisé|Révisé/Traité|Révisé|Traité|variable\.)\s+(?:—\s+)?" + r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{2,}(?:[\-][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç]*)?(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)?)" + r"(?=\s*\n|\s+\d{2}[/:]|\s+\[|\s+le\b|\s+DR\.|\s*$)", +) + +# Trackare : noms de soignants avec timestamp (HH:MM NOM ou NOM HH:MM) +TRACKARE_TIMESTAMP_NAME_PATTERN = regex.compile( + r"\b(\d{2}:\d{2})\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{3,}(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)?)\s*\n", +) + +TRACKARE_NAME_TIMESTAMP_PATTERN = regex.compile( + r"\n([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{3,}(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)?)\s+(\d{2}:\d{2})\s", +) + +# --- Métadonnées document : "mod. le DD/MM/YY HH:MM par NOM Prénom" --- +MOD_PAR_PATTERN = regex.compile( + r"(?:mod\.\s+le\s+\d{2}/\d{2}/\d{2,4}\s+\d{2}:\d{2}\s+par|par)\s+" + r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{2,}(?:[\-][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç]*)?(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)?)" + r"\s*,\s*statut", +) + +# --- CRO / documents : "Aide(s) Prénom NOM" --- +AIDE_NAME_PATTERN = regex.compile( + r"Aide\(?s?\)?\s+" + r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç]+\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\-]+)" + r"(?:\s*,|\s*$|\s*\n)", +) + +# --- Signature fin de document : "Prénom NOM" seul sur une ligne --- +# Matche une ligne isolée avec Prénom NOM (au moins 1 prénom + 1 nom en majuscules) +# Précédé par du contenu médical (pas un titre de section) +SIGNATURE_LINE_PATTERN = regex.compile( + r"\n([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç]+\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ\-]{2,})\s*\n" + r"(?=Patient\(e\)|$|\Z)", +) + +# --- "validé électroniquement par NOM Prénom le DD/MM/YYYY" --- +VALIDE_PAR_PATTERN = regex.compile( + r"validé\s+(?:électroniquement\s+)?par\s+" + r"([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{2,}(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)+)" + r"\s+le\s+\d{2}/\d{2}/\d{4}", +) + +# --- "NOM Prénom (interne)" — signature d'interne fin de compte-rendu --- +INTERNE_SIGNATURE_PATTERN = regex.compile( + r"\n([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{2,}\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇa-zéèêëàâäùûüôöîïç\-]+)" + r"\s+\(interne\)", +) + +# --- Trackare : "fois NOM" — nom après administration médicament --- +FOIS_NAME_PATTERN = regex.compile( + r"fois\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{3,}(?:[\-][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç]*)?)" + r"(?=\s*\n|\s+\d|\s*$)", +) + +# --- Trackare : "maladie NOM HH:MM" — nom dans contexte prescription --- +MALADIE_NAME_PATTERN = regex.compile( + r"maladie\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ]{3,}(?:[\-][A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç]*)?)" + r"\s+\d{2}:\d{2}", ) diff --git a/src/extraction/document_splitter.py b/src/extraction/document_splitter.py index 0f82fff..2a54169 100644 --- a/src/extraction/document_splitter.py +++ b/src/extraction/document_splitter.py @@ -131,7 +131,8 @@ def _split_crh(text: str) -> list[str]: def _dedup_chunks(chunks: list[str], threshold: float = 0.85) -> list[str]: """Supprime les chunks quasi-identiques (copies pour destinataires multiples). - Compare les 500 premiers caractères de chaque paire. + Compare les caractères 200-1000 de chaque paire (en sautant l'en-tête + patient qui est identique entre séjours différents du même patient). Si le ratio de similarité > threshold, le doublon est supprimé. """ if len(chunks) <= 1: @@ -144,11 +145,14 @@ def _dedup_chunks(chunks: list[str], threshold: float = 0.85) -> list[str]: for j in range(i + 1, len(chunks)): if j in duplicates: continue - ratio = SequenceMatcher( - None, - chunks[i][:500], - chunks[j][:500], - ).ratio() + # Comparer le corps du document (après l'en-tête patient) + sample_i = chunks[i][200:1000] + sample_j = chunks[j][200:1000] + # Si un chunk est trop court, comparer ce qui est disponible + if len(sample_i) < 50 or len(sample_j) < 50: + sample_i = chunks[i] + sample_j = chunks[j] + ratio = SequenceMatcher(None, sample_i, sample_j).ratio() if ratio > threshold: duplicates.add(j) logger.info(" CRH chunk %d doublon de %d (ratio=%.2f), supprimé", j + 1, i + 1, ratio) diff --git a/src/extraction/text_cleaner.py b/src/extraction/text_cleaner.py index 017a3b3..cf6582b 100644 --- a/src/extraction/text_cleaner.py +++ b/src/extraction/text_cleaner.py @@ -20,12 +20,28 @@ def clean_extracted_text(text: str) -> str: def _remove_single_char_lines(text: str) -> str: - """Supprime les lignes isolées de 1-2 caractères (artefacts OCR de sidebar).""" - # Garder les lignes qui sont des chiffres isolés (listes numérotées) - # ou des indicateurs médicaux (-, +, /) + """Supprime les lignes isolées de 1-2 caractères (artefacts OCR de sidebar). + + Préserve les abréviations médicales courantes (T, PA, FC, O2, etc.), + les chiffres, et les indicateurs (+, -, /). + """ + # Tokens médicaux de 1-2 chars à préserver + _medical_short = { + "t", "o", "g", "pa", "fc", "ta", "fr", "sp", "o2", + "iv", "im", "sc", "cp", "ml", "mg", "kg", "cl", "dl", + "rx", "vt", "qt", "tp", "hb", "gb", "gr", "vs", "ph", + "na", "ca", "fe", "mg", "cl", "cr", "bi", "ni", "co", + } + + def _keep_line(m: re.Match) -> str: + content = m.group(1).strip().lower() + if content in _medical_short: + return m.group(0) # Préserver la ligne + return "" + return re.sub( r"^(?![0-9\-\+/])(.{1,2})$\n?", - "", + _keep_line, text, flags=re.MULTILINE, ) diff --git a/tests/test_anonymization.py b/tests/test_anonymization.py index dae0b1e..1ac8e0f 100644 --- a/tests/test_anonymization.py +++ b/tests/test_anonymization.py @@ -4,6 +4,7 @@ import pytest from src.anonymization.entity_registry import EntityRegistry from src.anonymization.regex_patterns import ( + BACTERIO_NOM_HEADER_PATTERN, CONSULT_ADRESSE_PATTERN, CRH_FOOTER_IPP_EPISODE, CRH_FOOTER_PATIENT_PATTERN, @@ -15,10 +16,14 @@ from src.anonymization.regex_patterns import ( EPISODE_PATTERN, FOOTER_PATIENT_PATTERN, IPP_PATTERN, + NOM_NAISSANCE_TRACKARE_PATTERN, NOTE_AUTHOR_PATTERN, N_IPP_PATTERN, PAR_NOM_PATTERN, + PERSONNE_PREVENIR_PATTERN, + PHONE_INTL_PATTERN, PHONE_PATTERN, + PRENOM_NAISSANCE_PATTERN, RPPS_PATTERN, VENUE_PATTERN, ) @@ -211,18 +216,31 @@ class TestStopWordsAndSubparts: """Vérifie que les stop words et sous-parties courtes ne sont pas enregistrés.""" def test_stop_word_not_registered_as_subpart(self): - """'jean' (stop word) ne doit pas être enregistré en sous-partie.""" + """Les vrais stop words (sans, dans, avec) ne doivent pas être des sous-parties.""" reg = EntityRegistry() - reg.register("Jean Martin", "medecin") - entities = reg.get_all_entities() - assert "jean" not in entities - assert "martin" not in entities # < 5 chars → exclu aussi + reg.register("Sans Martin", "medecin") + assert not reg.is_subpart("sans") + + def test_prenoms_are_subparts(self): + """Les prénoms (jean, paul, marie) sont des PHI et doivent être des sous-parties.""" + reg = EntityRegistry() + reg.register("Jean Dupont", "patient") + assert reg.is_subpart("jean") + assert reg.is_subpart("dupont") + + def test_short_parts_excluded(self): + """Les sous-parties < 4 chars sont exclues (trop de faux positifs).""" + reg = EntityRegistry() + reg.register("Ré Dupont", "patient") + assert not reg.is_subpart("ré") + assert reg.is_subpart("dupont") def test_long_subpart_registered(self): - """Les sous-parties >= 5 chars qui ne sont pas des stop words sont enregistrées.""" + """Les sous-parties >= 4 chars qui ne sont pas des stop words sont enregistrées.""" reg = EntityRegistry() reg.register("Jean Audemar", "medecin") assert reg.is_subpart("audemar") + assert reg.is_subpart("jean") def test_sans_not_anonymized_when_dr_sans(self): """'sans' ne doit pas être remplacé quand un médecin s'appelle 'Dr Sans'.""" @@ -296,6 +314,60 @@ class TestNewPHIPatterns: assert m is not None assert "15 rue des Lilas" in m.group(1) + def test_date_naissance_with_dashes(self): + """Date de naissance : DD-MM-YYYY (format Trackare).""" + m = DATE_NAISSANCE_PATTERN.search("Date de naissance : 21-01-1948") + assert m is not None + assert m.group(1) == "21-01-1948" + + def test_phone_intl_pattern(self): + """Téléphone international +33(0)XXXXXXXXX.""" + m = PHONE_INTL_PATTERN.search("☎ +33(0)156125400") + assert m is not None + + def test_phone_intl_with_spaces(self): + m = PHONE_INTL_PATTERN.search("+33 1 56 12 54 00") + assert m is not None + + def test_personne_prevenir_pattern(self): + m = PERSONNE_PREVENIR_PATTERN.search("Personne à prévenir EICHE 06 27 56 38") + assert m is not None + assert "EICHE" in m.group(1) + + def test_nom_naissance_trackare(self): + m = NOM_NAISSANCE_TRACKARE_PATTERN.search("Nom de naissance : EICHE") + assert m is not None + assert m.group(1).strip() == "EICHE" + + def test_nom_utilise_trackare(self): + m = NOM_NAISSANCE_TRACKARE_PATTERN.search("Nom utilisé : DUPONT") + assert m is not None + assert m.group(1).strip() == "DUPONT" + + def test_prenom_naissance_pattern(self): + m = PRENOM_NAISSANCE_PATTERN.search("Prénom de naissance : MARIE") + assert m is not None + assert m.group(1) == "MARIE" + + def test_prenom_1er_pattern(self): + m = PRENOM_NAISSANCE_PATTERN.search("1er prénom de naissance: MARIE") + assert m is not None + assert m.group(1) == "MARIE" + + def test_bacterio_nom_header_pattern(self): + """En-tête BACTERIO : NOM Prénom avant 'Nom usuel :'.""" + text = "Compte renduComplet\nURTIZVEREA Marie\nNom usuel : EICHE URGENCES" + m = BACTERIO_NOM_HEADER_PATTERN.search(text) + assert m is not None + assert m.group(1) == "URTIZVEREA Marie" + + def test_bacterio_nom_header_nom_de_naissance(self): + """En-tête BACTERIO : NOM Prénom avant 'Nom de naissance :'.""" + text = "DUPONT Jean-Pierre\nNom de naissance : MARTIN" + m = BACTERIO_NOM_HEADER_PATTERN.search(text) + assert m is not None + assert m.group(1) == "DUPONT Jean-Pierre" + # --- P2-C : Cohérence pseudonymes --- @@ -316,12 +388,52 @@ class TestEntityMatching: p2 = reg.register("MARTIN", "medecin") assert p1 == p2 + def test_no_cross_category_match_via_subset(self): + """Un patient et une adresse ne matchent PAS par sous-ensemble.""" + reg = EntityRegistry() + p1 = reg.register("MARTIN Pierre", "patient") + p2 = reg.register("MARTIN Rue", "adresse") + # "MARTIN" est commun mais catégories incompatibles → pas de match + assert p1 != p2 + + def test_person_categories_compatible(self): + """Les catégories de personnes (patient/medecin/soignant) sont compatibles.""" + reg = EntityRegistry() + p1 = reg.register("DUPONT", "patient") + p2 = reg.register("DUPONT Pierre", "medecin") + # Catégories de personnes compatibles → même pseudo + assert p1 == p2 + def test_fix_double_brackets(self): - """Les doubles crochets sont corrigés.""" + """Les doubles crochets fermants sont corrigés.""" from src.anonymization.anonymizer import Anonymizer result = Anonymizer._fix_brackets("Mme[PERSONNE_14]]") assert result == "Mme [PERSONNE_14]" + def test_fix_double_open_brackets(self): + """Les doubles crochets ouvrants sont corrigés.""" + from src.anonymization.anonymizer import Anonymizer + result = Anonymizer._fix_brackets("Dr [[PERSONNE_7]") + assert result == "Dr [PERSONNE_7]" + + def test_fix_orphan_digits(self): + """Les chiffres orphelins entre crochets sont supprimés.""" + from src.anonymization.anonymizer import Anonymizer + result = Anonymizer._fix_brackets("[PERSONNE_6]10]") + assert result == "[PERSONNE_6]" + + def test_fix_orphan_underscore_digits(self): + """Les _N] orphelins sont supprimés.""" + from src.anonymization.anonymizer import Anonymizer + result = Anonymizer._fix_brackets("[PERSONNE_4]_2]") + assert result == "[PERSONNE_4]" + + def test_fix_glued_tags(self): + """Deux tags collés ][ reçoivent un espace.""" + from src.anonymization.anonymizer import Anonymizer + result = Anonymizer._fix_brackets("[MEDECIN_5][MEDECIN_6]") + assert result == "[MEDECIN_5] [MEDECIN_6]" + def test_fix_glued_bracket(self): """Un tag collé à un mot reçoit un espace.""" from src.anonymization.anonymizer import Anonymizer