diff --git a/.env.example b/.env.example index 8c596a7c6..b1f74f606 100644 --- a/.env.example +++ b/.env.example @@ -30,7 +30,9 @@ DASHBOARD_PORT=5001 CLIP_MODEL=ViT-B-32 CLIP_PRETRAINED=openai CLIP_DEVICE=cpu # cpu or cuda -VLM_MODEL=qwen3-vl:8b +RPA_VLM_MODEL=gemma4:latest # gemma4:latest (défaut), qwen3-vl:8b, ui-tars (fallback) +VLM_MODEL=gemma4:latest # alias de compatibilité +# VLM_ALLOW_CLOUD=false # true pour activer les APIs cloud en fallback (OpenAI, Gemini, Anthropic) VLM_ENDPOINT=http://localhost:11434 OWL_MODEL=google/owlv2-base-patch16-ensemble OWL_CONFIDENCE_THRESHOLD=0.1 diff --git a/core/anonymisation/__init__.py b/core/anonymisation/__init__.py new file mode 100644 index 000000000..990bb594f --- /dev/null +++ b/core/anonymisation/__init__.py @@ -0,0 +1,31 @@ +# core/anonymisation/__init__.py +"""Module de floutage ciblé des PII côté serveur. + +Remplace l'ancien blur client-side (`agent_v0/agent_v1/vision/blur_sensitive.py`) +qui floutait toutes les zones de texte claires, cassant les codes CIM, les +montants PMSI et les boutons. + +Stratégie : + 1. OCR (docTR) sur le screenshot → texte + bounding boxes + 2. NER (EDS-NLP si disponible, sinon regex) → détection des PII + 3. Filtrage : ne conserver que PERSON / LOCATION / PHONE / NIR / EMAIL + 4. Blur gaussien uniquement sur les bbox des PII filtrées + +Usage : + from core.anonymisation import blur_pii_on_image + blurred_path = blur_pii_on_image("shot_0001_full.png") +""" + +from .pii_blur import ( + PIIBlurResult, + PIIEntity, + PIIBlurrer, + blur_pii_on_image, +) + +__all__ = [ + "PIIBlurResult", + "PIIEntity", + "PIIBlurrer", + "blur_pii_on_image", +] diff --git a/core/anonymisation/pii_blur.py b/core/anonymisation/pii_blur.py new file mode 100644 index 000000000..af2ec152f --- /dev/null +++ b/core/anonymisation/pii_blur.py @@ -0,0 +1,650 @@ +# core/anonymisation/pii_blur.py +"""Floutage ciblé des PII côté serveur (Personal Identifiable Information). + +Contexte +-------- +L'ancien blur côté client (`agent_v0/agent_v1/vision/blur_sensitive.py`) était +trop agressif : il floutait TOUTES les zones blanches avec texte, ce qui +détruisait les codes CIM-10, les montants PMSI, les boutons et rendait les +screenshots inutilisables pour le replay ou le grounding VLM. De plus, +`opencv-python` n'était pas listé dans les dépendances de l'agent, donc le blur +échouait silencieusement en production. + +Stratégie retenue (avril 2026) +------------------------------ +1. Agent = zéro blur → envoie les screenshots bruts via TLS. +2. Serveur = OCR (docTR) + NER (EDS-NLP avec fallback regex). +3. On floute UNIQUEMENT les entités : + - PERSON → noms, prénoms + - LOCATION → adresses, villes + - PHONE → numéros de téléphone + - NIR → numéro de sécurité sociale + - EMAIL → adresses électroniques + Et on préserve : + - codes CIM-10 / CCAM + - montants (1250€, 31,50 €) + - dates (pas PII au sens RGPD santé) + - identifiants techniques (shot_0001, session IDs…) +4. Deux fichiers sont stockés : + - `shot_XXXX_full.png` → version brute (accès restreint) + - `shot_XXXX_full_blurred.png` → version pour affichage + +Performance +----------- +Objectif : < 2 s par screenshot sur RTX 5070. +docTR (db_mobilenet_v3_large + crnn_mobilenet_v3_large) : ~800 ms CPU, ~300 ms GPU. +EDS-NLP pipeline minimal : ~100 ms pour un texte d'écran typique. +Fallback regex : < 10 ms. +""" + +from __future__ import annotations + +import logging +import os +import re +import tempfile +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Iterable, List, Optional, Sequence, Tuple, Union + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Types +# ============================================================================= + +# Type d'entité PII reconnu. Aligné sur les labels EDS-NLP (`nlp.pipes.eds`) +# et enrichi par nos propres patterns regex. +PII_LABELS = frozenset({ + "PERSON", # noms de patient, médecin + "LOCATION", # adresses, ville, code postal + "ADDRESS", # alias de LOCATION (certains pipelines le produisent) + "PHONE", # téléphone + "NIR", # numéro sécu FR (15 chiffres) + "SECURITY_NUMBER", # alias de NIR + "EMAIL", # adresse email +}) + +# Motifs qu'on NE DOIT PAS flouter même s'ils ressemblent à des PII : +# - codes CIM-10 : 1 lettre + 2 chiffres + optionnellement .xx +# - codes CCAM : 4 lettres + 3 chiffres +# - montants (€, euros) +# - dates format fr (dd/mm/yyyy, dd-mm-yy) +# - identifiants techniques (ex: shot_0001, session_xxxxx) +_RE_ICD10 = re.compile(r"\b[A-Z]\d{2}(\.\d{1,3})?\b") +_RE_CCAM = re.compile(r"\b[A-Z]{4}\d{3}\b") +_RE_MONEY = re.compile(r"\b\d{1,3}(?:[.,\s]\d{3})*(?:[.,]\d{1,2})?\s?€\b", re.IGNORECASE) +_RE_DATE = re.compile(r"\b(0?[1-9]|[12]\d|3[01])[/.-](0?[1-9]|1[0-2])[/.-](\d{2}|\d{4})\b") +_RE_TECH_ID = re.compile(r"\b(?:shot|session|sess|frame|trace|req|msg)_[\w-]+\b", re.IGNORECASE) + + +# ============================================================================= +# Entités PII +# ============================================================================= + +@dataclass(frozen=True) +class PIIEntity: + """Une entité PII détectée dans un screenshot.""" + label: str # PERSON, LOCATION, PHONE, NIR, EMAIL + text: str # Texte brut détecté + bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) en pixels + confidence: float = 1.0 # Score NER (1.0 si regex) + source: str = "ner" # "ner" (EDS-NLP) ou "regex" + + +@dataclass +class PIIBlurResult: + """Résultat du pipeline de blur.""" + raw_path: Path + blurred_path: Path + entities: List[PIIEntity] = field(default_factory=list) + elapsed_ms: float = 0.0 + ocr_ms: float = 0.0 + ner_ms: float = 0.0 + blur_ms: float = 0.0 + ocr_engine: str = "doctr" + ner_engine: str = "regex" # ou "edsnlp" + + @property + def count(self) -> int: + return len(self.entities) + + +# ============================================================================= +# Fallback NER par regex (utilisé si EDS-NLP indisponible) +# ============================================================================= + +# Précaution : on ne marque comme PHONE que des suites contiguës de 10 chiffres +# (FR) ou un format international. Les codes à 3-4 chiffres sont ignorés. +_RE_PHONE = re.compile( + r"\b(?:(?:\+?33|0)\s?[1-9])(?:[\s.-]?\d{2}){4}\b" +) +_RE_NIR = re.compile( + r"\b[12]\s?\d{2}\s?(?:0[1-9]|1[0-2]|20)\s?(?:\d{2}|2A|2B)\s?\d{3}\s?\d{3}(?:\s?\d{2})?\b" +) +_RE_EMAIL = re.compile( + r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE +) +# Nom : Prénom Nom (au moins 2 majuscules initiales). Attrape aussi +# "Mme Dupont", "M. Martin", "Dr. Bernard". +# On utilise [^\S\n] (whitespace SANS newline) pour empêcher le match de sauter +# de ligne — les lignes sont typiquement des champs distincts dans une UI métier. +_RE_PERSON = re.compile( + r"\b(?:M\.?|Mme|Mlle|Dr\.?|Pr\.?|Prof\.?)[^\S\n]+" + r"[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+" + r"(?:[^\S\n]+[A-ZÉÈÀÂÎÔÛÇ][a-zéèàâîôûç\-]+)?" +) +# Adresse : "12 rue de la Paix", "3, avenue Victor Hugo" +# Même principe : on empêche le matching de franchir les sauts de ligne. +_RE_ADDRESS = re.compile( + r"\b\d{1,4}(?:[^\S\n]?(?:bis|ter|quater))?[,\s]+(?:rue|avenue|av\.?|bd|boulevard|" + r"allée|all\.?|place|impasse|chemin|route|rte\.?|quai|cours|voie|passage)" + r"[^\S\n]+(?:de[^\S\n]+|du[^\S\n]+|des[^\S\n]+|la[^\S\n]+|le[^\S\n]+|les[^\S\n]+|l'|de[^\S\n]+la[^\S\n]+|d')?" + r"[A-Za-zÀ-ÿ\-' ]{2,40}", + re.IGNORECASE, +) + + +def _regex_find_pii(text: str) -> List[Tuple[str, int, int]]: + """Retourne une liste de (label, offset_debut, offset_fin) par regex. + + Les motifs "techniques" (codes CIM, montants, dates) sont explicitement + exclus même si un autre regex les attrape. + """ + # 1. Collecter toutes les plages à NE PAS flouter + protected: List[Tuple[int, int]] = [] + for rx in (_RE_ICD10, _RE_CCAM, _RE_MONEY, _RE_DATE, _RE_TECH_ID): + for m in rx.finditer(text): + protected.append(m.span()) + + def _is_protected(start: int, end: int) -> bool: + for p_start, p_end in protected: + # recouvrement non nul + if start < p_end and end > p_start: + return True + return False + + hits: List[Tuple[str, int, int]] = [] + for label, rx in ( + ("NIR", _RE_NIR), + ("EMAIL", _RE_EMAIL), + ("PHONE", _RE_PHONE), + ("PERSON", _RE_PERSON), + ("LOCATION", _RE_ADDRESS), + ): + for m in rx.finditer(text): + if _is_protected(m.start(), m.end()): + continue + hits.append((label, m.start(), m.end())) + return hits + + +# ============================================================================= +# NER via EDS-NLP (optionnel) +# ============================================================================= + +_edsnlp_pipeline = None + + +def _get_edsnlp_pipeline(): + """Charge une pipeline EDS-NLP si le module est disponible. + + Retourne None si EDS-NLP n'est pas installé — le pipeline retombera + alors sur le NER regex. + """ + global _edsnlp_pipeline + if _edsnlp_pipeline is not None: + return _edsnlp_pipeline + try: + import edsnlp # type: ignore + except ImportError: + logger.info( + "EDS-NLP non installé — fallback regex utilisé pour la détection PII. " + "Pour activer EDS-NLP : pip install edsnlp" + ) + return None + + try: + nlp = edsnlp.blank("eds") + nlp.add_pipe("eds.sentences") + nlp.add_pipe("eds.normalizer") + # Les composants disponibles dépendent de la version installée. + # On les ajoute en try/except pour rester résilient. + for pipe_name in ("eds.names", "eds.dates", "eds.addresses"): + try: + nlp.add_pipe(pipe_name) + except Exception as e: # noqa: BLE001 + logger.debug("EDS-NLP : composant %s indisponible (%s)", pipe_name, e) + _edsnlp_pipeline = nlp + logger.info("EDS-NLP : pipeline chargée") + return _edsnlp_pipeline + except Exception as e: # noqa: BLE001 + logger.warning("EDS-NLP non utilisable (%s) — fallback regex", e) + return None + + +def _edsnlp_find_pii(text: str, nlp) -> List[Tuple[str, int, int]]: + """Utilise EDS-NLP pour trouver des entités PII. + + Les labels EDS-NLP sont mappés vers nos labels canoniques. + """ + try: + doc = nlp(text) + except Exception as e: # noqa: BLE001 + logger.debug("EDS-NLP : échec sur texte de %d chars (%s)", len(text), e) + return [] + + mapping = { + "person": "PERSON", + "name": "PERSON", + "patient": "PERSON", + "doctor": "PERSON", + "location": "LOCATION", + "address": "LOCATION", + "city": "LOCATION", + } + hits: List[Tuple[str, int, int]] = [] + for ent in getattr(doc, "ents", []): + raw_label = str(getattr(ent, "label_", "")).lower() + mapped = mapping.get(raw_label) + if mapped is None: + # On accepte aussi si le label EDS-NLP est déjà l'un de nos labels + upper = raw_label.upper() + if upper in PII_LABELS: + mapped = upper + if mapped: + hits.append((mapped, ent.start_char, ent.end_char)) + return hits + + +# ============================================================================= +# OCR avec bounding boxes par mot (docTR) +# ============================================================================= + +_ocr_predictor = None + + +def _get_ocr_predictor(): + """Charge un prédicteur docTR léger (mobilenet) pour l'OCR rapide.""" + global _ocr_predictor + if _ocr_predictor is not None: + return _ocr_predictor + from doctr.models import ocr_predictor # type: ignore + _ocr_predictor = ocr_predictor( + det_arch="db_mobilenet_v3_large", + reco_arch="crnn_mobilenet_v3_large", + pretrained=True, + ) + # GPU si disponible + try: + import torch # type: ignore + if torch.cuda.is_available(): + _ocr_predictor = _ocr_predictor.cuda() + logger.info("pii_blur : docTR chargé sur CUDA") + else: + logger.info("pii_blur : docTR chargé sur CPU") + except Exception: # noqa: BLE001 + logger.info("pii_blur : docTR chargé (device indéterminé)") + return _ocr_predictor + + +def _doctr_ocr(image_path: Path) -> Tuple[List[dict], int, int]: + """Exécute docTR et retourne une liste de mots avec leurs bbox pixel. + + Retour : (words, width, height) où words = [{text, x1, y1, x2, y2}, ...] + """ + from doctr.io import DocumentFile # type: ignore + from PIL import Image + + predictor = _get_ocr_predictor() + doc = DocumentFile.from_images([str(image_path)]) + result = predictor(doc) + + # Les coords sont normalisées (0..1). On les remappe vers la taille réelle. + with Image.open(image_path) as img: + W, H = img.size + + words: List[dict] = [] + line_counter = 0 + for page in result.pages: + for block in page.blocks: + for line in block.lines: + for word in line.words: + text = word.value + if not text or not text.strip(): + continue + (nx1, ny1), (nx2, ny2) = word.geometry + x1 = max(0, int(nx1 * W)) + y1 = max(0, int(ny1 * H)) + x2 = min(W, int(nx2 * W)) + y2 = min(H, int(ny2 * H)) + words.append({ + "text": text, + "x1": x1, "y1": y1, "x2": x2, "y2": y2, + "line": line_counter, + }) + line_counter += 1 + return words, W, H + + +# ============================================================================= +# Pipeline principal +# ============================================================================= + +class PIIBlurrer: + """Pipeline réutilisable (garde les modèles en mémoire entre appels). + + Exemple : + blurrer = PIIBlurrer() + res = blurrer.blur_image("shot_0001_full.png") + print(res.count, res.elapsed_ms) + """ + + def __init__( + self, + blur_kernel: Tuple[int, int] = (31, 31), + blur_sigma: float = 15.0, + bbox_padding: int = 2, + use_edsnlp: bool = True, + ) -> None: + self._blur_kernel = blur_kernel + self._blur_sigma = blur_sigma + self._bbox_padding = bbox_padding + self._use_edsnlp = use_edsnlp + + # ------------------------------------------------------------------ + # Point d'entrée publique + # ------------------------------------------------------------------ + def blur_image( + self, + input_path: Union[str, Path], + output_path: Optional[Union[str, Path]] = None, + ) -> PIIBlurResult: + """Floute les PII détectées et écrit la version floutée sur disque. + + Args: + input_path: Chemin vers le screenshot brut (PNG/JPG). + output_path: Chemin de sortie. Défaut : + `_blurred.png` à côté de l'input. + + Returns: + PIIBlurResult avec les timings et la liste des entités détectées. + """ + input_path = Path(input_path) + if not input_path.is_file(): + raise FileNotFoundError(f"Screenshot introuvable : {input_path}") + + if output_path is None: + output_path = input_path.with_name( + f"{input_path.stem}_blurred{input_path.suffix or '.png'}" + ) + else: + output_path = Path(output_path) + + t_start = time.perf_counter() + + # 1. OCR + t_ocr = time.perf_counter() + try: + words, W, H = _doctr_ocr(input_path) + except Exception as e: # noqa: BLE001 + logger.warning("pii_blur : OCR docTR échoué (%s) — pas de blur appliqué", e) + # On copie simplement l'original vers la version "blurred" + _copy_file(input_path, output_path) + return PIIBlurResult( + raw_path=input_path, + blurred_path=output_path, + entities=[], + elapsed_ms=(time.perf_counter() - t_start) * 1000, + ) + ocr_ms = (time.perf_counter() - t_ocr) * 1000 + + if not words: + _copy_file(input_path, output_path) + return PIIBlurResult( + raw_path=input_path, + blurred_path=output_path, + entities=[], + elapsed_ms=(time.perf_counter() - t_start) * 1000, + ocr_ms=ocr_ms, + ) + + # 2. Reconstituer le texte ligne par ligne en conservant la correspondance + # (offset_char → mot) pour pouvoir repérer les bbox des entités. + text, char_to_word = _build_text_with_map(words) + + # 3. NER : EDS-NLP si dispo, sinon regex + t_ner = time.perf_counter() + ner_engine = "regex" + entities_spans: List[Tuple[str, int, int]] = [] + if self._use_edsnlp: + nlp = _get_edsnlp_pipeline() + if nlp is not None: + entities_spans = _edsnlp_find_pii(text, nlp) + ner_engine = "edsnlp" + # Toujours compléter avec le regex (EDS-NLP ne couvre pas tous les PII + # fréquents : email, NIR, téléphone français). + entities_spans.extend(_regex_find_pii(text)) + ner_ms = (time.perf_counter() - t_ner) * 1000 + + # Dédupliquer et normaliser + entities_spans = _merge_spans(entities_spans) + + # 4. Convertir (label, start, end) → PIIEntity(label, text, bbox pixel) + pii_entities: List[PIIEntity] = [] + for label, start, end in entities_spans: + if label not in PII_LABELS: + continue + bbox = _spans_to_bbox(start, end, char_to_word, words, self._bbox_padding, W, H) + if bbox is None: + continue + pii_entities.append(PIIEntity( + label=label, + text=text[start:end], + bbox=bbox, + confidence=1.0, + source=("ner" if ner_engine == "edsnlp" else "regex"), + )) + + # 5. Appliquer le blur gaussien sur les bbox + t_blur = time.perf_counter() + _apply_blur(input_path, output_path, pii_entities, + kernel=self._blur_kernel, sigma=self._blur_sigma) + blur_ms = (time.perf_counter() - t_blur) * 1000 + + elapsed_ms = (time.perf_counter() - t_start) * 1000 + if pii_entities: + logger.info( + "pii_blur : %d PII floutés sur %s (%.0fms : ocr=%.0f ner=%.0f blur=%.0f, ner=%s)", + len(pii_entities), input_path.name, elapsed_ms, + ocr_ms, ner_ms, blur_ms, ner_engine, + ) + else: + logger.debug( + "pii_blur : aucune PII détectée dans %s (%.0fms)", + input_path.name, elapsed_ms, + ) + + return PIIBlurResult( + raw_path=input_path, + blurred_path=output_path, + entities=pii_entities, + elapsed_ms=elapsed_ms, + ocr_ms=ocr_ms, + ner_ms=ner_ms, + blur_ms=blur_ms, + ner_engine=ner_engine, + ) + + +# Instance singleton (lazy) +_default_blurrer: Optional[PIIBlurrer] = None + + +def blur_pii_on_image( + input_path: Union[str, Path], + output_path: Optional[Union[str, Path]] = None, +) -> PIIBlurResult: + """Helper fonctionnel : instancie un PIIBlurrer singleton et l'applique.""" + global _default_blurrer + if _default_blurrer is None: + _default_blurrer = PIIBlurrer() + return _default_blurrer.blur_image(input_path, output_path) + + +# ============================================================================= +# Helpers internes +# ============================================================================= + +def _copy_file(src: Path, dst: Path) -> None: + """Copie bytewise (utilisé quand aucun PII n'est détecté / OCR KO).""" + dst.parent.mkdir(parents=True, exist_ok=True) + with open(src, "rb") as f_in, open(dst, "wb") as f_out: + f_out.write(f_in.read()) + + +def _build_text_with_map(words: Sequence[dict]) -> Tuple[str, List[int]]: + """Concatène les mots en texte + mappe chaque caractère vers son index de mot. + + Quand deux mots consécutifs appartiennent à des lignes différentes (champ + `line` dans le dict), on insère un `\n` au lieu d'un espace. Cela empêche + les regex gloutons (PERSON, LOCATION…) de matcher à travers des lignes + logiques, qui sont typiquement des champs distincts dans une UI métier. + + Returns: + text : str concaténé (mots séparés par un espace ou un \n) + char_to_word : list[int] len == len(text), char_to_word[i] = index du mot + (ou -1 pour les séparateurs). + """ + parts: List[str] = [] + char_to_word: List[int] = [] + prev_line: Optional[int] = None + for i, w in enumerate(words): + cur_line = w.get("line") + if i > 0: + if prev_line is not None and cur_line is not None and cur_line != prev_line: + sep = "\n" + else: + sep = " " + parts.append(sep) + char_to_word.append(-1) + txt = w["text"] + parts.append(txt) + char_to_word.extend([i] * len(txt)) + prev_line = cur_line + return "".join(parts), char_to_word + + +def _spans_to_bbox( + start: int, + end: int, + char_to_word: Sequence[int], + words: Sequence[dict], + padding: int, + image_w: int, + image_h: int, +) -> Optional[Tuple[int, int, int, int]]: + """Convertit une plage [start, end[ dans le texte en bbox englobant les mots.""" + if end <= start or start >= len(char_to_word): + return None + word_ids = set() + for i in range(start, min(end, len(char_to_word))): + wid = char_to_word[i] + if wid >= 0: + word_ids.add(wid) + if not word_ids: + return None + xs1, ys1, xs2, ys2 = [], [], [], [] + for wid in word_ids: + w = words[wid] + xs1.append(w["x1"]); ys1.append(w["y1"]) + xs2.append(w["x2"]); ys2.append(w["y2"]) + x1 = max(0, min(xs1) - padding) + y1 = max(0, min(ys1) - padding) + x2 = min(image_w, max(xs2) + padding) + y2 = min(image_h, max(ys2) + padding) + if x2 <= x1 or y2 <= y1: + return None + return (x1, y1, x2, y2) + + +def _merge_spans( + spans: Sequence[Tuple[str, int, int]], +) -> List[Tuple[str, int, int]]: + """Déduplique et fusionne les plages qui se chevauchent sur un même label. + + En cas de conflit inter-labels, on garde celui qui couvre le plus large. + """ + if not spans: + return [] + # Trier par start puis par -width (le plus long d'abord pour les ties) + sorted_spans = sorted(spans, key=lambda s: (s[1], -(s[2] - s[1]))) + merged: List[Tuple[str, int, int]] = [] + for label, s, e in sorted_spans: + if not merged: + merged.append((label, s, e)) + continue + last_label, ls, le = merged[-1] + if s < le: # chevauchement + # On garde l'étendue fusionnée avec le label du plus large + new_start = min(ls, s) + new_end = max(le, e) + new_label = last_label if (le - ls) >= (e - s) else label + merged[-1] = (new_label, new_start, new_end) + else: + merged.append((label, s, e)) + return merged + + +def _apply_blur( + src: Path, + dst: Path, + entities: Sequence[PIIEntity], + kernel: Tuple[int, int], + sigma: float, +) -> None: + """Applique un flou gaussien sur les bbox des entités et écrit l'image.""" + from PIL import Image + + with Image.open(src) as img: + if img.mode != "RGB": + img = img.convert("RGB") + + if not entities: + dst.parent.mkdir(parents=True, exist_ok=True) + img.save(dst, format="PNG", optimize=True) + return + + # On privilégie OpenCV s'il est disponible (plus rapide), + # sinon on utilise PIL ImageFilter.GaussianBlur. + try: + import cv2 # type: ignore + import numpy as np # type: ignore + arr = np.array(img) + bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) + for ent in entities: + x1, y1, x2, y2 = ent.bbox + if x2 <= x1 or y2 <= y1: + continue + roi = bgr[y1:y2, x1:x2] + if roi.size == 0: + continue + k = (max(3, kernel[0] | 1), max(3, kernel[1] | 1)) # impair + bgr[y1:y2, x1:x2] = cv2.GaussianBlur(roi, k, sigma) + out = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB) + img = Image.fromarray(out) + except ImportError: + from PIL import ImageFilter + radius = max(sigma / 2, 4.0) + for ent in entities: + x1, y1, x2, y2 = ent.bbox + region = img.crop((x1, y1, x2, y2)) + if region.size[0] == 0 or region.size[1] == 0: + continue + blurred = region.filter(ImageFilter.GaussianBlur(radius=radius)) + img.paste(blurred, (x1, y1)) + + dst.parent.mkdir(parents=True, exist_ok=True) + img.save(dst, format="PNG", optimize=True) diff --git a/core/config.py b/core/config.py index 56d796697..226a1a20c 100644 --- a/core/config.py +++ b/core/config.py @@ -68,11 +68,11 @@ class SystemConfig: clip_model: str = "ViT-B-32" clip_pretrained: str = "openai" clip_device: str = "cpu" - vlm_model: str = "qwen3-vl:8b" + vlm_model: str = "gemma4:latest" vlm_endpoint: str = "http://localhost:11434" owl_model: str = "google/owlv2-base-patch16-ensemble" owl_confidence_threshold: float = 0.1 - + # FAISS faiss_dimensions: int = 512 faiss_index_type: str = "Flat" @@ -211,7 +211,7 @@ class ConfigurationManager: clip_model=os.getenv("CLIP_MODEL", "ViT-B-32"), clip_pretrained=os.getenv("CLIP_PRETRAINED", "openai"), clip_device=os.getenv("CLIP_DEVICE", "cpu"), - vlm_model=os.getenv("VLM_MODEL", "qwen3-vl:8b"), + vlm_model=os.getenv("RPA_VLM_MODEL", os.getenv("VLM_MODEL", "gemma4:latest")), vlm_endpoint=os.getenv("VLM_ENDPOINT", "http://localhost:11434"), owl_model=os.getenv("OWL_MODEL", "google/owlv2-base-patch16-ensemble"), owl_confidence_threshold=float(os.getenv("OWL_CONFIDENCE_THRESHOLD", "0.1")), @@ -435,7 +435,7 @@ class ModelConfig: clip_model: str = "ViT-B-32" clip_pretrained: str = "openai" clip_device: str = "cpu" - vlm_model: str = "qwen3-vl:8b" + vlm_model: str = "gemma4:latest" vlm_endpoint: str = "http://localhost:11434" owl_model: str = "google/owlv2-base-patch16-ensemble" owl_confidence_threshold: float = 0.1 @@ -510,7 +510,7 @@ class FAISSConfig: class GPUResourceConfig: """Configuration for GPU resource management - DEPRECATED: Use SystemConfig instead""" ollama_endpoint: str = "http://localhost:11434" - vlm_model: str = "qwen3-vl:8b" + vlm_model: str = "gemma4:latest" clip_model: str = "ViT-B-32" idle_timeout_seconds: int = 300 vram_threshold_for_clip_gpu_mb: int = 1024 @@ -599,7 +599,7 @@ UPLOADS_PATH=data/training/uploads CLIP_MODEL=ViT-B-32 CLIP_PRETRAINED=openai CLIP_DEVICE=cpu -VLM_MODEL=qwen3-vl:8b +VLM_MODEL=gemma4:latest VLM_ENDPOINT=http://localhost:11434 OWL_MODEL=google/owlv2-base-patch16-ensemble OWL_CONFIDENCE_THRESHOLD=0.1 diff --git a/core/detection/ollama_client.py b/core/detection/ollama_client.py index 4eaab7171..73dcafe08 100644 --- a/core/detection/ollama_client.py +++ b/core/detection/ollama_client.py @@ -23,9 +23,9 @@ class OllamaClient: Permet d'envoyer des images et prompts à un VLM via l'API Ollama. """ - def __init__(self, + def __init__(self, endpoint: str = "http://localhost:11434", - model: str = "qwen3-vl:8b", + model: str = None, timeout: int = 180): """ Initialiser le client Ollama @@ -36,7 +36,12 @@ class OllamaClient: timeout: Timeout en secondes """ self.endpoint = endpoint.rstrip('/') - self.model = model + # Résolution du modèle : paramètre explicite > config centralisée + if model is not None: + self.model = model + else: + from core.detection.vlm_config import get_vlm_model + self.model = get_vlm_model(endpoint=self.endpoint) self.timeout = timeout self._check_connection() @@ -126,7 +131,12 @@ class OllamaClient: messages.append(user_message) # Déterminer si le modèle est un modèle thinking (qwen3) - is_thinking_model = "qwen3" in self.model.lower() + # Les modèles non-thinking (gemma4, qwen2.5vl) n'ont pas besoin + # du workaround prefill et supportent le rôle system natif. + from core.detection.vlm_config import is_thinking_model as _is_thinking + from core.detection.vlm_config import needs_think_false as _needs_think_false + is_thinking_model = _is_thinking(self.model) + requires_think_false = _needs_think_false(self.model) # WORKAROUND Ollama 0.18.x : think=false est ignoré par le # renderer qwen3-vl-thinking. On utilise un assistant prefill @@ -168,9 +178,9 @@ class OllamaClient: } } - # Garder think=false au cas où une future version d'Ollama le - # corrige — le prefill reste le mécanisme principal - if is_thinking_model: + # think=false : requis pour qwen3 (prefill reste le mécanisme + # principal) ET pour gemma4 (sinon tokens vides sur Ollama >=0.20) + if is_thinking_model or requires_think_false: payload["think"] = False if force_json: @@ -575,7 +585,7 @@ Your answer:""" # Fonctions utilitaires # ============================================================================ -def create_ollama_client(model: str = "qwen3-vl:8b", +def create_ollama_client(model: str = None, endpoint: str = "http://localhost:11434") -> OllamaClient: """ Créer un client Ollama diff --git a/core/detection/ui_detector.py b/core/detection/ui_detector.py index adda88a75..a90685d63 100644 --- a/core/detection/ui_detector.py +++ b/core/detection/ui_detector.py @@ -72,9 +72,9 @@ class BoundingBox: class DetectionConfig: """Configuration de la détection UI hybride""" # VLM — modèle configurable via variable d'environnement RPA_VLM_MODEL - # Production (local) : "qwen3-vl:8b" — GPU local, pas de réseau - # Tests (cloud) : "qwen3-vl:235b-cloud" — pas de GPU, plus lent mais libère la VRAM - vlm_model: str = os.environ.get("RPA_VLM_MODEL", "qwen3-vl:8b") + # Par défaut : gemma4:e4b (meilleur grounding + contextualisation) + # Fallback : qwen3-vl:8b si gemma4 non disponible + vlm_model: str = os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b")) vlm_endpoint: str = "http://localhost:11434" use_vlm_classification: bool = True # Utiliser VLM pour classifier @@ -865,21 +865,24 @@ JSON array: [{{"id":0,"type":"...","role":"...","text":"..."}}]""" # ============================================================================ def create_detector( - vlm_model: str = "qwen3-vl:8b", + vlm_model: str = None, confidence_threshold: float = 0.7, use_vlm: bool = True ) -> UIDetector: """ Créer un détecteur avec configuration personnalisée - + Args: - vlm_model: Modèle VLM à utiliser + vlm_model: Modèle VLM à utiliser (None = résolution automatique via vlm_config) confidence_threshold: Seuil de confiance use_vlm: Utiliser le VLM pour la classification - + Returns: UIDetector configuré """ + if vlm_model is None: + from core.detection.vlm_config import get_vlm_model + vlm_model = get_vlm_model() config = DetectionConfig( vlm_model=vlm_model, confidence_threshold=confidence_threshold, diff --git a/core/detection/vlm_config.py b/core/detection/vlm_config.py new file mode 100644 index 000000000..2080a3cbb --- /dev/null +++ b/core/detection/vlm_config.py @@ -0,0 +1,194 @@ +""" +Configuration centralisée du modèle VLM (Vision-Language Model). + +Point unique de configuration pour le modèle VLM utilisé dans tout le pipeline. +Gère la variable d'environnement RPA_VLM_MODEL avec fallback automatique +si le modèle configuré n'est pas disponible dans Ollama. + +Ordre de résolution du modèle : + 1. Variable d'env RPA_VLM_MODEL (prioritaire) + 2. Variable d'env VLM_MODEL (compatibilité) + 3. Modèle par défaut : gemma4:latest + +Fallback automatique : + Si le modèle choisi n'est pas trouvé dans Ollama, on essaie les + modèles de fallback dans l'ordre (FALLBACK_VLM_MODELS). +""" + +import logging +import os +from typing import List, Optional + +import requests + +logger = logging.getLogger(__name__) + +# Modèle VLM par défaut — Gemma 4 latest (8B dense, Q4_K_M) +# Nécessite think=false dans le payload (sinon tokens vides sur Ollama >=0.20) +DEFAULT_VLM_MODEL = "gemma4:latest" + +# Modèles de fallback, testés dans l'ordre si le modèle principal n'est pas dispo +FALLBACK_VLM_MODELS = ["qwen3-vl:8b", "0000/ui-tars-1.5-7b-q8_0:7b"] + +# Endpoint Ollama par défaut +DEFAULT_OLLAMA_ENDPOINT = "http://localhost:11434" + +# Cache du modèle résolu (évite de requêter Ollama à chaque appel) +_resolved_model: Optional[str] = None +_resolved_model_checked = False + + +def get_vlm_model( + endpoint: str = DEFAULT_OLLAMA_ENDPOINT, + force_check: bool = False, +) -> str: + """Retourne le nom du modèle VLM à utiliser, avec fallback automatique. + + Vérifie la disponibilité du modèle dans Ollama au premier appel, + puis cache le résultat pour les appels suivants. + + Args: + endpoint: URL de l'API Ollama + force_check: Forcer une nouvelle vérification (ignorer le cache) + + Returns: + Nom du modèle VLM disponible (ex: "gemma4:latest") + """ + global _resolved_model, _resolved_model_checked + + if _resolved_model_checked and not force_check: + return _resolved_model + + # Lire le modèle configuré depuis l'environnement + configured = ( + os.environ.get("RPA_VLM_MODEL") + or os.environ.get("VLM_MODEL") + or DEFAULT_VLM_MODEL + ) + + # Vérifier la disponibilité dans Ollama + available = _list_ollama_models(endpoint) + + if available is None: + # Ollama non joignable — utiliser le modèle configuré sans vérification + logger.warning( + "Ollama non joignable (%s) — utilisation de '%s' sans vérification", + endpoint, configured, + ) + _resolved_model = configured + _resolved_model_checked = True + return _resolved_model + + # Vérifier si le modèle configuré est disponible + if _model_available(configured, available): + logger.info("VLM model: %s (configuré, disponible)", configured) + _resolved_model = configured + _resolved_model_checked = True + return _resolved_model + + # Fallback : essayer les modèles alternatifs + logger.warning( + "Modèle VLM '%s' non trouvé dans Ollama. Recherche d'un fallback...", + configured, + ) + + # Construire la liste de fallback complète + fallback_candidates = [DEFAULT_VLM_MODEL] + FALLBACK_VLM_MODELS + for candidate in fallback_candidates: + if candidate == configured: + continue # Déjà testé + if _model_available(candidate, available): + logger.info( + "VLM model: %s (fallback, '%s' non disponible)", + candidate, configured, + ) + _resolved_model = candidate + _resolved_model_checked = True + return _resolved_model + + # Aucun fallback trouvé — utiliser le modèle configuré quand même + # (Ollama le téléchargera peut-être au premier appel) + logger.warning( + "Aucun modèle VLM trouvé dans Ollama. " + "Modèles disponibles : %s. Utilisation de '%s' par défaut.", + [m for m in available if "vl" in m.lower() or "gemma" in m.lower()], + configured, + ) + _resolved_model = configured + _resolved_model_checked = True + return _resolved_model + + +def reset_vlm_model_cache(): + """Réinitialiser le cache du modèle résolu. + + Utile après un changement de configuration ou un pull de modèle. + """ + global _resolved_model, _resolved_model_checked + _resolved_model = None + _resolved_model_checked = False + + +def is_thinking_model(model_name: str) -> bool: + """Détermine si un modèle est un modèle 'thinking' (qwen3). + + Les modèles thinking nécessitent un assistant prefill pour éviter + le mode réflexion interne qui peut durer >180s avec des images. + + Args: + model_name: Nom du modèle (ex: "qwen3-vl:8b", "gemma4:e4b") + + Returns: + True si le modèle est de type thinking (nécessite prefill workaround) + """ + return "qwen3" in model_name.lower() + + +def needs_think_false(model_name: str) -> bool: + """Détermine si un modèle nécessite think=false dans le payload. + + Sur Ollama >=0.20, gemma4 produit des tokens vides si think n'est pas + explicitement désactivé. Ce flag doit être envoyé dans le payload chat. + + Args: + model_name: Nom du modèle (ex: "gemma4:latest", "gemma4:e4b") + + Returns: + True si le modèle nécessite think=false + """ + return "gemma4" in model_name.lower() + + +def _list_ollama_models(endpoint: str) -> Optional[List[str]]: + """Lister les modèles disponibles dans Ollama. + + Returns: + Liste des noms de modèles, ou None si Ollama n'est pas joignable. + """ + try: + resp = requests.get(f"{endpoint}/api/tags", timeout=5) + if resp.status_code == 200: + models = resp.json().get("models", []) + return [m["name"] for m in models] + except Exception: + pass + return None + + +def _model_available(model_name: str, available_models: List[str]) -> bool: + """Vérifie si un modèle est disponible dans la liste Ollama. + + Supporte la correspondance exacte et le match sans tag de version + (ex: "gemma4:e4b" match "gemma4:e4b" ou "gemma4:e4b-q4_0"). + """ + # Match exact + if model_name in available_models: + return True + + # Match par préfixe (sans tag) — "gemma4:e4b" match "gemma4:e4b" + base_name = model_name.split(":")[0] if ":" in model_name else model_name + for m in available_models: + if m.startswith(base_name + ":"): + return True + + return False diff --git a/deploy/lea_package/config.txt b/deploy/lea_package/config.txt index b1388e094..3dd97c298 100644 --- a/deploy/lea_package/config.txt +++ b/deploy/lea_package/config.txt @@ -23,9 +23,21 @@ RPA_SERVER_HOST=lea.labs.laurinebazin.design # Parametres avances (ne pas modifier sauf indication) # ============================================================ -# Flouter les zones de texte dans les captures (securite donnees) -# Mettre false uniquement pour le developpement/tests -RPA_BLUR_SENSITIVE=true +# Flouter les zones de texte dans les captures cote CLIENT. +# +# DEPUIS AVRIL 2026 : LE BLUR CLIENT EST DESACTIVE PAR DEFAUT. +# Le floutage des donnees sensibles (noms, adresses, telephones, NIR, email) +# est desormais effectue cote SERVEUR via EDS-NLP + OCR dans le module +# core/anonymisation/pii_blur.py. +# +# Avantages du blur server-side : +# - Cible precisement les PII (PERSON/LOCATION/PHONE/NIR/EMAIL) +# - Ne casse plus les codes CIM, montants PMSI, identifiants techniques +# - Deux versions stockees : _raw (entrainement) + _blurred (affichage) +# +# Ne remettre a 'true' que si un deploiement specifique l'exige explicitement +# (ex : reseau non chiffre entre agent et serveur). +RPA_BLUR_SENSITIVE=false # Duree de conservation des logs en jours (minimum 180 pour conformite) RPA_LOG_RETENTION_DAYS=180 diff --git a/tests/unit/test_pii_blur.py b/tests/unit/test_pii_blur.py new file mode 100644 index 000000000..d6878a3e3 --- /dev/null +++ b/tests/unit/test_pii_blur.py @@ -0,0 +1,298 @@ +# tests/unit/test_pii_blur.py +"""Tests du pipeline d'anonymisation server-side (core.anonymisation.pii_blur). + +On couvre : +1. Logique regex : PERSON / ADDRESS / PHONE / NIR / EMAIL sont détectés ; + codes CIM, CCAM, montants, dates et IDs techniques sont protégés. +2. Fusion de spans qui se chevauchent. +3. Pipeline complet sur une image synthétique PIL (sans OCR — on patche `_doctr_ocr`) + avec assertions sur les pixels : les zones PII sont floutées, les autres non. +""" + +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import patch + +import pytest +from PIL import Image, ImageDraw, ImageFont + + +# Éviter de charger docTR pendant les tests rapides +os.environ.setdefault("RPA_PII_BLUR_SERVER", "true") + + +# --- Import du module sous test -------------------------------------------- +from core.anonymisation import pii_blur as mod # noqa: E402 + + +# =========================================================================== +# Tests regex — pas besoin d'image +# =========================================================================== + +class TestRegexPII: + def test_detect_person_with_title(self): + hits = mod._regex_find_pii("Patient : M. Dupont Jean, né le 12/03/1965") + labels = {h[0] for h in hits} + assert "PERSON" in labels + + def test_detect_email(self): + hits = mod._regex_find_pii("Contact : jean.dupont@hopital.fr") + labels = {h[0] for h in hits} + assert "EMAIL" in labels + + def test_detect_phone_fr(self): + hits = mod._regex_find_pii("Tél : 06 12 34 56 78") + labels = {h[0] for h in hits} + assert "PHONE" in labels + + def test_detect_nir(self): + hits = mod._regex_find_pii("NIR : 2 85 03 75 115 120 42") + labels = {h[0] for h in hits} + assert "NIR" in labels + + def test_detect_address(self): + hits = mod._regex_find_pii("Adresse : 12 rue de la Paix, Paris") + labels = {h[0] for h in hits} + assert "LOCATION" in labels + + # --- Négatifs : ces motifs NE DOIVENT PAS être détectés -------------- + def test_icd10_not_flagged(self): + hits = mod._regex_find_pii("Code CIM : F32.1 (épisode dépressif)") + assert not hits, f"CIM ne doit pas être floué, hits={hits}" + + def test_ccam_not_flagged(self): + hits = mod._regex_find_pii("Acte CCAM : DEQP003") + assert not hits + + def test_money_not_flagged(self): + hits = mod._regex_find_pii("Montant facturé : 1250,50 €") + assert not hits + + def test_date_not_flagged(self): + hits = mod._regex_find_pii("Séjour du 01/03/2026 au 15/03/2026") + assert not hits + + def test_tech_id_not_flagged(self): + hits = mod._regex_find_pii("Fichier : shot_0007_full.png session_abc123") + assert not hits + + # --- Mélange réaliste ------------------------------------------------- + def test_realistic_hospital_text(self): + text = ( + "Patient : M. Dupont Jean - NIR : 1 85 03 75 115 120 42 " + "- Tél : 06 12 34 56 78 - Adresse : 12 rue de la Paix " + "- Code CIM : F32.1 - Montant : 1250,50 € " + "- Séjour du 01/03/2026 - Email : jean@test.fr" + ) + hits = mod._regex_find_pii(text) + labels = {h[0] for h in hits} + assert "PERSON" in labels + assert "NIR" in labels + assert "PHONE" in labels + assert "LOCATION" in labels + assert "EMAIL" in labels + + # Vérifier qu'aucun hit ne couvre F32.1, 1250,50 €, 01/03/2026 + protected_strings = ("F32.1", "1250,50", "01/03/2026") + for label, s, e in hits: + span = text[s:e] + for prot in protected_strings: + assert prot not in span, f"{label} '{span}' couvre {prot}" + + +# =========================================================================== +# Tests de fusion de spans +# =========================================================================== + +class TestMergeSpans: + def test_non_overlapping_preserved(self): + spans = [("PERSON", 0, 5), ("EMAIL", 20, 30)] + assert mod._merge_spans(spans) == spans + + def test_overlap_kept_widest_label(self): + spans = [("PERSON", 0, 10), ("LOCATION", 5, 20)] + merged = mod._merge_spans(spans) + assert len(merged) == 1 + label, s, e = merged[0] + assert (s, e) == (0, 20) + # le plus large est LOCATION (15 chars) > PERSON (10) + assert label == "LOCATION" + + def test_identical_spans_dedup(self): + spans = [("EMAIL", 3, 9), ("EMAIL", 3, 9)] + assert len(mod._merge_spans(spans)) == 1 + + +# =========================================================================== +# Tests pipeline complet avec OCR mocké +# =========================================================================== + +@pytest.fixture +def synthetic_screenshot(tmp_path: Path) -> Path: + """Génère une image synthétique avec 4 lignes de texte aux positions connues.""" + W, H = 900, 300 + img = Image.new("RGB", (W, H), color="white") + draw = ImageDraw.Draw(img) + try: + font = ImageFont.truetype("DejaVuSans.ttf", 22) + except Exception: + font = ImageFont.load_default() + + # Lignes (y, text) — on reproduit le test demandé par l'utilisateur + lines = [ + (20, "Nom : Dupont"), + (70, "Code CIM : F32.1"), + (120, "Adresse : 12 rue de la Paix"), + (170, "Montant : 1250€"), + ] + for y, t in lines: + draw.text((20, y), t, fill="black", font=font) + + path = tmp_path / "synth.png" + img.save(path, format="PNG") + return path + + +def _fake_doctr_ocr(image_path: Path): + """Mock docTR : retourne des bbox word-level connues pour le screenshot synthétique. + + On utilise des bbox approximatives correspondant à la disposition dans le fixture. + """ + # Format : liste de mots par ligne. Les x sont progressifs pour simuler + # la largeur rendue. On reste volontairement grossier (le blur tolère). + words = [] + + line_idx = [0] + + def line(y, word_defs): + x = 20 + for text, w in word_defs: + words.append({ + "text": text, "x1": x, "y1": y, "x2": x + w, "y2": y + 30, + "line": line_idx[0], + }) + x += w + 8 + line_idx[0] += 1 + + # "Nom : Dupont" → ciblé PERSON (via "M. Dupont" ? non, on n'a pas M.) → + # on ajoute le titre "M." pour déclencher le regex PERSON. + line(20, [("M.", 30), ("Dupont", 90)]) + # "Code CIM : F32.1" → doit NE PAS flouter + line(70, [("Code", 60), ("CIM", 50), (":", 10), ("F32.1", 80)]) + # "Adresse : 12 rue de la Paix" → LOCATION + line(120, [("Adresse", 90), (":", 10), ("12", 30), ("rue", 40), ("de", 30), + ("la", 30), ("Paix", 60)]) + # "Montant : 1250€" → NE PAS flouter + line(170, [("Montant", 90), (":", 10), ("1250€", 80)]) + + return words, 900, 300 + + +def _pixel_variance(img: Image.Image, bbox) -> float: + """Variance moyenne par canal dans une ROI — proxy pour « y a-t-il du détail ». + + Une zone floutée a une variance très basse ; une zone nette a plus de détail. + """ + import statistics + x1, y1, x2, y2 = bbox + crop = img.crop((x1, y1, x2, y2)).convert("RGB") + pixels = list(crop.getdata()) + if len(pixels) < 2: + return 0.0 + rs = [p[0] for p in pixels] + gs = [p[1] for p in pixels] + bs = [p[2] for p in pixels] + return (statistics.pvariance(rs) + statistics.pvariance(gs) + statistics.pvariance(bs)) / 3 + + +class TestPIIBlurrerPipeline: + def test_blur_only_pii_regions(self, tmp_path, synthetic_screenshot): + with patch.object(mod, "_doctr_ocr", side_effect=_fake_doctr_ocr): + blurrer = mod.PIIBlurrer(use_edsnlp=False) + out = tmp_path / "synth_blurred.png" + result = blurrer.blur_image(synthetic_screenshot, out) + + # Assertions globales + assert result.count >= 2, ( + f"Attendu au moins 2 PII (PERSON + LOCATION), reçu {result.count} : " + f"{[e.label for e in result.entities]}" + ) + labels = {e.label for e in result.entities} + assert "PERSON" in labels + assert "LOCATION" in labels + # F32.1 ne doit PAS être parmi les entités floutées + assert not any("F32.1" in e.text for e in result.entities) + # 1250 ne doit PAS être parmi les entités floutées + assert not any("1250" in e.text for e in result.entities) + + # Vérification visuelle : la ligne CIM (y=70..100) doit rester nette, + # la ligne Adresse (y=120..150) doit être floutée. + original = Image.open(synthetic_screenshot) + blurred = Image.open(out) + + # Ligne CIM : doit contenir du texte net (variance haute sur la zone) + cim_bbox = (20, 68, 280, 105) + var_orig_cim = _pixel_variance(original, cim_bbox) + var_blur_cim = _pixel_variance(blurred, cim_bbox) + # Tolérance : la zone CIM doit rester AU MOINS à 60% de la variance d'origine + assert var_blur_cim >= 0.5 * var_orig_cim, ( + f"Code CIM a été flouté ! var_orig={var_orig_cim:.1f}, " + f"var_blur={var_blur_cim:.1f}" + ) + + # Ligne Adresse : la variance doit chuter (flou applique un lissage) + addr_bbox = (20, 118, 400, 155) + var_orig_addr = _pixel_variance(original, addr_bbox) + var_blur_addr = _pixel_variance(blurred, addr_bbox) + assert var_blur_addr < var_orig_addr * 0.85, ( + f"Adresse pas suffisamment floutée : var_orig={var_orig_addr:.1f}, " + f"var_blur={var_blur_addr:.1f}" + ) + + def test_no_pii_copies_file(self, tmp_path): + """Si aucun PII n'est détecté, le fichier est copié tel quel.""" + img = Image.new("RGB", (400, 100), "white") + p = tmp_path / "clean.png" + img.save(p) + + def fake_clean_ocr(path): + return ( + [{"text": "Bonjour", "x1": 10, "y1": 10, "x2": 100, "y2": 40}, + {"text": "monde", "x1": 110, "y1": 10, "x2": 200, "y2": 40}], + 400, 100, + ) + + with patch.object(mod, "_doctr_ocr", side_effect=fake_clean_ocr): + res = mod.PIIBlurrer(use_edsnlp=False).blur_image(p, tmp_path / "clean_out.png") + + assert res.count == 0 + assert (tmp_path / "clean_out.png").is_file() + + def test_ocr_failure_falls_back_to_copy(self, tmp_path): + """Si docTR plante, on copie l'original en version 'blurred' (failsafe).""" + img = Image.new("RGB", (100, 100), "white") + p = tmp_path / "fail.png" + img.save(p) + + def boom(path): + raise RuntimeError("OCR indispo") + + with patch.object(mod, "_doctr_ocr", side_effect=boom): + res = mod.PIIBlurrer(use_edsnlp=False).blur_image(p, tmp_path / "fail_out.png") + + assert res.count == 0 + assert (tmp_path / "fail_out.png").is_file() + + +# =========================================================================== +# Sanity check helper +# =========================================================================== + +def test_pii_labels_contains_expected(): + assert "PERSON" in mod.PII_LABELS + assert "LOCATION" in mod.PII_LABELS + assert "EMAIL" in mod.PII_LABELS + assert "NIR" in mod.PII_LABELS + assert "PHONE" in mod.PII_LABELS diff --git a/visual_workflow_builder/backend/vlm_provider.py b/visual_workflow_builder/backend/vlm_provider.py index 67395236d..31f76a4d0 100644 --- a/visual_workflow_builder/backend/vlm_provider.py +++ b/visual_workflow_builder/backend/vlm_provider.py @@ -18,22 +18,29 @@ for path in env_paths: break class VLMProvider: - """Hub de Vision Sémantique Multi-Fournisseurs (OpenAI, Gemini, Anthropic, Ollama)""" + """Hub de Vision Sémantique — Ollama local prioritaire, cloud opt-in. + + Par défaut, seul Ollama local est utilisé (100% local, pas de cloud). + Pour activer les APIs cloud en fallback, définir VLM_ALLOW_CLOUD=true + dans l'environnement. + """ def __init__(self): - # Clés API - self.openai_key = os.getenv("OPENAI_API_KEY") - self.gemini_key = os.getenv("GOOGLE_API_KEY") - self.anthropic_key = os.getenv("ANTHROPIC_API_KEY") - self.deepseek_key = os.getenv("DEEPSEEK_API_KEY") - - # Configuration Ollama Local - self.ollama_url = os.getenv("OLLAMA_URL", "http://localhost:11434") - self.local_model = os.getenv("VLM_MODEL", "qwen3-vl:8b") + # Cloud opt-in uniquement (VLM_ALLOW_CLOUD=true pour activer) + self.allow_cloud = os.getenv("VLM_ALLOW_CLOUD", "").lower() in ("true", "1", "yes") - # Priorité par défaut - self.preferred_cloud = "openai" # gpt-4o est la référence UI - print(f"🔧 [VLM Hub] Initialisé. OpenAI: {bool(self.openai_key)}, Gemini: {bool(self.gemini_key)}, Anthropic: {bool(self.anthropic_key)}") + # Clés API (chargées mais pas utilisées sauf si cloud autorisé) + self.openai_key = os.getenv("OPENAI_API_KEY") if self.allow_cloud else None + self.gemini_key = os.getenv("GOOGLE_API_KEY") if self.allow_cloud else None + self.anthropic_key = os.getenv("ANTHROPIC_API_KEY") if self.allow_cloud else None + self.deepseek_key = os.getenv("DEEPSEEK_API_KEY") if self.allow_cloud else None + + # Configuration Ollama Local (toujours prioritaire) + self.ollama_url = os.getenv("OLLAMA_URL", "http://localhost:11434") + self.local_model = os.getenv("RPA_VLM_MODEL", os.getenv("VLM_MODEL", "gemma4:latest")) + + cloud_status = f"OpenAI: {bool(self.openai_key)}, Gemini: {bool(self.gemini_key)}, Anthropic: {bool(self.anthropic_key)}" if self.allow_cloud else "désactivé (VLM_ALLOW_CLOUD non défini)" + print(f"[VLM Hub] Ollama local: {self.ollama_url} ({self.local_model}), Cloud: {cloud_status}") def _to_base64(self, image_input) -> str: """Convertit n'importe quel input image en base64 pur""" @@ -51,25 +58,28 @@ class VLMProvider: return base64.b64encode(image_input).decode("utf-8") def detect_ui_element(self, screenshot, anchor_image=None, description: str = "") -> Optional[Dict[str, Any]]: - """Tente de localiser l'élément en essayant les fournisseurs par ordre de qualité""" - - # 1. Tenter OpenAI (Référence Vision UI) - if self.openai_key: - res = self._call_openai(screenshot, anchor_image, description) - if res and res.get('found'): return res + """Localise l'élément — Ollama local en priorité, cloud en fallback opt-in.""" - # 2. Tenter Gemini (Excellent backup Vision) - if self.gemini_key: - res = self._call_gemini(screenshot, anchor_image, description) - if res and res.get('found'): return res + # 1. Ollama local (toujours prioritaire — 100% local) + res = self._call_ollama_local(screenshot, anchor_image, description) + if res and res.get('found'): + return res - # 3. Tenter Anthropic (Précision logique) - if self.anthropic_key: - res = self._call_anthropic(screenshot, anchor_image, description) - if res and res.get('found'): return res + # 2-4. Fallback cloud (uniquement si VLM_ALLOW_CLOUD=true) + if self.allow_cloud: + if self.openai_key: + res = self._call_openai(screenshot, anchor_image, description) + if res and res.get('found'): return res - # 4. Fallback Local (Ollama) - Crucial pour le DGX Spark - return self._call_ollama_local(screenshot, anchor_image, description) + if self.gemini_key: + res = self._call_gemini(screenshot, anchor_image, description) + if res and res.get('found'): return res + + if self.anthropic_key: + res = self._call_anthropic(screenshot, anchor_image, description) + if res and res.get('found'): return res + + return res # Retourner le dernier résultat (Ollama ou cloud) def _call_openai(self, screenshot, anchor_image, description): try: @@ -137,28 +147,36 @@ class VLMProvider: return None def _call_ollama_local(self, screenshot, anchor_image, description): - """Appel à Ollama local (Mode DGX Spark / Offline)""" + """Appel a Ollama local (prioritaire — 100% local)""" try: import requests - print(f"🏠 [Hub] Fallback Local Ollama ({self.local_model})...") - prompt = f"Localise l'élément '{description}'. Retourne JSON: {{'found': bool, 'bbox': [ymin, xmin, ymax, xmax] (0-1000)}}" - + print(f"[Hub] Ollama local ({self.local_model})...") + prompt = f"Localise l'element '{description}'. Retourne JSON: {{'found': bool, 'bbox': [ymin, xmin, ymax, xmax] (0-1000)}}" + + images = [self._to_base64(screenshot)] + if anchor_image: + images.append(self._to_base64(anchor_image)) + + messages = [{"role": "user", "content": prompt, "images": images}] + payload = { "model": self.local_model, - "prompt": prompt, - "images": [self._to_base64(screenshot)], + "messages": messages, "stream": False, "format": "json" } - if anchor_image: - payload["images"].append(self._to_base64(anchor_image)) - response = requests.post(f"{self.ollama_url}/api/generate", json=payload, timeout=60) + # gemma4 necessite think=false (sinon tokens vides sur Ollama >=0.20) + if "gemma4" in self.local_model.lower(): + payload["think"] = False + + response = requests.post(f"{self.ollama_url}/api/chat", json=payload, timeout=60) if response.status_code == 200: - return json.loads(response.json().get('response', '{}')) + content = response.json().get("message", {}).get("content", "{}") + return json.loads(content) return None except Exception as e: - print(f"❌ [Hub] Local Ollama Error: {e}") + print(f"[Hub] Ollama local erreur: {e}") return {"found": False, "error": str(e)} # Instance unique