# core/anonymisation/pii_blur.py """Floutage ciblé des PII côté serveur (Personal Identifiable Information). Contexte -------- L'ancien blur côté client (`agent_v0/agent_v1/vision/blur_sensitive.py`) était trop agressif : il floutait TOUTES les zones blanches avec texte, ce qui détruisait les codes CIM-10, les montants PMSI, les boutons et rendait les screenshots inutilisables pour le replay ou le grounding VLM. De plus, `opencv-python` n'était pas listé dans les dépendances de l'agent, donc le blur échouait silencieusement en production. Stratégie retenue (avril 2026) ------------------------------ 1. Agent = zéro blur → envoie les screenshots bruts via TLS. 2. Serveur = OCR (docTR) + NER (EDS-NLP avec fallback regex). 3. On floute UNIQUEMENT les entités : - PERSON → noms, prénoms - LOCATION → adresses, villes - PHONE → numéros de téléphone - NIR → numéro de sécurité sociale - EMAIL → adresses électroniques Et on préserve : - codes CIM-10 / CCAM - montants (1250€, 31,50 €) - dates (pas PII au sens RGPD santé) - identifiants techniques (shot_0001, session IDs…) 4. Deux fichiers sont stockés : - `shot_XXXX_full.png` → version brute (accès restreint) - `shot_XXXX_full_blurred.png` → version pour affichage Performance ----------- Objectif : < 2 s par screenshot sur RTX 5070. docTR (db_mobilenet_v3_large + crnn_mobilenet_v3_large) : ~800 ms CPU, ~300 ms GPU. EDS-NLP pipeline minimal : ~100 ms pour un texte d'écran typique. Fallback regex : < 10 ms. """ from __future__ import annotations import logging import os import re import tempfile import time from dataclasses import dataclass, field from pathlib import Path from typing import Iterable, List, Optional, Sequence, Tuple, Union logger = logging.getLogger(__name__) # ============================================================================= # Types # ============================================================================= # Type d'entité PII reconnu. Aligné sur les labels EDS-NLP (`nlp.pipes.eds`) # et enrichi par nos propres patterns regex. PII_LABELS = frozenset({ "PERSON", # noms de patient, médecin "LOCATION", # adresses, ville, code postal "ADDRESS", # alias de LOCATION (certains pipelines le produisent) "PHONE", # téléphone "NIR", # numéro sécu FR (15 chiffres) "SECURITY_NUMBER", # alias de NIR "EMAIL", # adresse email }) # Motifs qu'on NE DOIT PAS flouter même s'ils ressemblent à des PII : # - codes CIM-10 : 1 lettre + 2 chiffres + optionnellement .xx # - codes CCAM : 4 lettres + 3 chiffres # - montants (€, euros) # - dates format fr (dd/mm/yyyy, dd-mm-yy) # - identifiants techniques (ex: shot_0001, session_xxxxx) _RE_ICD10 = re.compile(r"\b[A-Z]\d{2}(\.\d{1,3})?\b") _RE_CCAM = re.compile(r"\b[A-Z]{4}\d{3}\b") _RE_MONEY = re.compile(r"\b\d{1,3}(?:[.,\s]\d{3})*(?:[.,]\d{1,2})?\s?€\b", re.IGNORECASE) _RE_DATE = re.compile(r"\b(0?[1-9]|[12]\d|3[01])[/.-](0?[1-9]|1[0-2])[/.-](\d{2}|\d{4})\b") _RE_TECH_ID = re.compile(r"\b(?:shot|session|sess|frame|trace|req|msg)_[\w-]+\b", re.IGNORECASE) # ============================================================================= # Entités PII # ============================================================================= @dataclass(frozen=True) class PIIEntity: """Une entité PII détectée dans un screenshot.""" label: str # PERSON, LOCATION, PHONE, NIR, EMAIL text: str # Texte brut détecté bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) en pixels confidence: float = 1.0 # Score NER (1.0 si regex) source: str = "ner" # "ner" (EDS-NLP) ou "regex" @dataclass class PIIBlurResult: """Résultat du pipeline de blur.""" raw_path: Path blurred_path: Path entities: List[PIIEntity] = field(default_factory=list) elapsed_ms: float = 0.0 ocr_ms: float = 0.0 ner_ms: float = 0.0 blur_ms: float = 0.0 ocr_engine: str = "doctr" ner_engine: str = "regex" # ou "edsnlp" @property def count(self) -> int: return len(self.entities) # ============================================================================= # Fallback NER par regex (utilisé si EDS-NLP indisponible) # ============================================================================= # Précaution : on ne marque comme PHONE que des suites contiguës de 10 chiffres # (FR) ou un format international. Les codes à 3-4 chiffres sont ignorés. _RE_PHONE = re.compile( r"\b(?:(?:\+?33|0)\s?[1-9])(?:[\s.-]?\d{2}){4}\b" ) _RE_NIR = re.compile( r"\b[12]\s?\d{2}\s?(?:0[1-9]|1[0-2]|20)\s?(?:\d{2}|2A|2B)\s?\d{3}\s?\d{3}(?:\s?\d{2})?\b" ) _RE_EMAIL = re.compile( r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE ) # Nom : Prénom Nom (au moins 2 majuscules initiales). Attrape aussi # "Mme Dupont", "M. Martin", "Dr. Bernard". # On utilise [^\S\n] (whitespace SANS newline) pour empêcher le match de sauter # de ligne — les lignes sont typiquement des champs distincts dans une UI métier. _RE_PERSON = re.compile( r"\b(?:M\.?|Mme|Mlle|Dr\.?|Pr\.?|Prof\.?)[^\S\n]+" r"[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+" r"(?:[^\S\n]+[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+)?" ) # Adresse : "12 rue de la Paix", "3, avenue Victor Hugo" # Même principe : on empêche le matching de franchir les sauts de ligne. _RE_ADDRESS = re.compile( r"\b\d{1,4}(?:[^\S\n]?(?:bis|ter|quater))?[,\s]+(?:rue|avenue|av\.?|bd|boulevard|" r"allée|all\.?|place|impasse|chemin|route|rte\.?|quai|cours|voie|passage)" r"[^\S\n]+(?:de[^\S\n]+|du[^\S\n]+|des[^\S\n]+|la[^\S\n]+|le[^\S\n]+|les[^\S\n]+|l'|de[^\S\n]+la[^\S\n]+|d')?" r"[A-Za-zÀ-ÿ\-' ]{2,40}", re.IGNORECASE, ) def _regex_find_pii(text: str) -> List[Tuple[str, int, int]]: """Retourne une liste de (label, offset_debut, offset_fin) par regex. Les motifs "techniques" (codes CIM, montants, dates) sont explicitement exclus même si un autre regex les attrape. """ # 1. Collecter toutes les plages à NE PAS flouter protected: List[Tuple[int, int]] = [] for rx in (_RE_ICD10, _RE_CCAM, _RE_MONEY, _RE_DATE, _RE_TECH_ID): for m in rx.finditer(text): protected.append(m.span()) def _is_protected(start: int, end: int) -> bool: for p_start, p_end in protected: # recouvrement non nul if start < p_end and end > p_start: return True return False hits: List[Tuple[str, int, int]] = [] for label, rx in ( ("NIR", _RE_NIR), ("EMAIL", _RE_EMAIL), ("PHONE", _RE_PHONE), ("PERSON", _RE_PERSON), ("LOCATION", _RE_ADDRESS), ): for m in rx.finditer(text): if _is_protected(m.start(), m.end()): continue hits.append((label, m.start(), m.end())) return hits # ============================================================================= # NER via EDS-NLP (optionnel) # ============================================================================= _edsnlp_pipeline = None def _get_edsnlp_pipeline(): """Charge une pipeline EDS-NLP si le module est disponible. Retourne None si EDS-NLP n'est pas installé — le pipeline retombera alors sur le NER regex. """ global _edsnlp_pipeline if _edsnlp_pipeline is not None: return _edsnlp_pipeline try: import edsnlp # type: ignore except ImportError: logger.info( "EDS-NLP non installé — fallback regex utilisé pour la détection PII. " "Pour activer EDS-NLP : pip install edsnlp" ) return None try: nlp = edsnlp.blank("eds") nlp.add_pipe("eds.sentences") nlp.add_pipe("eds.normalizer") # Les composants disponibles dépendent de la version installée. # On les ajoute en try/except pour rester résilient. for pipe_name in ("eds.names", "eds.dates", "eds.addresses"): try: nlp.add_pipe(pipe_name) except Exception as e: # noqa: BLE001 logger.debug("EDS-NLP : composant %s indisponible (%s)", pipe_name, e) _edsnlp_pipeline = nlp logger.info("EDS-NLP : pipeline chargée") return _edsnlp_pipeline except Exception as e: # noqa: BLE001 logger.warning("EDS-NLP non utilisable (%s) — fallback regex", e) return None def _edsnlp_find_pii(text: str, nlp) -> List[Tuple[str, int, int]]: """Utilise EDS-NLP pour trouver des entités PII. Les labels EDS-NLP sont mappés vers nos labels canoniques. """ try: doc = nlp(text) except Exception as e: # noqa: BLE001 logger.debug("EDS-NLP : échec sur texte de %d chars (%s)", len(text), e) return [] mapping = { "person": "PERSON", "name": "PERSON", "patient": "PERSON", "doctor": "PERSON", "location": "LOCATION", "address": "LOCATION", "city": "LOCATION", } hits: List[Tuple[str, int, int]] = [] for ent in getattr(doc, "ents", []): raw_label = str(getattr(ent, "label_", "")).lower() mapped = mapping.get(raw_label) if mapped is None: # On accepte aussi si le label EDS-NLP est déjà l'un de nos labels upper = raw_label.upper() if upper in PII_LABELS: mapped = upper if mapped: hits.append((mapped, ent.start_char, ent.end_char)) return hits # ============================================================================= # OCR avec bounding boxes par mot (docTR) # ============================================================================= _ocr_predictor = None def _get_ocr_predictor(): """Charge un prédicteur docTR léger (mobilenet) pour l'OCR rapide.""" global _ocr_predictor if _ocr_predictor is not None: return _ocr_predictor from doctr.models import ocr_predictor # type: ignore _ocr_predictor = ocr_predictor( det_arch="db_mobilenet_v3_large", reco_arch="crnn_mobilenet_v3_large", pretrained=True, ) # GPU si disponible try: import torch # type: ignore if torch.cuda.is_available(): _ocr_predictor = _ocr_predictor.cuda() logger.info("pii_blur : docTR chargé sur CUDA") else: logger.info("pii_blur : docTR chargé sur CPU") except Exception: # noqa: BLE001 logger.info("pii_blur : docTR chargé (device indéterminé)") return _ocr_predictor def _doctr_ocr(image_path: Path) -> Tuple[List[dict], int, int]: """Exécute docTR et retourne une liste de mots avec leurs bbox pixel. Retour : (words, width, height) où words = [{text, x1, y1, x2, y2}, ...] """ from doctr.io import DocumentFile # type: ignore from PIL import Image predictor = _get_ocr_predictor() doc = DocumentFile.from_images([str(image_path)]) result = predictor(doc) # Les coords sont normalisées (0..1). On les remappe vers la taille réelle. with Image.open(image_path) as img: W, H = img.size words: List[dict] = [] line_counter = 0 for page in result.pages: for block in page.blocks: for line in block.lines: for word in line.words: text = word.value if not text or not text.strip(): continue (nx1, ny1), (nx2, ny2) = word.geometry x1 = max(0, int(nx1 * W)) y1 = max(0, int(ny1 * H)) x2 = min(W, int(nx2 * W)) y2 = min(H, int(ny2 * H)) words.append({ "text": text, "x1": x1, "y1": y1, "x2": x2, "y2": y2, "line": line_counter, }) line_counter += 1 return words, W, H # ============================================================================= # Pipeline principal # ============================================================================= class PIIBlurrer: """Pipeline réutilisable (garde les modèles en mémoire entre appels). Exemple : blurrer = PIIBlurrer() res = blurrer.blur_image("shot_0001_full.png") print(res.count, res.elapsed_ms) """ def __init__( self, blur_kernel: Tuple[int, int] = (31, 31), blur_sigma: float = 15.0, bbox_padding: int = 2, use_edsnlp: bool = True, ) -> None: self._blur_kernel = blur_kernel self._blur_sigma = blur_sigma self._bbox_padding = bbox_padding self._use_edsnlp = use_edsnlp # ------------------------------------------------------------------ # Point d'entrée publique # ------------------------------------------------------------------ def blur_image( self, input_path: Union[str, Path], output_path: Optional[Union[str, Path]] = None, ) -> PIIBlurResult: """Floute les PII détectées et écrit la version floutée sur disque. Args: input_path: Chemin vers le screenshot brut (PNG/JPG). output_path: Chemin de sortie. Défaut : `_blurred.png` à côté de l'input. Returns: PIIBlurResult avec les timings et la liste des entités détectées. """ input_path = Path(input_path) if not input_path.is_file(): raise FileNotFoundError(f"Screenshot introuvable : {input_path}") if output_path is None: output_path = input_path.with_name( f"{input_path.stem}_blurred{input_path.suffix or '.png'}" ) else: output_path = Path(output_path) t_start = time.perf_counter() # 1. OCR t_ocr = time.perf_counter() try: words, W, H = _doctr_ocr(input_path) except Exception as e: # noqa: BLE001 logger.warning("pii_blur : OCR docTR échoué (%s) — pas de blur appliqué", e) # On copie simplement l'original vers la version "blurred" _copy_file(input_path, output_path) return PIIBlurResult( raw_path=input_path, blurred_path=output_path, entities=[], elapsed_ms=(time.perf_counter() - t_start) * 1000, ) ocr_ms = (time.perf_counter() - t_ocr) * 1000 if not words: _copy_file(input_path, output_path) return PIIBlurResult( raw_path=input_path, blurred_path=output_path, entities=[], elapsed_ms=(time.perf_counter() - t_start) * 1000, ocr_ms=ocr_ms, ) # 2. Reconstituer le texte ligne par ligne en conservant la correspondance # (offset_char → mot) pour pouvoir repérer les bbox des entités. text, char_to_word = _build_text_with_map(words) # 3. NER : EDS-NLP si dispo, sinon regex t_ner = time.perf_counter() ner_engine = "regex" entities_spans: List[Tuple[str, int, int]] = [] if self._use_edsnlp: nlp = _get_edsnlp_pipeline() if nlp is not None: entities_spans = _edsnlp_find_pii(text, nlp) ner_engine = "edsnlp" # Toujours compléter avec le regex (EDS-NLP ne couvre pas tous les PII # fréquents : email, NIR, téléphone français). entities_spans.extend(_regex_find_pii(text)) ner_ms = (time.perf_counter() - t_ner) * 1000 # Dédupliquer et normaliser entities_spans = _merge_spans(entities_spans) # 4. Convertir (label, start, end) → PIIEntity(label, text, bbox pixel) pii_entities: List[PIIEntity] = [] for label, start, end in entities_spans: if label not in PII_LABELS: continue bbox = _spans_to_bbox(start, end, char_to_word, words, self._bbox_padding, W, H) if bbox is None: continue pii_entities.append(PIIEntity( label=label, text=text[start:end], bbox=bbox, confidence=1.0, source=("ner" if ner_engine == "edsnlp" else "regex"), )) # 5. Appliquer le blur gaussien sur les bbox t_blur = time.perf_counter() _apply_blur(input_path, output_path, pii_entities, kernel=self._blur_kernel, sigma=self._blur_sigma) blur_ms = (time.perf_counter() - t_blur) * 1000 elapsed_ms = (time.perf_counter() - t_start) * 1000 if pii_entities: logger.info( "pii_blur : %d PII floutés sur %s (%.0fms : ocr=%.0f ner=%.0f blur=%.0f, ner=%s)", len(pii_entities), input_path.name, elapsed_ms, ocr_ms, ner_ms, blur_ms, ner_engine, ) else: logger.debug( "pii_blur : aucune PII détectée dans %s (%.0fms)", input_path.name, elapsed_ms, ) return PIIBlurResult( raw_path=input_path, blurred_path=output_path, entities=pii_entities, elapsed_ms=elapsed_ms, ocr_ms=ocr_ms, ner_ms=ner_ms, blur_ms=blur_ms, ner_engine=ner_engine, ) # Instance singleton (lazy) _default_blurrer: Optional[PIIBlurrer] = None def blur_pii_on_image( input_path: Union[str, Path], output_path: Optional[Union[str, Path]] = None, ) -> PIIBlurResult: """Helper fonctionnel : instancie un PIIBlurrer singleton et l'applique.""" global _default_blurrer if _default_blurrer is None: _default_blurrer = PIIBlurrer() return _default_blurrer.blur_image(input_path, output_path) # ============================================================================= # Helpers internes # ============================================================================= def _copy_file(src: Path, dst: Path) -> None: """Copie bytewise (utilisé quand aucun PII n'est détecté / OCR KO).""" dst.parent.mkdir(parents=True, exist_ok=True) with open(src, "rb") as f_in, open(dst, "wb") as f_out: f_out.write(f_in.read()) def _build_text_with_map(words: Sequence[dict]) -> Tuple[str, List[int]]: """Concatène les mots en texte + mappe chaque caractère vers son index de mot. Quand deux mots consécutifs appartiennent à des lignes différentes (champ `line` dans le dict), on insère un `\n` au lieu d'un espace. Cela empêche les regex gloutons (PERSON, LOCATION…) de matcher à travers des lignes logiques, qui sont typiquement des champs distincts dans une UI métier. Returns: text : str concaténé (mots séparés par un espace ou un \n) char_to_word : list[int] len == len(text), char_to_word[i] = index du mot (ou -1 pour les séparateurs). """ parts: List[str] = [] char_to_word: List[int] = [] prev_line: Optional[int] = None for i, w in enumerate(words): cur_line = w.get("line") if i > 0: if prev_line is not None and cur_line is not None and cur_line != prev_line: sep = "\n" else: sep = " " parts.append(sep) char_to_word.append(-1) txt = w["text"] parts.append(txt) char_to_word.extend([i] * len(txt)) prev_line = cur_line return "".join(parts), char_to_word def _spans_to_bbox( start: int, end: int, char_to_word: Sequence[int], words: Sequence[dict], padding: int, image_w: int, image_h: int, ) -> Optional[Tuple[int, int, int, int]]: """Convertit une plage [start, end[ dans le texte en bbox englobant les mots.""" if end <= start or start >= len(char_to_word): return None word_ids = set() for i in range(start, min(end, len(char_to_word))): wid = char_to_word[i] if wid >= 0: word_ids.add(wid) if not word_ids: return None xs1, ys1, xs2, ys2 = [], [], [], [] for wid in word_ids: w = words[wid] xs1.append(w["x1"]); ys1.append(w["y1"]) xs2.append(w["x2"]); ys2.append(w["y2"]) x1 = max(0, min(xs1) - padding) y1 = max(0, min(ys1) - padding) x2 = min(image_w, max(xs2) + padding) y2 = min(image_h, max(ys2) + padding) if x2 <= x1 or y2 <= y1: return None return (x1, y1, x2, y2) def _merge_spans( spans: Sequence[Tuple[str, int, int]], ) -> List[Tuple[str, int, int]]: """Déduplique et fusionne les plages qui se chevauchent sur un même label. En cas de conflit inter-labels, on garde celui qui couvre le plus large. """ if not spans: return [] # Trier par start puis par -width (le plus long d'abord pour les ties) sorted_spans = sorted(spans, key=lambda s: (s[1], -(s[2] - s[1]))) merged: List[Tuple[str, int, int]] = [] for label, s, e in sorted_spans: if not merged: merged.append((label, s, e)) continue last_label, ls, le = merged[-1] if s < le: # chevauchement # On garde l'étendue fusionnée avec le label du plus large new_start = min(ls, s) new_end = max(le, e) new_label = last_label if (le - ls) >= (e - s) else label merged[-1] = (new_label, new_start, new_end) else: merged.append((label, s, e)) return merged def _apply_blur( src: Path, dst: Path, entities: Sequence[PIIEntity], kernel: Tuple[int, int], sigma: float, ) -> None: """Applique un flou gaussien sur les bbox des entités et écrit l'image.""" from PIL import Image with Image.open(src) as img: if img.mode != "RGB": img = img.convert("RGB") if not entities: dst.parent.mkdir(parents=True, exist_ok=True) img.save(dst, format="PNG", optimize=True) return # On privilégie OpenCV s'il est disponible (plus rapide), # sinon on utilise PIL ImageFilter.GaussianBlur. try: import cv2 # type: ignore import numpy as np # type: ignore arr = np.array(img) bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) for ent in entities: x1, y1, x2, y2 = ent.bbox if x2 <= x1 or y2 <= y1: continue roi = bgr[y1:y2, x1:x2] if roi.size == 0: continue k = (max(3, kernel[0] | 1), max(3, kernel[1] | 1)) # impair bgr[y1:y2, x1:x2] = cv2.GaussianBlur(roi, k, sigma) out = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB) img = Image.fromarray(out) except ImportError: from PIL import ImageFilter radius = max(sigma / 2, 4.0) for ent in entities: x1, y1, x2, y2 = ent.bbox region = img.crop((x1, y1, x2, y2)) if region.size[0] == 0 or region.size[1] == 0: continue blurred = region.filter(ImageFilter.GaussianBlur(radius=radius)) img.paste(blurred, (x1, y1)) dst.parent.mkdir(parents=True, exist_ok=True) img.save(dst, format="PNG", optimize=True)