Blur PII server-side (core/anonymisation/pii_blur.py) : - Pipeline OCR (docTR) → NER (EDS-NLP + fallback regex) - Détection ciblée noms/prénoms/adresses/NIR/téléphone/email - Protection explicite CIM-10, CCAM, montants €, dates, IDs techniques - Dual-storage : shot_XXXX_full.png (brut) + _blurred.png (affichage) - 18 tests Client : - RPA_BLUR_SENSITIVE=false par défaut (blur serveur uniquement) - Zéro overhead côté poste utilisateur VLM config : - vlm_config.py : gemma4:latest, fallbacks qwen3-vl:8b + UI-TARS - think=false auto pour gemma4 (bug Ollama 0.20.x) - VLM provider VWB : local-first (Ollama), cloud opt-in via VLM_ALLOW_CLOUD Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
651 lines
23 KiB
Python
651 lines
23 KiB
Python
# core/anonymisation/pii_blur.py
|
|
"""Floutage ciblé des PII côté serveur (Personal Identifiable Information).
|
|
|
|
Contexte
|
|
--------
|
|
L'ancien blur côté client (`agent_v0/agent_v1/vision/blur_sensitive.py`) était
|
|
trop agressif : il floutait TOUTES les zones blanches avec texte, ce qui
|
|
détruisait les codes CIM-10, les montants PMSI, les boutons et rendait les
|
|
screenshots inutilisables pour le replay ou le grounding VLM. De plus,
|
|
`opencv-python` n'était pas listé dans les dépendances de l'agent, donc le blur
|
|
échouait silencieusement en production.
|
|
|
|
Stratégie retenue (avril 2026)
|
|
------------------------------
|
|
1. Agent = zéro blur → envoie les screenshots bruts via TLS.
|
|
2. Serveur = OCR (docTR) + NER (EDS-NLP avec fallback regex).
|
|
3. On floute UNIQUEMENT les entités :
|
|
- PERSON → noms, prénoms
|
|
- LOCATION → adresses, villes
|
|
- PHONE → numéros de téléphone
|
|
- NIR → numéro de sécurité sociale
|
|
- EMAIL → adresses électroniques
|
|
Et on préserve :
|
|
- codes CIM-10 / CCAM
|
|
- montants (1250€, 31,50 €)
|
|
- dates (pas PII au sens RGPD santé)
|
|
- identifiants techniques (shot_0001, session IDs…)
|
|
4. Deux fichiers sont stockés :
|
|
- `shot_XXXX_full.png` → version brute (accès restreint)
|
|
- `shot_XXXX_full_blurred.png` → version pour affichage
|
|
|
|
Performance
|
|
-----------
|
|
Objectif : < 2 s par screenshot sur RTX 5070.
|
|
docTR (db_mobilenet_v3_large + crnn_mobilenet_v3_large) : ~800 ms CPU, ~300 ms GPU.
|
|
EDS-NLP pipeline minimal : ~100 ms pour un texte d'écran typique.
|
|
Fallback regex : < 10 ms.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Iterable, List, Optional, Sequence, Tuple, Union
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Types
|
|
# =============================================================================
|
|
|
|
# Type d'entité PII reconnu. Aligné sur les labels EDS-NLP (`nlp.pipes.eds`)
|
|
# et enrichi par nos propres patterns regex.
|
|
PII_LABELS = frozenset({
|
|
"PERSON", # noms de patient, médecin
|
|
"LOCATION", # adresses, ville, code postal
|
|
"ADDRESS", # alias de LOCATION (certains pipelines le produisent)
|
|
"PHONE", # téléphone
|
|
"NIR", # numéro sécu FR (15 chiffres)
|
|
"SECURITY_NUMBER", # alias de NIR
|
|
"EMAIL", # adresse email
|
|
})
|
|
|
|
# Motifs qu'on NE DOIT PAS flouter même s'ils ressemblent à des PII :
|
|
# - codes CIM-10 : 1 lettre + 2 chiffres + optionnellement .xx
|
|
# - codes CCAM : 4 lettres + 3 chiffres
|
|
# - montants (€, euros)
|
|
# - dates format fr (dd/mm/yyyy, dd-mm-yy)
|
|
# - identifiants techniques (ex: shot_0001, session_xxxxx)
|
|
_RE_ICD10 = re.compile(r"\b[A-Z]\d{2}(\.\d{1,3})?\b")
|
|
_RE_CCAM = re.compile(r"\b[A-Z]{4}\d{3}\b")
|
|
_RE_MONEY = re.compile(r"\b\d{1,3}(?:[.,\s]\d{3})*(?:[.,]\d{1,2})?\s?€\b", re.IGNORECASE)
|
|
_RE_DATE = re.compile(r"\b(0?[1-9]|[12]\d|3[01])[/.-](0?[1-9]|1[0-2])[/.-](\d{2}|\d{4})\b")
|
|
_RE_TECH_ID = re.compile(r"\b(?:shot|session|sess|frame|trace|req|msg)_[\w-]+\b", re.IGNORECASE)
|
|
|
|
|
|
# =============================================================================
|
|
# Entités PII
|
|
# =============================================================================
|
|
|
|
@dataclass(frozen=True)
|
|
class PIIEntity:
|
|
"""Une entité PII détectée dans un screenshot."""
|
|
label: str # PERSON, LOCATION, PHONE, NIR, EMAIL
|
|
text: str # Texte brut détecté
|
|
bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) en pixels
|
|
confidence: float = 1.0 # Score NER (1.0 si regex)
|
|
source: str = "ner" # "ner" (EDS-NLP) ou "regex"
|
|
|
|
|
|
@dataclass
|
|
class PIIBlurResult:
|
|
"""Résultat du pipeline de blur."""
|
|
raw_path: Path
|
|
blurred_path: Path
|
|
entities: List[PIIEntity] = field(default_factory=list)
|
|
elapsed_ms: float = 0.0
|
|
ocr_ms: float = 0.0
|
|
ner_ms: float = 0.0
|
|
blur_ms: float = 0.0
|
|
ocr_engine: str = "doctr"
|
|
ner_engine: str = "regex" # ou "edsnlp"
|
|
|
|
@property
|
|
def count(self) -> int:
|
|
return len(self.entities)
|
|
|
|
|
|
# =============================================================================
|
|
# Fallback NER par regex (utilisé si EDS-NLP indisponible)
|
|
# =============================================================================
|
|
|
|
# Précaution : on ne marque comme PHONE que des suites contiguës de 10 chiffres
|
|
# (FR) ou un format international. Les codes à 3-4 chiffres sont ignorés.
|
|
_RE_PHONE = re.compile(
|
|
r"\b(?:(?:\+?33|0)\s?[1-9])(?:[\s.-]?\d{2}){4}\b"
|
|
)
|
|
_RE_NIR = re.compile(
|
|
r"\b[12]\s?\d{2}\s?(?:0[1-9]|1[0-2]|20)\s?(?:\d{2}|2A|2B)\s?\d{3}\s?\d{3}(?:\s?\d{2})?\b"
|
|
)
|
|
_RE_EMAIL = re.compile(
|
|
r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE
|
|
)
|
|
# Nom : Prénom Nom (au moins 2 majuscules initiales). Attrape aussi
|
|
# "Mme Dupont", "M. Martin", "Dr. Bernard".
|
|
# On utilise [^\S\n] (whitespace SANS newline) pour empêcher le match de sauter
|
|
# de ligne — les lignes sont typiquement des champs distincts dans une UI métier.
|
|
_RE_PERSON = re.compile(
|
|
r"\b(?:M\.?|Mme|Mlle|Dr\.?|Pr\.?|Prof\.?)[^\S\n]+"
|
|
r"[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+"
|
|
r"(?:[^\S\n]+[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+)?"
|
|
)
|
|
# Adresse : "12 rue de la Paix", "3, avenue Victor Hugo"
|
|
# Même principe : on empêche le matching de franchir les sauts de ligne.
|
|
_RE_ADDRESS = re.compile(
|
|
r"\b\d{1,4}(?:[^\S\n]?(?:bis|ter|quater))?[,\s]+(?:rue|avenue|av\.?|bd|boulevard|"
|
|
r"allée|all\.?|place|impasse|chemin|route|rte\.?|quai|cours|voie|passage)"
|
|
r"[^\S\n]+(?:de[^\S\n]+|du[^\S\n]+|des[^\S\n]+|la[^\S\n]+|le[^\S\n]+|les[^\S\n]+|l'|de[^\S\n]+la[^\S\n]+|d')?"
|
|
r"[A-Za-zÀ-ÿ\-' ]{2,40}",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
def _regex_find_pii(text: str) -> List[Tuple[str, int, int]]:
|
|
"""Retourne une liste de (label, offset_debut, offset_fin) par regex.
|
|
|
|
Les motifs "techniques" (codes CIM, montants, dates) sont explicitement
|
|
exclus même si un autre regex les attrape.
|
|
"""
|
|
# 1. Collecter toutes les plages à NE PAS flouter
|
|
protected: List[Tuple[int, int]] = []
|
|
for rx in (_RE_ICD10, _RE_CCAM, _RE_MONEY, _RE_DATE, _RE_TECH_ID):
|
|
for m in rx.finditer(text):
|
|
protected.append(m.span())
|
|
|
|
def _is_protected(start: int, end: int) -> bool:
|
|
for p_start, p_end in protected:
|
|
# recouvrement non nul
|
|
if start < p_end and end > p_start:
|
|
return True
|
|
return False
|
|
|
|
hits: List[Tuple[str, int, int]] = []
|
|
for label, rx in (
|
|
("NIR", _RE_NIR),
|
|
("EMAIL", _RE_EMAIL),
|
|
("PHONE", _RE_PHONE),
|
|
("PERSON", _RE_PERSON),
|
|
("LOCATION", _RE_ADDRESS),
|
|
):
|
|
for m in rx.finditer(text):
|
|
if _is_protected(m.start(), m.end()):
|
|
continue
|
|
hits.append((label, m.start(), m.end()))
|
|
return hits
|
|
|
|
|
|
# =============================================================================
|
|
# NER via EDS-NLP (optionnel)
|
|
# =============================================================================
|
|
|
|
_edsnlp_pipeline = None
|
|
|
|
|
|
def _get_edsnlp_pipeline():
|
|
"""Charge une pipeline EDS-NLP si le module est disponible.
|
|
|
|
Retourne None si EDS-NLP n'est pas installé — le pipeline retombera
|
|
alors sur le NER regex.
|
|
"""
|
|
global _edsnlp_pipeline
|
|
if _edsnlp_pipeline is not None:
|
|
return _edsnlp_pipeline
|
|
try:
|
|
import edsnlp # type: ignore
|
|
except ImportError:
|
|
logger.info(
|
|
"EDS-NLP non installé — fallback regex utilisé pour la détection PII. "
|
|
"Pour activer EDS-NLP : pip install edsnlp"
|
|
)
|
|
return None
|
|
|
|
try:
|
|
nlp = edsnlp.blank("eds")
|
|
nlp.add_pipe("eds.sentences")
|
|
nlp.add_pipe("eds.normalizer")
|
|
# Les composants disponibles dépendent de la version installée.
|
|
# On les ajoute en try/except pour rester résilient.
|
|
for pipe_name in ("eds.names", "eds.dates", "eds.addresses"):
|
|
try:
|
|
nlp.add_pipe(pipe_name)
|
|
except Exception as e: # noqa: BLE001
|
|
logger.debug("EDS-NLP : composant %s indisponible (%s)", pipe_name, e)
|
|
_edsnlp_pipeline = nlp
|
|
logger.info("EDS-NLP : pipeline chargée")
|
|
return _edsnlp_pipeline
|
|
except Exception as e: # noqa: BLE001
|
|
logger.warning("EDS-NLP non utilisable (%s) — fallback regex", e)
|
|
return None
|
|
|
|
|
|
def _edsnlp_find_pii(text: str, nlp) -> List[Tuple[str, int, int]]:
|
|
"""Utilise EDS-NLP pour trouver des entités PII.
|
|
|
|
Les labels EDS-NLP sont mappés vers nos labels canoniques.
|
|
"""
|
|
try:
|
|
doc = nlp(text)
|
|
except Exception as e: # noqa: BLE001
|
|
logger.debug("EDS-NLP : échec sur texte de %d chars (%s)", len(text), e)
|
|
return []
|
|
|
|
mapping = {
|
|
"person": "PERSON",
|
|
"name": "PERSON",
|
|
"patient": "PERSON",
|
|
"doctor": "PERSON",
|
|
"location": "LOCATION",
|
|
"address": "LOCATION",
|
|
"city": "LOCATION",
|
|
}
|
|
hits: List[Tuple[str, int, int]] = []
|
|
for ent in getattr(doc, "ents", []):
|
|
raw_label = str(getattr(ent, "label_", "")).lower()
|
|
mapped = mapping.get(raw_label)
|
|
if mapped is None:
|
|
# On accepte aussi si le label EDS-NLP est déjà l'un de nos labels
|
|
upper = raw_label.upper()
|
|
if upper in PII_LABELS:
|
|
mapped = upper
|
|
if mapped:
|
|
hits.append((mapped, ent.start_char, ent.end_char))
|
|
return hits
|
|
|
|
|
|
# =============================================================================
|
|
# OCR avec bounding boxes par mot (docTR)
|
|
# =============================================================================
|
|
|
|
_ocr_predictor = None
|
|
|
|
|
|
def _get_ocr_predictor():
|
|
"""Charge un prédicteur docTR léger (mobilenet) pour l'OCR rapide."""
|
|
global _ocr_predictor
|
|
if _ocr_predictor is not None:
|
|
return _ocr_predictor
|
|
from doctr.models import ocr_predictor # type: ignore
|
|
_ocr_predictor = ocr_predictor(
|
|
det_arch="db_mobilenet_v3_large",
|
|
reco_arch="crnn_mobilenet_v3_large",
|
|
pretrained=True,
|
|
)
|
|
# GPU si disponible
|
|
try:
|
|
import torch # type: ignore
|
|
if torch.cuda.is_available():
|
|
_ocr_predictor = _ocr_predictor.cuda()
|
|
logger.info("pii_blur : docTR chargé sur CUDA")
|
|
else:
|
|
logger.info("pii_blur : docTR chargé sur CPU")
|
|
except Exception: # noqa: BLE001
|
|
logger.info("pii_blur : docTR chargé (device indéterminé)")
|
|
return _ocr_predictor
|
|
|
|
|
|
def _doctr_ocr(image_path: Path) -> Tuple[List[dict], int, int]:
|
|
"""Exécute docTR et retourne une liste de mots avec leurs bbox pixel.
|
|
|
|
Retour : (words, width, height) où words = [{text, x1, y1, x2, y2}, ...]
|
|
"""
|
|
from doctr.io import DocumentFile # type: ignore
|
|
from PIL import Image
|
|
|
|
predictor = _get_ocr_predictor()
|
|
doc = DocumentFile.from_images([str(image_path)])
|
|
result = predictor(doc)
|
|
|
|
# Les coords sont normalisées (0..1). On les remappe vers la taille réelle.
|
|
with Image.open(image_path) as img:
|
|
W, H = img.size
|
|
|
|
words: List[dict] = []
|
|
line_counter = 0
|
|
for page in result.pages:
|
|
for block in page.blocks:
|
|
for line in block.lines:
|
|
for word in line.words:
|
|
text = word.value
|
|
if not text or not text.strip():
|
|
continue
|
|
(nx1, ny1), (nx2, ny2) = word.geometry
|
|
x1 = max(0, int(nx1 * W))
|
|
y1 = max(0, int(ny1 * H))
|
|
x2 = min(W, int(nx2 * W))
|
|
y2 = min(H, int(ny2 * H))
|
|
words.append({
|
|
"text": text,
|
|
"x1": x1, "y1": y1, "x2": x2, "y2": y2,
|
|
"line": line_counter,
|
|
})
|
|
line_counter += 1
|
|
return words, W, H
|
|
|
|
|
|
# =============================================================================
|
|
# Pipeline principal
|
|
# =============================================================================
|
|
|
|
class PIIBlurrer:
|
|
"""Pipeline réutilisable (garde les modèles en mémoire entre appels).
|
|
|
|
Exemple :
|
|
blurrer = PIIBlurrer()
|
|
res = blurrer.blur_image("shot_0001_full.png")
|
|
print(res.count, res.elapsed_ms)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
blur_kernel: Tuple[int, int] = (31, 31),
|
|
blur_sigma: float = 15.0,
|
|
bbox_padding: int = 2,
|
|
use_edsnlp: bool = True,
|
|
) -> None:
|
|
self._blur_kernel = blur_kernel
|
|
self._blur_sigma = blur_sigma
|
|
self._bbox_padding = bbox_padding
|
|
self._use_edsnlp = use_edsnlp
|
|
|
|
# ------------------------------------------------------------------
|
|
# Point d'entrée publique
|
|
# ------------------------------------------------------------------
|
|
def blur_image(
|
|
self,
|
|
input_path: Union[str, Path],
|
|
output_path: Optional[Union[str, Path]] = None,
|
|
) -> PIIBlurResult:
|
|
"""Floute les PII détectées et écrit la version floutée sur disque.
|
|
|
|
Args:
|
|
input_path: Chemin vers le screenshot brut (PNG/JPG).
|
|
output_path: Chemin de sortie. Défaut :
|
|
`<stem>_blurred.png` à côté de l'input.
|
|
|
|
Returns:
|
|
PIIBlurResult avec les timings et la liste des entités détectées.
|
|
"""
|
|
input_path = Path(input_path)
|
|
if not input_path.is_file():
|
|
raise FileNotFoundError(f"Screenshot introuvable : {input_path}")
|
|
|
|
if output_path is None:
|
|
output_path = input_path.with_name(
|
|
f"{input_path.stem}_blurred{input_path.suffix or '.png'}"
|
|
)
|
|
else:
|
|
output_path = Path(output_path)
|
|
|
|
t_start = time.perf_counter()
|
|
|
|
# 1. OCR
|
|
t_ocr = time.perf_counter()
|
|
try:
|
|
words, W, H = _doctr_ocr(input_path)
|
|
except Exception as e: # noqa: BLE001
|
|
logger.warning("pii_blur : OCR docTR échoué (%s) — pas de blur appliqué", e)
|
|
# On copie simplement l'original vers la version "blurred"
|
|
_copy_file(input_path, output_path)
|
|
return PIIBlurResult(
|
|
raw_path=input_path,
|
|
blurred_path=output_path,
|
|
entities=[],
|
|
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
|
)
|
|
ocr_ms = (time.perf_counter() - t_ocr) * 1000
|
|
|
|
if not words:
|
|
_copy_file(input_path, output_path)
|
|
return PIIBlurResult(
|
|
raw_path=input_path,
|
|
blurred_path=output_path,
|
|
entities=[],
|
|
elapsed_ms=(time.perf_counter() - t_start) * 1000,
|
|
ocr_ms=ocr_ms,
|
|
)
|
|
|
|
# 2. Reconstituer le texte ligne par ligne en conservant la correspondance
|
|
# (offset_char → mot) pour pouvoir repérer les bbox des entités.
|
|
text, char_to_word = _build_text_with_map(words)
|
|
|
|
# 3. NER : EDS-NLP si dispo, sinon regex
|
|
t_ner = time.perf_counter()
|
|
ner_engine = "regex"
|
|
entities_spans: List[Tuple[str, int, int]] = []
|
|
if self._use_edsnlp:
|
|
nlp = _get_edsnlp_pipeline()
|
|
if nlp is not None:
|
|
entities_spans = _edsnlp_find_pii(text, nlp)
|
|
ner_engine = "edsnlp"
|
|
# Toujours compléter avec le regex (EDS-NLP ne couvre pas tous les PII
|
|
# fréquents : email, NIR, téléphone français).
|
|
entities_spans.extend(_regex_find_pii(text))
|
|
ner_ms = (time.perf_counter() - t_ner) * 1000
|
|
|
|
# Dédupliquer et normaliser
|
|
entities_spans = _merge_spans(entities_spans)
|
|
|
|
# 4. Convertir (label, start, end) → PIIEntity(label, text, bbox pixel)
|
|
pii_entities: List[PIIEntity] = []
|
|
for label, start, end in entities_spans:
|
|
if label not in PII_LABELS:
|
|
continue
|
|
bbox = _spans_to_bbox(start, end, char_to_word, words, self._bbox_padding, W, H)
|
|
if bbox is None:
|
|
continue
|
|
pii_entities.append(PIIEntity(
|
|
label=label,
|
|
text=text[start:end],
|
|
bbox=bbox,
|
|
confidence=1.0,
|
|
source=("ner" if ner_engine == "edsnlp" else "regex"),
|
|
))
|
|
|
|
# 5. Appliquer le blur gaussien sur les bbox
|
|
t_blur = time.perf_counter()
|
|
_apply_blur(input_path, output_path, pii_entities,
|
|
kernel=self._blur_kernel, sigma=self._blur_sigma)
|
|
blur_ms = (time.perf_counter() - t_blur) * 1000
|
|
|
|
elapsed_ms = (time.perf_counter() - t_start) * 1000
|
|
if pii_entities:
|
|
logger.info(
|
|
"pii_blur : %d PII floutés sur %s (%.0fms : ocr=%.0f ner=%.0f blur=%.0f, ner=%s)",
|
|
len(pii_entities), input_path.name, elapsed_ms,
|
|
ocr_ms, ner_ms, blur_ms, ner_engine,
|
|
)
|
|
else:
|
|
logger.debug(
|
|
"pii_blur : aucune PII détectée dans %s (%.0fms)",
|
|
input_path.name, elapsed_ms,
|
|
)
|
|
|
|
return PIIBlurResult(
|
|
raw_path=input_path,
|
|
blurred_path=output_path,
|
|
entities=pii_entities,
|
|
elapsed_ms=elapsed_ms,
|
|
ocr_ms=ocr_ms,
|
|
ner_ms=ner_ms,
|
|
blur_ms=blur_ms,
|
|
ner_engine=ner_engine,
|
|
)
|
|
|
|
|
|
# Instance singleton (lazy)
|
|
_default_blurrer: Optional[PIIBlurrer] = None
|
|
|
|
|
|
def blur_pii_on_image(
|
|
input_path: Union[str, Path],
|
|
output_path: Optional[Union[str, Path]] = None,
|
|
) -> PIIBlurResult:
|
|
"""Helper fonctionnel : instancie un PIIBlurrer singleton et l'applique."""
|
|
global _default_blurrer
|
|
if _default_blurrer is None:
|
|
_default_blurrer = PIIBlurrer()
|
|
return _default_blurrer.blur_image(input_path, output_path)
|
|
|
|
|
|
# =============================================================================
|
|
# Helpers internes
|
|
# =============================================================================
|
|
|
|
def _copy_file(src: Path, dst: Path) -> None:
|
|
"""Copie bytewise (utilisé quand aucun PII n'est détecté / OCR KO)."""
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(src, "rb") as f_in, open(dst, "wb") as f_out:
|
|
f_out.write(f_in.read())
|
|
|
|
|
|
def _build_text_with_map(words: Sequence[dict]) -> Tuple[str, List[int]]:
|
|
"""Concatène les mots en texte + mappe chaque caractère vers son index de mot.
|
|
|
|
Quand deux mots consécutifs appartiennent à des lignes différentes (champ
|
|
`line` dans le dict), on insère un `\n` au lieu d'un espace. Cela empêche
|
|
les regex gloutons (PERSON, LOCATION…) de matcher à travers des lignes
|
|
logiques, qui sont typiquement des champs distincts dans une UI métier.
|
|
|
|
Returns:
|
|
text : str concaténé (mots séparés par un espace ou un \n)
|
|
char_to_word : list[int] len == len(text), char_to_word[i] = index du mot
|
|
(ou -1 pour les séparateurs).
|
|
"""
|
|
parts: List[str] = []
|
|
char_to_word: List[int] = []
|
|
prev_line: Optional[int] = None
|
|
for i, w in enumerate(words):
|
|
cur_line = w.get("line")
|
|
if i > 0:
|
|
if prev_line is not None and cur_line is not None and cur_line != prev_line:
|
|
sep = "\n"
|
|
else:
|
|
sep = " "
|
|
parts.append(sep)
|
|
char_to_word.append(-1)
|
|
txt = w["text"]
|
|
parts.append(txt)
|
|
char_to_word.extend([i] * len(txt))
|
|
prev_line = cur_line
|
|
return "".join(parts), char_to_word
|
|
|
|
|
|
def _spans_to_bbox(
|
|
start: int,
|
|
end: int,
|
|
char_to_word: Sequence[int],
|
|
words: Sequence[dict],
|
|
padding: int,
|
|
image_w: int,
|
|
image_h: int,
|
|
) -> Optional[Tuple[int, int, int, int]]:
|
|
"""Convertit une plage [start, end[ dans le texte en bbox englobant les mots."""
|
|
if end <= start or start >= len(char_to_word):
|
|
return None
|
|
word_ids = set()
|
|
for i in range(start, min(end, len(char_to_word))):
|
|
wid = char_to_word[i]
|
|
if wid >= 0:
|
|
word_ids.add(wid)
|
|
if not word_ids:
|
|
return None
|
|
xs1, ys1, xs2, ys2 = [], [], [], []
|
|
for wid in word_ids:
|
|
w = words[wid]
|
|
xs1.append(w["x1"]); ys1.append(w["y1"])
|
|
xs2.append(w["x2"]); ys2.append(w["y2"])
|
|
x1 = max(0, min(xs1) - padding)
|
|
y1 = max(0, min(ys1) - padding)
|
|
x2 = min(image_w, max(xs2) + padding)
|
|
y2 = min(image_h, max(ys2) + padding)
|
|
if x2 <= x1 or y2 <= y1:
|
|
return None
|
|
return (x1, y1, x2, y2)
|
|
|
|
|
|
def _merge_spans(
|
|
spans: Sequence[Tuple[str, int, int]],
|
|
) -> List[Tuple[str, int, int]]:
|
|
"""Déduplique et fusionne les plages qui se chevauchent sur un même label.
|
|
|
|
En cas de conflit inter-labels, on garde celui qui couvre le plus large.
|
|
"""
|
|
if not spans:
|
|
return []
|
|
# Trier par start puis par -width (le plus long d'abord pour les ties)
|
|
sorted_spans = sorted(spans, key=lambda s: (s[1], -(s[2] - s[1])))
|
|
merged: List[Tuple[str, int, int]] = []
|
|
for label, s, e in sorted_spans:
|
|
if not merged:
|
|
merged.append((label, s, e))
|
|
continue
|
|
last_label, ls, le = merged[-1]
|
|
if s < le: # chevauchement
|
|
# On garde l'étendue fusionnée avec le label du plus large
|
|
new_start = min(ls, s)
|
|
new_end = max(le, e)
|
|
new_label = last_label if (le - ls) >= (e - s) else label
|
|
merged[-1] = (new_label, new_start, new_end)
|
|
else:
|
|
merged.append((label, s, e))
|
|
return merged
|
|
|
|
|
|
def _apply_blur(
|
|
src: Path,
|
|
dst: Path,
|
|
entities: Sequence[PIIEntity],
|
|
kernel: Tuple[int, int],
|
|
sigma: float,
|
|
) -> None:
|
|
"""Applique un flou gaussien sur les bbox des entités et écrit l'image."""
|
|
from PIL import Image
|
|
|
|
with Image.open(src) as img:
|
|
if img.mode != "RGB":
|
|
img = img.convert("RGB")
|
|
|
|
if not entities:
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
img.save(dst, format="PNG", optimize=True)
|
|
return
|
|
|
|
# On privilégie OpenCV s'il est disponible (plus rapide),
|
|
# sinon on utilise PIL ImageFilter.GaussianBlur.
|
|
try:
|
|
import cv2 # type: ignore
|
|
import numpy as np # type: ignore
|
|
arr = np.array(img)
|
|
bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)
|
|
for ent in entities:
|
|
x1, y1, x2, y2 = ent.bbox
|
|
if x2 <= x1 or y2 <= y1:
|
|
continue
|
|
roi = bgr[y1:y2, x1:x2]
|
|
if roi.size == 0:
|
|
continue
|
|
k = (max(3, kernel[0] | 1), max(3, kernel[1] | 1)) # impair
|
|
bgr[y1:y2, x1:x2] = cv2.GaussianBlur(roi, k, sigma)
|
|
out = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
|
|
img = Image.fromarray(out)
|
|
except ImportError:
|
|
from PIL import ImageFilter
|
|
radius = max(sigma / 2, 4.0)
|
|
for ent in entities:
|
|
x1, y1, x2, y2 = ent.bbox
|
|
region = img.crop((x1, y1, x2, y2))
|
|
if region.size[0] == 0 or region.size[1] == 0:
|
|
continue
|
|
blurred = region.filter(ImageFilter.GaussianBlur(radius=radius))
|
|
img.paste(blurred, (x1, y1))
|
|
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
img.save(dst, format="PNG", optimize=True)
|