Files
rpa_vision_v3/core/anonymisation/pii_blur.py
Dom f7b8cddd2b feat(anonymisation): blur PII côté serveur via EDS-NLP + VLM local-first
Blur PII server-side (core/anonymisation/pii_blur.py) :
- Pipeline OCR (docTR) → NER (EDS-NLP + fallback regex)
- Détection ciblée noms/prénoms/adresses/NIR/téléphone/email
- Protection explicite CIM-10, CCAM, montants €, dates, IDs techniques
- Dual-storage : shot_XXXX_full.png (brut) + _blurred.png (affichage)
- 18 tests

Client :
- RPA_BLUR_SENSITIVE=false par défaut (blur serveur uniquement)
- Zéro overhead côté poste utilisateur

VLM config :
- vlm_config.py : gemma4:latest, fallbacks qwen3-vl:8b + UI-TARS
- think=false auto pour gemma4 (bug Ollama 0.20.x)
- VLM provider VWB : local-first (Ollama), cloud opt-in via VLM_ALLOW_CLOUD

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:48:23 +02:00

651 lines
23 KiB
Python

# core/anonymisation/pii_blur.py
"""Floutage ciblé des PII côté serveur (Personal Identifiable Information).
Contexte
--------
L'ancien blur côté client (`agent_v0/agent_v1/vision/blur_sensitive.py`) était
trop agressif : il floutait TOUTES les zones blanches avec texte, ce qui
détruisait les codes CIM-10, les montants PMSI, les boutons et rendait les
screenshots inutilisables pour le replay ou le grounding VLM. De plus,
`opencv-python` n'était pas listé dans les dépendances de l'agent, donc le blur
échouait silencieusement en production.
Stratégie retenue (avril 2026)
------------------------------
1. Agent = zéro blur → envoie les screenshots bruts via TLS.
2. Serveur = OCR (docTR) + NER (EDS-NLP avec fallback regex).
3. On floute UNIQUEMENT les entités :
- PERSON → noms, prénoms
- LOCATION → adresses, villes
- PHONE → numéros de téléphone
- NIR → numéro de sécurité sociale
- EMAIL → adresses électroniques
Et on préserve :
- codes CIM-10 / CCAM
- montants (1250€, 31,50 €)
- dates (pas PII au sens RGPD santé)
- identifiants techniques (shot_0001, session IDs…)
4. Deux fichiers sont stockés :
- `shot_XXXX_full.png` → version brute (accès restreint)
- `shot_XXXX_full_blurred.png` → version pour affichage
Performance
-----------
Objectif : < 2 s par screenshot sur RTX 5070.
docTR (db_mobilenet_v3_large + crnn_mobilenet_v3_large) : ~800 ms CPU, ~300 ms GPU.
EDS-NLP pipeline minimal : ~100 ms pour un texte d'écran typique.
Fallback regex : < 10 ms.
"""
from __future__ import annotations
import logging
import os
import re
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, List, Optional, Sequence, Tuple, Union
logger = logging.getLogger(__name__)
# =============================================================================
# Types
# =============================================================================
# Type d'entité PII reconnu. Aligné sur les labels EDS-NLP (`nlp.pipes.eds`)
# et enrichi par nos propres patterns regex.
PII_LABELS = frozenset({
"PERSON", # noms de patient, médecin
"LOCATION", # adresses, ville, code postal
"ADDRESS", # alias de LOCATION (certains pipelines le produisent)
"PHONE", # téléphone
"NIR", # numéro sécu FR (15 chiffres)
"SECURITY_NUMBER", # alias de NIR
"EMAIL", # adresse email
})
# Motifs qu'on NE DOIT PAS flouter même s'ils ressemblent à des PII :
# - codes CIM-10 : 1 lettre + 2 chiffres + optionnellement .xx
# - codes CCAM : 4 lettres + 3 chiffres
# - montants (€, euros)
# - dates format fr (dd/mm/yyyy, dd-mm-yy)
# - identifiants techniques (ex: shot_0001, session_xxxxx)
_RE_ICD10 = re.compile(r"\b[A-Z]\d{2}(\.\d{1,3})?\b")
_RE_CCAM = re.compile(r"\b[A-Z]{4}\d{3}\b")
_RE_MONEY = re.compile(r"\b\d{1,3}(?:[.,\s]\d{3})*(?:[.,]\d{1,2})?\s?€\b", re.IGNORECASE)
_RE_DATE = re.compile(r"\b(0?[1-9]|[12]\d|3[01])[/.-](0?[1-9]|1[0-2])[/.-](\d{2}|\d{4})\b")
_RE_TECH_ID = re.compile(r"\b(?:shot|session|sess|frame|trace|req|msg)_[\w-]+\b", re.IGNORECASE)
# =============================================================================
# Entités PII
# =============================================================================
@dataclass(frozen=True)
class PIIEntity:
"""Une entité PII détectée dans un screenshot."""
label: str # PERSON, LOCATION, PHONE, NIR, EMAIL
text: str # Texte brut détecté
bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) en pixels
confidence: float = 1.0 # Score NER (1.0 si regex)
source: str = "ner" # "ner" (EDS-NLP) ou "regex"
@dataclass
class PIIBlurResult:
"""Résultat du pipeline de blur."""
raw_path: Path
blurred_path: Path
entities: List[PIIEntity] = field(default_factory=list)
elapsed_ms: float = 0.0
ocr_ms: float = 0.0
ner_ms: float = 0.0
blur_ms: float = 0.0
ocr_engine: str = "doctr"
ner_engine: str = "regex" # ou "edsnlp"
@property
def count(self) -> int:
return len(self.entities)
# =============================================================================
# Fallback NER par regex (utilisé si EDS-NLP indisponible)
# =============================================================================
# Précaution : on ne marque comme PHONE que des suites contiguës de 10 chiffres
# (FR) ou un format international. Les codes à 3-4 chiffres sont ignorés.
_RE_PHONE = re.compile(
r"\b(?:(?:\+?33|0)\s?[1-9])(?:[\s.-]?\d{2}){4}\b"
)
_RE_NIR = re.compile(
r"\b[12]\s?\d{2}\s?(?:0[1-9]|1[0-2]|20)\s?(?:\d{2}|2A|2B)\s?\d{3}\s?\d{3}(?:\s?\d{2})?\b"
)
_RE_EMAIL = re.compile(
r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE
)
# Nom : Prénom Nom (au moins 2 majuscules initiales). Attrape aussi
# "Mme Dupont", "M. Martin", "Dr. Bernard".
# On utilise [^\S\n] (whitespace SANS newline) pour empêcher le match de sauter
# de ligne — les lignes sont typiquement des champs distincts dans une UI métier.
_RE_PERSON = re.compile(
r"\b(?:M\.?|Mme|Mlle|Dr\.?|Pr\.?|Prof\.?)[^\S\n]+"
r"[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+"
r"(?:[^\S\n]+[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+)?"
)
# Adresse : "12 rue de la Paix", "3, avenue Victor Hugo"
# Même principe : on empêche le matching de franchir les sauts de ligne.
_RE_ADDRESS = re.compile(
r"\b\d{1,4}(?:[^\S\n]?(?:bis|ter|quater))?[,\s]+(?:rue|avenue|av\.?|bd|boulevard|"
r"allée|all\.?|place|impasse|chemin|route|rte\.?|quai|cours|voie|passage)"
r"[^\S\n]+(?:de[^\S\n]+|du[^\S\n]+|des[^\S\n]+|la[^\S\n]+|le[^\S\n]+|les[^\S\n]+|l'|de[^\S\n]+la[^\S\n]+|d')?"
r"[A-Za-zÀ-ÿ\-' ]{2,40}",
re.IGNORECASE,
)
def _regex_find_pii(text: str) -> List[Tuple[str, int, int]]:
"""Retourne une liste de (label, offset_debut, offset_fin) par regex.
Les motifs "techniques" (codes CIM, montants, dates) sont explicitement
exclus même si un autre regex les attrape.
"""
# 1. Collecter toutes les plages à NE PAS flouter
protected: List[Tuple[int, int]] = []
for rx in (_RE_ICD10, _RE_CCAM, _RE_MONEY, _RE_DATE, _RE_TECH_ID):
for m in rx.finditer(text):
protected.append(m.span())
def _is_protected(start: int, end: int) -> bool:
for p_start, p_end in protected:
# recouvrement non nul
if start < p_end and end > p_start:
return True
return False
hits: List[Tuple[str, int, int]] = []
for label, rx in (
("NIR", _RE_NIR),
("EMAIL", _RE_EMAIL),
("PHONE", _RE_PHONE),
("PERSON", _RE_PERSON),
("LOCATION", _RE_ADDRESS),
):
for m in rx.finditer(text):
if _is_protected(m.start(), m.end()):
continue
hits.append((label, m.start(), m.end()))
return hits
# =============================================================================
# NER via EDS-NLP (optionnel)
# =============================================================================
_edsnlp_pipeline = None
def _get_edsnlp_pipeline():
"""Charge une pipeline EDS-NLP si le module est disponible.
Retourne None si EDS-NLP n'est pas installé — le pipeline retombera
alors sur le NER regex.
"""
global _edsnlp_pipeline
if _edsnlp_pipeline is not None:
return _edsnlp_pipeline
try:
import edsnlp # type: ignore
except ImportError:
logger.info(
"EDS-NLP non installé — fallback regex utilisé pour la détection PII. "
"Pour activer EDS-NLP : pip install edsnlp"
)
return None
try:
nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.sentences")
nlp.add_pipe("eds.normalizer")
# Les composants disponibles dépendent de la version installée.
# On les ajoute en try/except pour rester résilient.
for pipe_name in ("eds.names", "eds.dates", "eds.addresses"):
try:
nlp.add_pipe(pipe_name)
except Exception as e: # noqa: BLE001
logger.debug("EDS-NLP : composant %s indisponible (%s)", pipe_name, e)
_edsnlp_pipeline = nlp
logger.info("EDS-NLP : pipeline chargée")
return _edsnlp_pipeline
except Exception as e: # noqa: BLE001
logger.warning("EDS-NLP non utilisable (%s) — fallback regex", e)
return None
def _edsnlp_find_pii(text: str, nlp) -> List[Tuple[str, int, int]]:
"""Utilise EDS-NLP pour trouver des entités PII.
Les labels EDS-NLP sont mappés vers nos labels canoniques.
"""
try:
doc = nlp(text)
except Exception as e: # noqa: BLE001
logger.debug("EDS-NLP : échec sur texte de %d chars (%s)", len(text), e)
return []
mapping = {
"person": "PERSON",
"name": "PERSON",
"patient": "PERSON",
"doctor": "PERSON",
"location": "LOCATION",
"address": "LOCATION",
"city": "LOCATION",
}
hits: List[Tuple[str, int, int]] = []
for ent in getattr(doc, "ents", []):
raw_label = str(getattr(ent, "label_", "")).lower()
mapped = mapping.get(raw_label)
if mapped is None:
# On accepte aussi si le label EDS-NLP est déjà l'un de nos labels
upper = raw_label.upper()
if upper in PII_LABELS:
mapped = upper
if mapped:
hits.append((mapped, ent.start_char, ent.end_char))
return hits
# =============================================================================
# OCR avec bounding boxes par mot (docTR)
# =============================================================================
_ocr_predictor = None
def _get_ocr_predictor():
"""Charge un prédicteur docTR léger (mobilenet) pour l'OCR rapide."""
global _ocr_predictor
if _ocr_predictor is not None:
return _ocr_predictor
from doctr.models import ocr_predictor # type: ignore
_ocr_predictor = ocr_predictor(
det_arch="db_mobilenet_v3_large",
reco_arch="crnn_mobilenet_v3_large",
pretrained=True,
)
# GPU si disponible
try:
import torch # type: ignore
if torch.cuda.is_available():
_ocr_predictor = _ocr_predictor.cuda()
logger.info("pii_blur : docTR chargé sur CUDA")
else:
logger.info("pii_blur : docTR chargé sur CPU")
except Exception: # noqa: BLE001
logger.info("pii_blur : docTR chargé (device indéterminé)")
return _ocr_predictor
def _doctr_ocr(image_path: Path) -> Tuple[List[dict], int, int]:
"""Exécute docTR et retourne une liste de mots avec leurs bbox pixel.
Retour : (words, width, height) où words = [{text, x1, y1, x2, y2}, ...]
"""
from doctr.io import DocumentFile # type: ignore
from PIL import Image
predictor = _get_ocr_predictor()
doc = DocumentFile.from_images([str(image_path)])
result = predictor(doc)
# Les coords sont normalisées (0..1). On les remappe vers la taille réelle.
with Image.open(image_path) as img:
W, H = img.size
words: List[dict] = []
line_counter = 0
for page in result.pages:
for block in page.blocks:
for line in block.lines:
for word in line.words:
text = word.value
if not text or not text.strip():
continue
(nx1, ny1), (nx2, ny2) = word.geometry
x1 = max(0, int(nx1 * W))
y1 = max(0, int(ny1 * H))
x2 = min(W, int(nx2 * W))
y2 = min(H, int(ny2 * H))
words.append({
"text": text,
"x1": x1, "y1": y1, "x2": x2, "y2": y2,
"line": line_counter,
})
line_counter += 1
return words, W, H
# =============================================================================
# Pipeline principal
# =============================================================================
class PIIBlurrer:
"""Pipeline réutilisable (garde les modèles en mémoire entre appels).
Exemple :
blurrer = PIIBlurrer()
res = blurrer.blur_image("shot_0001_full.png")
print(res.count, res.elapsed_ms)
"""
def __init__(
self,
blur_kernel: Tuple[int, int] = (31, 31),
blur_sigma: float = 15.0,
bbox_padding: int = 2,
use_edsnlp: bool = True,
) -> None:
self._blur_kernel = blur_kernel
self._blur_sigma = blur_sigma
self._bbox_padding = bbox_padding
self._use_edsnlp = use_edsnlp
# ------------------------------------------------------------------
# Point d'entrée publique
# ------------------------------------------------------------------
def blur_image(
self,
input_path: Union[str, Path],
output_path: Optional[Union[str, Path]] = None,
) -> PIIBlurResult:
"""Floute les PII détectées et écrit la version floutée sur disque.
Args:
input_path: Chemin vers le screenshot brut (PNG/JPG).
output_path: Chemin de sortie. Défaut :
`<stem>_blurred.png` à côté de l'input.
Returns:
PIIBlurResult avec les timings et la liste des entités détectées.
"""
input_path = Path(input_path)
if not input_path.is_file():
raise FileNotFoundError(f"Screenshot introuvable : {input_path}")
if output_path is None:
output_path = input_path.with_name(
f"{input_path.stem}_blurred{input_path.suffix or '.png'}"
)
else:
output_path = Path(output_path)
t_start = time.perf_counter()
# 1. OCR
t_ocr = time.perf_counter()
try:
words, W, H = _doctr_ocr(input_path)
except Exception as e: # noqa: BLE001
logger.warning("pii_blur : OCR docTR échoué (%s) — pas de blur appliqué", e)
# On copie simplement l'original vers la version "blurred"
_copy_file(input_path, output_path)
return PIIBlurResult(
raw_path=input_path,
blurred_path=output_path,
entities=[],
elapsed_ms=(time.perf_counter() - t_start) * 1000,
)
ocr_ms = (time.perf_counter() - t_ocr) * 1000
if not words:
_copy_file(input_path, output_path)
return PIIBlurResult(
raw_path=input_path,
blurred_path=output_path,
entities=[],
elapsed_ms=(time.perf_counter() - t_start) * 1000,
ocr_ms=ocr_ms,
)
# 2. Reconstituer le texte ligne par ligne en conservant la correspondance
# (offset_char → mot) pour pouvoir repérer les bbox des entités.
text, char_to_word = _build_text_with_map(words)
# 3. NER : EDS-NLP si dispo, sinon regex
t_ner = time.perf_counter()
ner_engine = "regex"
entities_spans: List[Tuple[str, int, int]] = []
if self._use_edsnlp:
nlp = _get_edsnlp_pipeline()
if nlp is not None:
entities_spans = _edsnlp_find_pii(text, nlp)
ner_engine = "edsnlp"
# Toujours compléter avec le regex (EDS-NLP ne couvre pas tous les PII
# fréquents : email, NIR, téléphone français).
entities_spans.extend(_regex_find_pii(text))
ner_ms = (time.perf_counter() - t_ner) * 1000
# Dédupliquer et normaliser
entities_spans = _merge_spans(entities_spans)
# 4. Convertir (label, start, end) → PIIEntity(label, text, bbox pixel)
pii_entities: List[PIIEntity] = []
for label, start, end in entities_spans:
if label not in PII_LABELS:
continue
bbox = _spans_to_bbox(start, end, char_to_word, words, self._bbox_padding, W, H)
if bbox is None:
continue
pii_entities.append(PIIEntity(
label=label,
text=text[start:end],
bbox=bbox,
confidence=1.0,
source=("ner" if ner_engine == "edsnlp" else "regex"),
))
# 5. Appliquer le blur gaussien sur les bbox
t_blur = time.perf_counter()
_apply_blur(input_path, output_path, pii_entities,
kernel=self._blur_kernel, sigma=self._blur_sigma)
blur_ms = (time.perf_counter() - t_blur) * 1000
elapsed_ms = (time.perf_counter() - t_start) * 1000
if pii_entities:
logger.info(
"pii_blur : %d PII floutés sur %s (%.0fms : ocr=%.0f ner=%.0f blur=%.0f, ner=%s)",
len(pii_entities), input_path.name, elapsed_ms,
ocr_ms, ner_ms, blur_ms, ner_engine,
)
else:
logger.debug(
"pii_blur : aucune PII détectée dans %s (%.0fms)",
input_path.name, elapsed_ms,
)
return PIIBlurResult(
raw_path=input_path,
blurred_path=output_path,
entities=pii_entities,
elapsed_ms=elapsed_ms,
ocr_ms=ocr_ms,
ner_ms=ner_ms,
blur_ms=blur_ms,
ner_engine=ner_engine,
)
# Instance singleton (lazy)
_default_blurrer: Optional[PIIBlurrer] = None
def blur_pii_on_image(
input_path: Union[str, Path],
output_path: Optional[Union[str, Path]] = None,
) -> PIIBlurResult:
"""Helper fonctionnel : instancie un PIIBlurrer singleton et l'applique."""
global _default_blurrer
if _default_blurrer is None:
_default_blurrer = PIIBlurrer()
return _default_blurrer.blur_image(input_path, output_path)
# =============================================================================
# Helpers internes
# =============================================================================
def _copy_file(src: Path, dst: Path) -> None:
"""Copie bytewise (utilisé quand aucun PII n'est détecté / OCR KO)."""
dst.parent.mkdir(parents=True, exist_ok=True)
with open(src, "rb") as f_in, open(dst, "wb") as f_out:
f_out.write(f_in.read())
def _build_text_with_map(words: Sequence[dict]) -> Tuple[str, List[int]]:
"""Concatène les mots en texte + mappe chaque caractère vers son index de mot.
Quand deux mots consécutifs appartiennent à des lignes différentes (champ
`line` dans le dict), on insère un `\n` au lieu d'un espace. Cela empêche
les regex gloutons (PERSON, LOCATION…) de matcher à travers des lignes
logiques, qui sont typiquement des champs distincts dans une UI métier.
Returns:
text : str concaténé (mots séparés par un espace ou un \n)
char_to_word : list[int] len == len(text), char_to_word[i] = index du mot
(ou -1 pour les séparateurs).
"""
parts: List[str] = []
char_to_word: List[int] = []
prev_line: Optional[int] = None
for i, w in enumerate(words):
cur_line = w.get("line")
if i > 0:
if prev_line is not None and cur_line is not None and cur_line != prev_line:
sep = "\n"
else:
sep = " "
parts.append(sep)
char_to_word.append(-1)
txt = w["text"]
parts.append(txt)
char_to_word.extend([i] * len(txt))
prev_line = cur_line
return "".join(parts), char_to_word
def _spans_to_bbox(
start: int,
end: int,
char_to_word: Sequence[int],
words: Sequence[dict],
padding: int,
image_w: int,
image_h: int,
) -> Optional[Tuple[int, int, int, int]]:
"""Convertit une plage [start, end[ dans le texte en bbox englobant les mots."""
if end <= start or start >= len(char_to_word):
return None
word_ids = set()
for i in range(start, min(end, len(char_to_word))):
wid = char_to_word[i]
if wid >= 0:
word_ids.add(wid)
if not word_ids:
return None
xs1, ys1, xs2, ys2 = [], [], [], []
for wid in word_ids:
w = words[wid]
xs1.append(w["x1"]); ys1.append(w["y1"])
xs2.append(w["x2"]); ys2.append(w["y2"])
x1 = max(0, min(xs1) - padding)
y1 = max(0, min(ys1) - padding)
x2 = min(image_w, max(xs2) + padding)
y2 = min(image_h, max(ys2) + padding)
if x2 <= x1 or y2 <= y1:
return None
return (x1, y1, x2, y2)
def _merge_spans(
spans: Sequence[Tuple[str, int, int]],
) -> List[Tuple[str, int, int]]:
"""Déduplique et fusionne les plages qui se chevauchent sur un même label.
En cas de conflit inter-labels, on garde celui qui couvre le plus large.
"""
if not spans:
return []
# Trier par start puis par -width (le plus long d'abord pour les ties)
sorted_spans = sorted(spans, key=lambda s: (s[1], -(s[2] - s[1])))
merged: List[Tuple[str, int, int]] = []
for label, s, e in sorted_spans:
if not merged:
merged.append((label, s, e))
continue
last_label, ls, le = merged[-1]
if s < le: # chevauchement
# On garde l'étendue fusionnée avec le label du plus large
new_start = min(ls, s)
new_end = max(le, e)
new_label = last_label if (le - ls) >= (e - s) else label
merged[-1] = (new_label, new_start, new_end)
else:
merged.append((label, s, e))
return merged
def _apply_blur(
src: Path,
dst: Path,
entities: Sequence[PIIEntity],
kernel: Tuple[int, int],
sigma: float,
) -> None:
"""Applique un flou gaussien sur les bbox des entités et écrit l'image."""
from PIL import Image
with Image.open(src) as img:
if img.mode != "RGB":
img = img.convert("RGB")
if not entities:
dst.parent.mkdir(parents=True, exist_ok=True)
img.save(dst, format="PNG", optimize=True)
return
# On privilégie OpenCV s'il est disponible (plus rapide),
# sinon on utilise PIL ImageFilter.GaussianBlur.
try:
import cv2 # type: ignore
import numpy as np # type: ignore
arr = np.array(img)
bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)
for ent in entities:
x1, y1, x2, y2 = ent.bbox
if x2 <= x1 or y2 <= y1:
continue
roi = bgr[y1:y2, x1:x2]
if roi.size == 0:
continue
k = (max(3, kernel[0] | 1), max(3, kernel[1] | 1)) # impair
bgr[y1:y2, x1:x2] = cv2.GaussianBlur(roi, k, sigma)
out = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
img = Image.fromarray(out)
except ImportError:
from PIL import ImageFilter
radius = max(sigma / 2, 4.0)
for ent in entities:
x1, y1, x2, y2 = ent.bbox
region = img.crop((x1, y1, x2, y2))
if region.size[0] == 0 or region.size[1] == 0:
continue
blurred = region.filter(ImageFilter.GaussianBlur(radius=radius))
img.paste(blurred, (x1, y1))
dst.parent.mkdir(parents=True, exist_ok=True)
img.save(dst, format="PNG", optimize=True)