feat: qualité anonymisation — sur-anonymisation, fuites PHI, nettoyage bruit

P0-A: stop words français + seuil subparts 5 chars + sweep conditionnel
P0-B: 6 nouveaux patterns PHI (DDN, Par, N Ipp, Adresse, DEMANDE, venue)
P2-C: cohérence pseudonymes (_find_matching_entity) + fix crochets
P1-B: text_cleaner.py — sidebar OCR, footers, dédup vitales, collapse blanks
P1-A: dédup CRH par SequenceMatcher (seuil 85%)
Tests: 34 nouveaux tests (996 pass, 0 fail)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-25 14:00:07 +01:00
parent 63354e75bc
commit f4a23a5f43
7 changed files with 499 additions and 8 deletions

View File

@@ -31,6 +31,9 @@ MEDICAL_TERMS_WHITELIST = {
"angiocholite", "cholécystite", "cholecystite",
"morphine", "paracétamol", "paracetamol", "cétirizine", "cetirizine",
"tramadol", "contramal", "acupan", "nefopam",
"mestinon", "augmentin", "doliprane", "spasfon", "lasilix",
"lovenox", "kardegic", "inexium", "mopral", "gaviscon",
"loxen", "perfalgan", "profenid", "voltarene", "toplexil",
"service", "médecin", "medecin", "docteur", "chirurgie",
"gastro", "entérologie", "enterologie", "oncologie",
"hépato", "hepato", "digestif", "digestive",
@@ -156,6 +159,23 @@ class Anonymizer:
text, n = self._replace_footer(text)
count += n
# Documents spécialisés (BACTERIO, CONSULTATION, ANAPATH)
text, n = self._replace_pattern(text, patterns.DDN_PATTERN, "date_naissance")
count += n
text, n = self._replace_pattern(text, patterns.PAR_NOM_PATTERN, "soignant")
count += n
text, n = self._replace_pattern(text, patterns.DEMANDE_NUM_PATTERN, "identifiant")
count += n
text, n = self._replace_pattern(text, patterns.VENUE_PATTERN, "episode")
count += n
text, n = self._replace_pattern(text, patterns.N_IPP_PATTERN, "ipp")
count += n
text, n = self._replace_pattern(
text, patterns.CONSULT_ADRESSE_PATTERN, "adresse",
skip_establishment_check=True,
)
count += n
self.report.regex_replacements = count
return text
@@ -204,9 +224,15 @@ class Anonymizer:
# --- Phase 3 : Balayage final ---
def _phase3_sweep(self, text: str) -> str:
"""Balayage brute-force des entités connues restantes."""
"""Balayage brute-force des entités connues restantes.
Les sous-parties (fragments de noms composés) ne sont remplacées
que si elles apparaissent en contexte de nom propre (capitalisées)
pour éviter la sur-anonymisation de mots courants.
"""
count = 0
all_entities = self.registry.get_all_entities()
subparts = self.registry.get_subparts()
for original, replacement in sorted(
all_entities.items(), key=lambda x: len(x[0]), reverse=True
@@ -219,12 +245,36 @@ class Anonymizer:
# Recherche insensible à la casse, avec frontières de mots
escaped = re.escape(original)
pattern = re.compile(r"\b" + escaped + r"\b", re.IGNORECASE)
matches = pattern.findall(text)
if matches:
matches = list(pattern.finditer(text))
if not matches:
continue
# Les sous-parties ne sont remplacées que si capitalisées
# (contexte de nom propre vs. mot courant)
if original in subparts:
for m in reversed(matches):
matched_text = m.group(0)
if matched_text[0].isupper():
text = text[:m.start()] + replacement + text[m.end():]
count += 1
else:
text = pattern.sub(replacement, text)
count += len(matches)
self.report.sweep_replacements = count
# Post-traitement : corriger les crochets malformés
text = self._fix_brackets(text)
return text
@staticmethod
def _fix_brackets(text: str) -> str:
"""Corrige les crochets malformés après anonymisation."""
# Doubles crochets fermants
text = re.sub(r"\]\]", "]", text)
# Ajouter un espace avant un tag collé à un mot
text = re.sub(r"([A-Za-zéèêëàâäùûüôöîïç])\[", r"\1 [", text)
return text
# --- Helpers ---

View File

@@ -5,6 +5,20 @@ from __future__ import annotations
import re
from collections import defaultdict
# Mots français trop courants pour être des sous-parties fiables
FRENCH_STOP_WORDS = {
# Articles, déterminants, prépositions
"les", "des", "une", "uns", "aux", "ces", "ses", "mes",
"tes", "nos", "vos", "mon", "ton", "son", "sur", "par",
"pour", "dans", "avec", "sans", "sous", "vers", "chez",
"entre", "mais", "donc", "car", "pas", "plus", "bien",
"peu", "très", "trop", "tout", "tous", "rien", "fait",
"été", "sont", "ont", "qui", "que", "dont", "peut",
"cette", "être", "avoir", "faire", "dire", "aussi",
# Prénoms courts très courants (trop de faux positifs)
"jean", "paul", "marc", "anne", "marie",
}
class EntityRegistry:
"""Maintient un mapping cohérent entre entités réelles et pseudonymes."""
@@ -13,6 +27,7 @@ class EntityRegistry:
self._counters: dict[str, int] = defaultdict(int)
self._mappings: dict[str, str] = {}
self._category_map: dict[str, str] = {}
self._subparts: set[str] = set()
self._whitelist: set[str] = whitelist or set()
def register(self, entity: str, category: str) -> str:
@@ -27,6 +42,12 @@ class EntityRegistry:
if key in self._mappings:
return self._mappings[key]
# Vérifier si une entité existante est un sur-ensemble ou sous-ensemble
existing = self._find_matching_entity(key)
if existing is not None:
self._mappings[key] = existing
return existing
self._counters[category] += 1
count = self._counters[category]
@@ -34,17 +55,22 @@ class EntityRegistry:
self._mappings[key] = pseudo
self._category_map[key] = category
# Enregistrer aussi les sous-parties du nom (sauf termes médicaux)
# Enregistrer les sous-parties du nom (seuil >= 5 chars, pas de stop words)
parts = key.split()
if len(parts) > 1:
for part in parts:
if len(part) >= 3 and part not in self._whitelist:
part_key = part
if part_key not in self._mappings:
self._mappings[part_key] = f"[{category.upper()}]"
if (len(part) >= 5
and part not in self._whitelist
and part not in FRENCH_STOP_WORDS
and part not in self._mappings):
self._subparts.add(part)
return pseudo
def is_subpart(self, key: str) -> bool:
"""Vérifie si un token est une sous-partie (pas une entité complète)."""
return self._normalize(key) in self._subparts
def get_replacement(self, entity: str) -> str | None:
"""Retourne le pseudonyme d'une entité connue, ou None."""
key = self._normalize(entity)
@@ -58,6 +84,26 @@ class EntityRegistry:
"""Retourne toutes les entités originales (noms avant normalisation)."""
return list(self._mappings.keys())
def get_subparts(self) -> set[str]:
"""Retourne l'ensemble des sous-parties enregistrées."""
return set(self._subparts)
def _find_matching_entity(self, key: str) -> str | None:
"""Cherche une entité existante qui est un sur- ou sous-ensemble de key.
Retourne le pseudo de l'entité existante si trouvée, None sinon.
"""
key_parts = set(key.split())
for existing_key, pseudo in self._mappings.items():
existing_parts = set(existing_key.split())
# key est contenu dans une entité existante
if key_parts and key_parts.issubset(existing_parts):
return pseudo
# Une entité existante est contenue dans key
if existing_parts and existing_parts.issubset(key_parts) and len(existing_parts) > 0:
return pseudo
return None
def _normalize(self, text: str) -> str:
"""Normalise un nom pour lookup : minuscules, espaces simplifiés."""
text = text.strip()

View File

@@ -167,6 +167,38 @@ LIEU_NAISSANCE_PATTERN = regex.compile(
r"Lieu de naissance\s*:\s*(.+?)(?:\n|$)"
)
# --- Documents spécialisés (BACTERIO, CONSULTATION_ANESTH, ANAPATH) ---
# "DDN : 21/01/1948" ou "DDN : 21-01-1948"
DDN_PATTERN = regex.compile(
r"DDN\s*:\s*(\d{2}[/\-]\d{2}[/\-]\d{4})"
)
# "Par : GENDRE Juliette" (prescripteur/préleveur)
PAR_NOM_PATTERN = regex.compile(
r"Par\s*:\s*([A-ZÉÈÊËÀÂ][A-Za-zéèêëàâäùûüôöîïç\.\-]+(?:\s+[A-Za-zéèêëàâäùûüôöîïç\.\-]+)+)"
)
# "DEMANDE N° 2300126709"
DEMANDE_NUM_PATTERN = regex.compile(
r"DEMANDE\s*N°?\s*(\d{8,12})"
)
# "N° venue : 23111304"
VENUE_PATTERN = regex.compile(
r"\s*venue\s*:\s*(\d{6,10})"
)
# "N Ipp : 19029841"
N_IPP_PATTERN = regex.compile(
r"N\s+Ipp\s*:\s*(\d{6,10})"
)
# "Adresse : 15 rue des Lilas 64100 BAYONNE" (consultation anesthésie)
CONSULT_ADRESSE_PATTERN = regex.compile(
r"Adresse\s*:\s*(.+?)(?:\n|$)"
)
# Auteurs de prescription dans Trackare
PRESCRIPTION_AUTHOR_PATTERN = regex.compile(
r"(?:Presc\.\s*de\s*Sortie|Normal|Signé|Arrêté|Réalisé)\s+([A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][a-zéèêëàâäùûüôöîïç]+(?:\s+[A-ZÉÈÊËÀÂÄÙÛÜÔÖÎÏÇ][A-Za-zéèêëàâäùûüôöîïç\-]+)+)"

View File

@@ -12,6 +12,7 @@ from __future__ import annotations
import re
import logging
from difflib import SequenceMatcher
logger = logging.getLogger(__name__)
@@ -121,4 +122,35 @@ def _split_crh(text: str) -> list[str]:
end = crh_starts[i + 1] if i + 1 < len(crh_starts) else len(text)
chunks.append(text[start:end].rstrip())
# Déduplication : supprimer les copies quasi-identiques (destinataires multiples)
chunks = _dedup_chunks(chunks)
return chunks
def _dedup_chunks(chunks: list[str], threshold: float = 0.85) -> list[str]:
"""Supprime les chunks quasi-identiques (copies pour destinataires multiples).
Compare les 500 premiers caractères de chaque paire.
Si le ratio de similarité > threshold, le doublon est supprimé.
"""
if len(chunks) <= 1:
return chunks
duplicates: set[int] = set()
for i in range(len(chunks)):
if i in duplicates:
continue
for j in range(i + 1, len(chunks)):
if j in duplicates:
continue
ratio = SequenceMatcher(
None,
chunks[i][:500],
chunks[j][:500],
).ratio()
if ratio > threshold:
duplicates.add(j)
logger.info(" CRH chunk %d doublon de %d (ratio=%.2f), supprimé", j + 1, i + 1, ratio)
return [c for idx, c in enumerate(chunks) if idx not in duplicates]

View File

@@ -8,6 +8,7 @@ from typing import Optional
import pdfplumber
from .page_tracker import PageTracker
from .text_cleaner import clean_extracted_text
def extract_text(pdf_path: str | Path) -> str:
@@ -31,6 +32,7 @@ def extract_text_with_pages(pdf_path: str | Path) -> tuple[str, PageTracker]:
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
text = page.extract_text() or ""
text = clean_extracted_text(text)
pages_text.append(text)
# Construire le texte complet avec "\n\n" comme séparateur (identique à extract_text)

View File

@@ -0,0 +1,107 @@
"""Nettoyage du texte brut extrait des PDF avant anonymisation.
Supprime le bruit OCR (lignes isolées, footers de page, sidebars,
tables vitales dupliquées) pour améliorer la qualité de l'analyse IA.
"""
from __future__ import annotations
import re
def clean_extracted_text(text: str) -> str:
"""Nettoie le texte brut extrait d'une page PDF."""
text = _remove_single_char_lines(text)
text = _remove_page_footers(text)
text = _remove_print_metadata(text)
text = _dedup_vital_signs(text)
text = _collapse_blank_lines(text)
return text
def _remove_single_char_lines(text: str) -> str:
"""Supprime les lignes isolées de 1-2 caractères (artefacts OCR de sidebar)."""
# Garder les lignes qui sont des chiffres isolés (listes numérotées)
# ou des indicateurs médicaux (-, +, /)
return re.sub(
r"^(?![0-9\-\+/])(.{1,2})$\n?",
"",
text,
flags=re.MULTILINE,
)
def _remove_page_footers(text: str) -> str:
"""Supprime les footers de page répétitifs."""
# "V1 - Imprime le DD/MM/YYYY a HH:MM ... Page(s): N sur M"
text = re.sub(
r"V\d\s*-\s*Imprim[ée]e?\s+le\s+\d{2}/\d{2}/\d{4}.*?Page.*?\d+\s+sur\s+\d+",
"",
text,
)
# "Information patient Page N DD/MM/YYYY HH:MM:SS"
text = re.sub(
r"Information patient\s+Page\s+\d+\s+\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2}",
"",
text,
)
# Footers patient répétés (CRH) : "Patient(e) : NOM ... N° Episode ..."
# Garder la première occurrence, supprimer les suivantes
footer_pat = re.compile(
r"Patient\(e\)\s*:.*?N°?\s*Episode\s+\d+.*?$",
re.MULTILINE,
)
matches = list(footer_pat.finditer(text))
if len(matches) > 1:
# Supprimer toutes les occurrences sauf la première
for m in reversed(matches[1:]):
text = text[:m.start()] + text[m.end():]
return text
def _remove_print_metadata(text: str) -> str:
"""Supprime les lignes de métadonnées d'impression pures."""
# "Imprimé le DD/MM/YYYY à HH:MM:SS"
text = re.sub(
r"^Imprim[ée]e?\s+le\s+\d{2}/\d{2}/\d{4}\s+[àa]\s+\d{2}:\d{2}(?::\d{2})?.*$",
"",
text,
flags=re.MULTILINE,
)
# "Edité le DD/MM/YYYY"
text = re.sub(
r"^[EÉ]dit[ée]e?\s+le\s+\d{2}/\d{2}/\d{4}.*$",
"",
text,
flags=re.MULTILINE,
)
return text
def _dedup_vital_signs(text: str) -> str:
"""Supprime les sections vitales dupliquées (Trackare).
Les sections "Surv. Isolement et Contention" et "Surv. Contention"
contiennent souvent les mêmes données que "Signes vitaux".
"""
has_signes_vitaux = bool(re.search(r"Signes vitaux", text))
if not has_signes_vitaux:
return text
# Supprimer les blocs "Surv. Isolement et Contention" et "Surv. Contention"
# Le bloc va du titre jusqu'au prochain titre de section ou double newline significatif
text = re.sub(
r"Surv\.\s*(?:Isolement\s*(?:et|&)\s*)?Contention\s*\n(?:.*\n)*?(?=\n(?:[A-Z]|\Z))",
"",
text,
)
return text
def _collapse_blank_lines(text: str) -> str:
"""Fusionne les lignes vides consécutives (max 2)."""
return re.sub(r"\n{3,}", "\n\n", text)