diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 72925e6..0baf6bb 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -257,6 +257,50 @@ _MEDICAL_STOP_WORDS_SET = { "proctologue", "oncologue", "anesthésiste", "pneumologue", "gérontologue", "cardiologue", "néphrologue", "urologue", "gériatre", "hépatologue", "endocrinologue", "stomatologue", + # Termes médicaux / titres fréquemment détectés comme NOM par le NER + "supplémentation", "supplementation", "endocrinologie", "monsieur", "madame", + "suivi", "sortie", "emog", "ophtalmo", + # Médicaments détectés comme NOM/PRENOM par EDS-Pseudo + "eliquis", "trulicity", "saos", "wind", "taxotere", "eupantol", "ezetimibe", + "lansoyl", "xatral", "xenetix", "trimbow", "buspirone", "cetirizine", + "depakote", "versatis", "durogesic", "montelukast", "metformine", "viatris", + "rosuvastatine", "gliclazide", "amlodipine", "perindopril", "nebivolol", + "pravastatine", "bisoprolol", "amoxicilline", "kardegic", "lovenox", + # Termes médicaux / soins / actes détectés comme NOM + "partielle", "cutanee", "cutané", "cutanée", "osseuse", "diabetique", + "diabétique", "transdermique", "transderm", "diarrhees", "diarrhées", + "ionogramme", "scintigraphie", "thoraco", "thorax", "négative", "negative", + "diététicienne", "pressurise", "pressuriser", "inhalee", "inhalée", "inhal", + # Mots courants français détectés comme NOM dans les trackare + "toilette", "repas", "poche", "installation", "education", "éducation", + "refection", "réfection", "complete", "complète", "regime", "régime", + "normal", "traité", "traite", "arrêté", "arrete", "volume", + "commentaires", "france", "covid", "framboise", "epoux", "époux", + # Abréviations médicales courtes (3-4 chars) détectées comme NOM + "ide", "ipp", "pcr", "tap", "gel", "ahl", "ssr", "hds", "tca", "etp", + "mcg", "sdz", "iao", "ser", "orod", "clav", "disp", "cart", "atcd", "mdrd", + "amox", "endoc", "microg", "item", "pyélo", "néphro", + # En-têtes de colonnes / mots structurels trackare + "observations", "observation", "commentaires", "commentaire", + "surveillance", "température", "temperature", "glycémie", "glycemie", + "diurèse", "diurese", "balance", "pouls", "systolique", "diastolique", + "saturation", "fréquence", "frequence", "respiratoire", "douleur", + "alertes", "alerte", "antécédents", "antecedents", "habitus", + "allergies", "prescriptions", "prescription", "administration", + "catégorie", "categorie", "expiration", "message", + "destination", "diagnostique", "diagnostiques", + "date", "note", "nom", "heure", "type", "code", "etat", + "comprime", "comprimé", "gelule", "gélule", "solution", "injectable", + # Médicaments supplémentaires détectés dans les trackare + "depakote", "versatis", "humalog", "forxiga", "durogesic", + "montelukast", "rosuvastatine", + # Abréviations pharma courtes + "cpr", "sol", "bic", "agt", "poche", "inhal", "regina", + # Faux positifs EDS supplémentaires + "psy", "inhales", "inhalés", "kwikpen", "lansoprazole", "tiorfan", "smecta", + "axa", "ttt", "anionique", "abdomino", "cod", "omi", "urg", "med", + "10mg", "20mg", "40mg", "100mg", "300ui", "500ml", "innohep", "coaprovel", + "actiskenan", "simvastatine", "forlax", # Mots clés de contexte document "compétences", "maladies", "inflammatoires", "systémiques", "rares", "fret", "fax", "contexte", "résultat", "resultat", "résultats", "resultats", @@ -374,7 +418,8 @@ RE_CODE_POSTAL = re.compile( r"(?:(?:[Cc]ode\s*[Pp]ostal|CP)\s*[:\-]?\s*(\d{5}))" r"|" # 5 chiffres + nom de ville (Title Case ou MAJUSCULES), pas précédé d'un chiffre (évite RPPS) - r"(?:(? Tuple[set, List[PiiHit]]: names.add(tok) # --- Identité patient --- - # Nom de naissance: DIEGO - m = re.search(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE) - if m: + # Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire) + for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Nom et Prénom: DIEGO PATRICIA - m = re.search(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE) - if m: + for m in re.finditer(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Lieu de naissance: BAYONNE → masquer comme VILLE - m = re.search(r"Lieu\s+de\s+naissance\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE) - if m: + for m in re.finditer(r"Lieu\s+de\s+naissance\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) names.add(val) # Ville de résidence: TARNOS → masquer comme VILLE - m = re.search(r"Ville\s+de\s+r[ée]sidence\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE) - if m: + for m in re.finditer(r"Ville\s+de\s+r[ée]sidence\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) names.add(val) - # Code Postal (seul sur la ligne "Nationalité: FRANCE Code Postal: 40220") - m = re.search(r"[Cc]ode\s*[Pp]ostal\s*:\s*(\d{5})", full_text) - if m: + # Code Postal (toutes occurrences) + for m in re.finditer(r"[Cc]ode\s*[Pp]ostal\s*:\s*(\d{5})", full_text): hits.append(PiiHit(-1, "CODE_POSTAL", m.group(1), PLACEHOLDERS["CODE_POSTAL"])) - # Adresse patient - m = re.search(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE) - if m: + # N° épisode (= NDA, identifiant de séjour) + for m in re.finditer(r"Episode\s*N[o°.]?\s*\.?\s*:\s*(\d{5,})", full_text): + hits.append(PiiHit(-1, "EPISODE", m.group(1), PLACEHOLDERS.get("NDA", "[NDA]"))) + + # Adresse patient (toutes les occurrences) + for m in re.finditer(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() if len(val) > 3: hits.append(PiiHit(-1, "ADRESSE", val, PLACEHOLDERS["ADRESSE"])) @@ -812,14 +855,12 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: for m in re.finditer(r"Patient\s*:\s*(.+?)\s*-\s*Date\s+de\s+naissance", full_text): _add_name(m.group(1).strip()) - # --- Médecin courant --- - m = re.search(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE) - if m: + # --- Médecin courant (toutes occurrences) --- + for m in re.finditer(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # --- Médecin traitant (ligne après "Nom Adresse Téléphone") --- - m = re.search(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE) - if m: + for m in re.finditer(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # --- Contacts structurés --- @@ -853,6 +894,16 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: _add_name(m.group(1)) _add_name(m.group(2)) + # --- Noms soignants multi-lignes : "Prénom\nNOM" dans les tableaux de prescriptions/soins --- + for m in re.finditer( + r'\b([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{2,})\s*\n\s*([A-ZÉÈÀÙÂÊÎÔÛ]{3,})\b', + full_text + ): + prenom, nom = m.group(1), m.group(2) + if prenom.lower() not in _MEDICAL_STOP_WORDS_SET and nom.lower() not in _MEDICAL_STOP_WORDS_SET: + _add_name(prenom) + _add_name(nom) + # Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement) city_tokens = {h.original for h in hits if h.kind == "VILLE"} filtered = set() @@ -967,7 +1018,9 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str: """Remplace globalement chaque nom extrait dans le texte.""" placeholder = PLACEHOLDERS["NOM"] - for token in sorted(names, key=len, reverse=True): + # Filtrer les stop words et tokens trop courts en dernière ligne de défense + safe_names = {n for n in names if len(n) >= 3 and n.lower() not in _MEDICAL_STOP_WORDS_SET} + for token in sorted(safe_names, key=len, reverse=True): pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE) new_text = [] last_end = 0 @@ -1119,8 +1172,18 @@ def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, continue if len(w) <= 2: continue - placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"]) + # Filtrer les faux positifs NOM/PRENOM (médicaments, acronymes médicaux) label = e.get("entity_group", "EDS") + if label in ("NOM", "PRENOM"): + if w.lower() in _MEDICAL_STOP_WORDS_SET: + continue + # Filtrer aussi les tokens multi-mots dont un composant est un stop word + if " " in w and any(part.lower() in _MEDICAL_STOP_WORDS_SET for part in w.split()): + continue + # Filtrer les dosages détectés comme noms (ex: "10MG", "300UI", "1 000") + if re.match(r"^\d[\d\s]*(?:mg|MG|ml|ML|UI|µg|mcg|g|kg|%)?$", w.strip()): + continue + placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"]) audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder)) out = repl_once(out, w, placeholder) return out @@ -1224,6 +1287,18 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str: # ----------------- PDF Redaction ----------------- +def _search_whole_word(page, token: str) -> list: + """Cherche un token comme mot entier (pas substring) via get_text('words'). + Évite les faux positifs de page.search_for() qui fait du substring matching.""" + rects = [] + token_lower = token.lower().strip() + for w in page.get_text("words"): + # w = (x0, y0, x1, y1, word, block_no, line_no, word_no) + word_text = w[4].strip(".,;:!?()[]{}\"'«»-–—/\\") + if word_text.lower() == token_lower: + rects.append(fitz.Rect(w[0], w[1], w[2], w[3])) + return rects + def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") @@ -1232,6 +1307,12 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> by_page: Dict[int, List[PiiHit]] = {} for h in audit: by_page.setdefault(h.page, []).append(h) + # Kinds à ne pas chercher dans le PDF (dates masquées uniquement dans le texte, + # pas dans le PDF où elles rendent les tableaux illisibles) + _VECTOR_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU"} + # Kinds dont les tokens courts (< 5) risquent le substring matching via page.search_for() + _VECTOR_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", + "EDS_HOPITAL", "ETAB", "ETAB_GLOBAL"} for pno in range(len(doc)): page = doc[pno] hits = by_page.get(pno, []) + by_page.get(-1, []) @@ -1241,6 +1322,17 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> token = h.original.strip() if not token: continue + # Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF) + if h.kind in _VECTOR_SKIP_KINDS: + continue + # Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter + # les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations") + if h.kind in _VECTOR_SHORT_TOKEN_KINDS and len(token) < 5: + if token.lower() not in _MEDICAL_STOP_WORDS_SET: + rects = _search_whole_word(page, token) + for r in rects: + page.add_redact_annot(r, fill=(0,0,0)) + continue rects = page.search_for(token) if not rects and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) @@ -1250,7 +1342,7 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> if not rects and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: for word in token.split(): word = word.strip(" .-'") - if len(word) < 3 or word.lower() in _MEDICAL_STOP_WORDS_SET: + if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue if not word[0].isupper(): continue @@ -1273,10 +1365,22 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp for pno in range(len(doc)): page = doc[pno] rects = [] + _RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU"} + _RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", + "EDS_HOPITAL", "ETAB", "ETAB_GLOBAL"} hits = [x for x in audit if x.page in {pno, -1}] for h in hits: token = h.original.strip() if not token: continue + # Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF) + if h.kind in _RASTER_SKIP_KINDS: + continue + # Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter + # les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations") + if h.kind in _RASTER_SHORT_TOKEN_KINDS and len(token) < 5: + if token.lower() not in _MEDICAL_STOP_WORDS_SET: + rects.extend(_search_whole_word(page, token)) + continue found = page.search_for(token) if not found and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) @@ -1286,9 +1390,7 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: for word in token.split(): word = word.strip(" .-'") - if len(word) < 3: - continue - if word.lower() in _MEDICAL_STOP_WORDS_SET: + if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue # Ne garder que les mots qui ressemblent à des noms propres if not word[0].isupper(): @@ -1447,6 +1549,23 @@ def process_pdf( for val in values: anon.audit.append(PiiHit(page=-1, kind=f"{kind}_GLOBAL", original=val, placeholder=placeholder)) + # 4e) Appliquer les tokens globaux sur le texte pseudonymisé + _GLOBAL_SKIP_KINDS = {"EDS_DATE_GLOBAL", "DATE_NAISSANCE_GLOBAL"} + for h in anon.audit: + if h.page != -1: + continue + if not (h.kind == "NOM_GLOBAL" or h.kind.endswith("_GLOBAL")): + continue + if h.kind in _GLOBAL_SKIP_KINDS: + continue + token = h.original.strip() + if not token or len(token) < 3: + continue + try: + final_text = re.sub(rf"\b{re.escape(token)}\b", h.placeholder, final_text) + except re.error: + final_text = final_text.replace(token, h.placeholder) + # Log OCR dans l'audit if ocr_used: anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder=""))