VLM v2 : prompt élargi (19 catégories PII), modèle 235b cloud, masquage total pages manuscrites

- vlm_manager.py : nouvelles catégories (NUMERO_LOT, NUMERO_PATIENT, NUMERO_ORDONNANCE,
  SERVICE, ETABLISSEMENT, DATE, AGE, NDA), prompt détaillé pour identifiants médicaux
  (EFS, lots PSL, services hospitaliers), modèle par défaut qwen3-vl:235b-instruct-cloud,
  parser JSON robuste (réparation troncature), num_predict 8192
- anonymizer_core_refactored_onnx.py : FULL_PAGE_MASK pour pages manuscrites
  (OCR < 100 mots + VLM PII ou VLM en échec), matching flou pour numéros manuscrits
  (_search_ocr_words_fuzzy_digits), auto-rotation VLM (4 orientations),
  fix label OGC doublé, support nouveaux kinds VLM dans redact_pdf_raster

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 02:08:02 +01:00
parent f206d160f4
commit 125ac82f4f
2 changed files with 510 additions and 18 deletions

View File

@@ -1401,6 +1401,45 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
# ----------------- PDF Redaction ----------------- # ----------------- PDF Redaction -----------------
def _search_ocr_words_fuzzy_digits(ocr_words: List[Tuple[str, float, float, float, float]],
token: str, page_rect, min_ratio: float = 0.7) -> list:
"""Matching flou pour identifiants numériques manuscrits.
Compare les séquences de chiffres entre le token VLM et les mots OCR.
Accepte une correspondance si ≥ min_ratio des chiffres matchent."""
token_digits = re.sub(r"[^0-9]", "", token)
if len(token_digits) < 4:
return []
rects = []
for (word, x0n, y0n, x1n, y1n) in ocr_words:
word_digits = re.sub(r"[^0-9]", "", word)
if len(word_digits) < 3:
continue
# Match exact des chiffres (après nettoyage)
if word_digits == token_digits:
rects.append(fitz.Rect(
x0n * page_rect.width, y0n * page_rect.height,
x1n * page_rect.width, y1n * page_rect.height,
))
continue
# Match partiel : le token est contenu dans le mot OCR ou vice-versa
if token_digits in word_digits or word_digits in token_digits:
if min(len(token_digits), len(word_digits)) / max(len(token_digits), len(word_digits)) >= min_ratio:
rects.append(fitz.Rect(
x0n * page_rect.width, y0n * page_rect.height,
x1n * page_rect.width, y1n * page_rect.height,
))
continue
# Match par distance : comparer caractère par caractère (Hamming-like)
if abs(len(word_digits) - len(token_digits)) <= 2:
shorter, longer = (word_digits, token_digits) if len(word_digits) <= len(token_digits) else (token_digits, word_digits)
matches = sum(1 for a, b in zip(shorter, longer) if a == b)
if matches / len(longer) >= min_ratio:
rects.append(fitz.Rect(
x0n * page_rect.width, y0n * page_rect.height,
x1n * page_rect.width, y1n * page_rect.height,
))
return rects
def _search_ocr_words(ocr_words: List[Tuple[str, float, float, float, float]], token: str, page_rect) -> list: def _search_ocr_words(ocr_words: List[Tuple[str, float, float, float, float]], token: str, page_rect) -> list:
"""Cherche un token dans les mots OCR d'une page. """Cherche un token dans les mots OCR d'une page.
Pour les tokens multi-mots, cherche chaque mot individuellement. Pour les tokens multi-mots, cherche chaque mot individuellement.
@@ -1525,7 +1564,7 @@ def _rasterize_page(args):
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size) font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
except Exception: except Exception:
font = ImageFont.load_default() font = ImageFont.load_default()
text = f"OGC: {ogc_label}" text = ogc_label if ogc_label.upper().startswith("OGC") else f"OGC: {ogc_label}"
bbox = draw.textbbox((0, 0), text, font=font) bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
margin = int(10 * zoom) margin = int(10 * zoom)
@@ -1547,6 +1586,8 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
_RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"} _RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"}
_RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", _RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
"EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"} "EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"}
_VLM_NUMERIC_KINDS = {"VLM_NUM_PATIENT", "VLM_NUM_LOT", "VLM_NUM_ORD", "VLM_NDA",
"VLM_NIR", "VLM_IPP", "VLM_RPPS"}
by_page: Dict[int, List[PiiHit]] = {} by_page: Dict[int, List[PiiHit]] = {}
for h in audit: for h in audit:
by_page.setdefault(h.page, []).append(h) by_page.setdefault(h.page, []).append(h)
@@ -1555,6 +1596,12 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
rects = [] rects = []
seen_tokens: set = set() seen_tokens: set = set()
hits = by_page.get(pno, []) + by_page.get(-1, []) hits = by_page.get(pno, []) + by_page.get(-1, [])
# Masquage total si FULL_PAGE_MASK détecté (page manuscrite non déchiffrable)
if any(h.kind == "FULL_PAGE_MASK" and h.page == pno for h in hits):
margin = 5 # points — liseré fin autour du masque
rects.append(fitz.Rect(margin, margin, page.rect.width - margin, page.rect.height - margin))
all_rects[pno] = rects
continue
for h in hits: for h in hits:
token = h.original.strip() token = h.original.strip()
if not token or h.kind in _RASTER_SKIP_KINDS: if not token or h.kind in _RASTER_SKIP_KINDS:
@@ -1570,19 +1617,24 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
rects.extend(found_short) rects.extend(found_short)
continue continue
found = page.search_for(token) found = page.search_for(token)
if not found and h.kind in {"NIR", "IBAN", "TEL"}: if not found and h.kind in {"NIR", "IBAN", "TEL", "VLM_TEL", "VLM_NIR"}:
compact = re.sub(r"\s+", "", token) compact = re.sub(r"\s+", "", token)
found = page.search_for(compact) found = page.search_for(compact)
if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM",
"VLM_NOM", "VLM_ETAB", "VLM_SERVICE"}:
for word in token.split(): for word in token.split():
word = word.strip(" .-'") word = word.strip(" .-'")
if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET: if len(word) < 3 or word.lower() in _MEDICAL_STOP_WORDS_SET:
continue
if not word[0].isupper():
continue continue
found.extend(page.search_for(word)) found.extend(page.search_for(word))
# Fallback OCR pour chaque mot
if not found and ocr_word_map and pno in ocr_word_map:
found.extend(_search_ocr_words(ocr_word_map[pno], word, page.rect))
if not found and ocr_word_map and pno in ocr_word_map: if not found and ocr_word_map and pno in ocr_word_map:
found = _search_ocr_words(ocr_word_map[pno], token, page.rect) found = _search_ocr_words(ocr_word_map[pno], token, page.rect)
# Matching flou pour identifiants numériques VLM (manuscrit)
if not found and h.kind in _VLM_NUMERIC_KINDS and ocr_word_map and pno in ocr_word_map:
found = _search_ocr_words_fuzzy_digits(ocr_word_map[pno], token, page.rect)
rects.extend(found) rects.extend(found)
all_rects[pno] = rects all_rects[pno] = rects
@@ -1615,32 +1667,57 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: OcrWordMap, vlm_manager) -> None: def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: OcrWordMap, vlm_manager) -> None:
"""Utilise un VLM (Ollama) pour détecter visuellement les PII sur chaque page d'un PDF scanné. """Utilise un VLM (Ollama) pour détecter visuellement les PII sur chaque page d'un PDF scanné.
Les entités détectées sont ajoutées à anon.audit et au texte pseudonymisé.""" Les entités détectées sont ajoutées à anon.audit et au texte pseudonymisé.
Auto-rotation : si une page a peu de mots OCR, essaie 4 orientations."""
from vlm_manager import VLM_CATEGORY_MAP from vlm_manager import VLM_CATEGORY_MAP
doc = fitz.open(str(pdf_path)) doc = fitz.open(str(pdf_path))
# Collecter les PII déjà détectés pour contexte VLM # Collecter les PII déjà détectés pour contexte VLM
existing_pii = list({h.original.strip() for h in anon.audit if h.original.strip()}) existing_pii = list({h.original.strip() for h in anon.audit if h.original.strip()})
for pno in range(len(doc)): # Catégories contenant des identifiants numériques (matching flou)
pix = doc[pno].get_pixmap(dpi=200) _NUMERIC_CATS = {"NUMERO_PATIENT", "NUMERO_LOT", "NUMERO_ORDONNANCE", "NUMERO_SEJOUR",
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) "NDA", "NIR", "IPP", "RPPS"}
try: # Catégories à splitter en mots (noms, services, établissements)
entities = vlm_manager.analyze_page_image(img, page_number=pno, existing_pii=existing_pii[:20]) _SPLIT_CATS = {"NOM", "PRENOM", "ETABLISSEMENT", "SERVICE"}
except Exception:
continue
for ent in entities: for pno in range(len(doc)):
pix = doc[pno].get_pixmap(dpi=150)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
# Détection page manuscrite : peu de mots OCR = scan manuscrit / mal orienté
ocr_count = len(ocr_word_map.get(pno, []))
is_handwritten_page = ocr_count < 100
rotations_to_try = [0]
if is_handwritten_page:
rotations_to_try = [0, 270, 90, 180]
best_entities = []
for rot in rotations_to_try:
img_rot = img.rotate(rot, expand=True) if rot else img
try:
ents = vlm_manager.analyze_page_image(img_rot, page_number=pno,
existing_pii=existing_pii[:20])
except Exception:
ents = []
if len(ents) > len(best_entities):
best_entities = ents
# Si on a trouvé assez d'entités, pas besoin d'essayer d'autres rotations
if len(ents) >= 5:
break
for ent in best_entities:
cat = ent.get("categorie", "").upper() cat = ent.get("categorie", "").upper()
texte = ent.get("texte", "").strip() texte = ent.get("texte", "").strip()
conf = ent.get("confiance", 0.0) conf = ent.get("confiance", 0.0)
if not texte or conf < 0.5: if not texte or conf < 0.3:
continue continue
if cat not in VLM_CATEGORY_MAP: if cat not in VLM_CATEGORY_MAP:
continue continue
kind, placeholder_key = VLM_CATEGORY_MAP[cat] kind, placeholder_key = VLM_CATEGORY_MAP[cat]
placeholder = PLACEHOLDERS.get(placeholder_key, PLACEHOLDERS["MASK"]) placeholder = PLACEHOLDERS.get(placeholder_key, PLACEHOLDERS["MASK"])
# Ajouter chaque mot comme hit séparé (meilleur matching OCR)
if cat in ("NOM", "PRENOM"): if cat in _SPLIT_CATS:
# Splitter en mots pour meilleur matching OCR
for word in texte.split(): for word in texte.split():
word = word.strip(" .-'(),") word = word.strip(" .-'(),")
if len(word) < 2 or word.lower() in _MEDICAL_STOP_WORDS_SET: if len(word) < 2 or word.lower() in _MEDICAL_STOP_WORDS_SET:
@@ -1648,12 +1725,28 @@ def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: Oc
anon.audit.append(PiiHit(page=pno, kind=kind, original=word, placeholder=placeholder)) anon.audit.append(PiiHit(page=pno, kind=kind, original=word, placeholder=placeholder))
else: else:
anon.audit.append(PiiHit(page=pno, kind=kind, original=texte, placeholder=placeholder)) anon.audit.append(PiiHit(page=pno, kind=kind, original=texte, placeholder=placeholder))
# Pour les identifiants numériques, ajouter aussi le token nettoyé (chiffres seuls)
if cat in _NUMERIC_CATS:
digits_only = re.sub(r"[^0-9]", "", texte)
if digits_only and digits_only != texte:
anon.audit.append(PiiHit(page=pno, kind=kind, original=digits_only, placeholder=placeholder))
# Remplacer dans le texte pseudonymisé si trouvé # Remplacer dans le texte pseudonymisé si trouvé
try: try:
anon.text_out = re.sub(rf"\b{re.escape(texte)}\b", placeholder, anon.text_out) anon.text_out = re.sub(rf"\b{re.escape(texte)}\b", placeholder, anon.text_out)
except re.error: except re.error:
anon.text_out = anon.text_out.replace(texte, placeholder) anon.text_out = anon.text_out.replace(texte, placeholder)
# Masquage total : page manuscrite avec PII confirmées OU VLM en échec
vlm_pii_count = sum(1 for e in best_entities
if e.get("categorie", "").upper() in VLM_CATEGORY_MAP
and e.get("confiance", 0) >= 0.3)
if is_handwritten_page and (vlm_pii_count >= 3 or (len(best_entities) == 0 and ocr_count > 0)):
anon.audit.append(PiiHit(page=pno, kind="FULL_PAGE_MASK", original="page manuscrite",
placeholder=PLACEHOLDERS["MASK"]))
log.info("VLM page %d : masquage total (OCR=%d mots, VLM=%d PII, handwritten=%s)",
pno, ocr_count, vlm_pii_count, is_handwritten_page)
doc.close() doc.close()

399
vlm_manager.py Normal file
View File

@@ -0,0 +1,399 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
VLM Manager — Analyse visuelle des PDF via Ollama (qwen3-vl)
-------------------------------------------------------------
Couche complémentaire aux regex/NER : envoie chaque page PDF comme image
à un VLM local (Ollama) pour détecter visuellement les PII.
Dégradation gracieuse : si Ollama est indisponible, le pipeline continue sans VLM.
Dépendances : aucune (utilise uniquement urllib de la stdlib).
"""
from __future__ import annotations
import base64
import io
import json
import logging
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
try:
from PIL import Image
except ImportError:
Image = None # type: ignore
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class VlmConfig:
"""Configuration pour le VLM Ollama."""
base_url: str = "http://localhost:11434"
model: str = "qwen3-vl:235b-instruct-cloud"
timeout: int = 180
max_image_size: int = 2048 # pixels (côté le plus long)
temperature: float = 0.1
num_predict: int = 8192
min_confidence: float = 0.5
# ---------------------------------------------------------------------------
# Mapping catégories VLM → (PiiHit.kind, clé PLACEHOLDERS)
# ---------------------------------------------------------------------------
VLM_CATEGORY_MAP: Dict[str, Tuple[str, str]] = {
"NOM": ("VLM_NOM", "NOM"),
"PRENOM": ("VLM_NOM", "NOM"),
"ADRESSE": ("VLM_ADRESSE", "ADRESSE"),
"TELEPHONE": ("VLM_TEL", "TEL"),
"EMAIL": ("VLM_EMAIL", "EMAIL"),
"DATE_NAISSANCE": ("VLM_DATE_NAISS", "DATE_NAISSANCE"),
"NIR": ("VLM_NIR", "NIR"),
"IPP": ("VLM_IPP", "IPP"),
"CODE_POSTAL": ("VLM_CP", "CODE_POSTAL"),
"VILLE": ("VLM_VILLE", "VILLE"),
"RPPS": ("VLM_RPPS", "RPPS"),
# Identifiants médicaux / traçabilité
"NUMERO_PATIENT": ("VLM_NUM_PATIENT", "DOSSIER"),
"NUMERO_LOT": ("VLM_NUM_LOT", "MASK"),
"NUMERO_ORDONNANCE":("VLM_NUM_ORD", "DOSSIER"),
"NUMERO_SEJOUR": ("VLM_NDA", "NDA"),
"NDA": ("VLM_NDA", "NDA"),
"SERVICE": ("VLM_SERVICE", "MASK"),
"ETABLISSEMENT": ("VLM_ETAB", "ETAB"),
"DATE": ("VLM_DATE", "DATE"),
"AGE": ("VLM_AGE", "AGE"),
}
# ---------------------------------------------------------------------------
# Prompt système
# ---------------------------------------------------------------------------
_SYSTEM_PROMPT = (
"Tu identifies les données personnelles et identifiants traçables dans les documents "
"médicaux français. Réponds uniquement en JSON."
)
_USER_PROMPT_TEMPLATE = """\
Identifie TOUTES les informations permettant d'identifier un patient dans cette page de document médical.
Le document peut être pivoté ou manuscrit — lis dans toutes les orientations.
Catégories :
- NOM, PRENOM : noms et prénoms de patients, médecins, infirmiers, soignants
- ADRESSE : adresses postales
- TELEPHONE : numéros de téléphone
- DATE_NAISSANCE : dates de naissance
- DATE : toutes les autres dates (consultation, séjour, transfusion, intervention…)
- NIR : numéro de sécurité sociale
- IPP : identifiant permanent du patient (ex: BA172948)
- NDA : numéro de dossier administratif / numéro de séjour
- NUMERO_PATIENT : tout numéro identifiant un patient (numéro EFS, numéro d'ordonnance…)
- NUMERO_LOT : numéros de lots de produits sanguins (PSL), codes numériques de traçabilité
- CODE_POSTAL, VILLE : codes postaux et villes
- ETABLISSEMENT : noms d'hôpitaux, cliniques (ex: CH COTE BASQUE, CHU BORDEAUX)
- SERVICE : noms de services hospitaliers (ex: CANCEROLOGIE HDJ, REANIMATION)
- AGE : âge du patient (ex: 85A, 62 ans)
- RPPS : numéro RPPS du médecin
Règles :
- Texte EXACT visible sur l'image (copie fidèle, y compris manuscrit)
- Inclure TOUS les identifiants numériques (manuscrits ou imprimés)
- Inclure les noms d'établissements et de services hospitaliers
- Ne PAS inclure : médicaments, diagnostics, termes médicaux purs, résultats de labo
Réponds en JSON : {{"entites": [{{"categorie": "NOM", "texte": "DUPONT", "confiance": 0.95}}]}}
Si aucune PII : {{"entites": []}}"""
# ---------------------------------------------------------------------------
# VlmManager
# ---------------------------------------------------------------------------
class VlmManager:
"""Gestionnaire VLM via Ollama. Même pattern que NerModelManager."""
def __init__(self, config: Optional[VlmConfig] = None):
self._config = config or VlmConfig()
self._loaded = False
self._model_name: Optional[str] = None
# ---- public API ----
def is_loaded(self) -> bool:
return self._loaded
def load(self, model: Optional[str] = None) -> None:
"""Vérifie la connexion Ollama et la disponibilité du modèle."""
cfg = self._config
if model:
cfg.model = model
self._model_name = cfg.model
# 1) Vérifier qu'Ollama répond
try:
req = urllib.request.Request(
f"{cfg.base_url}/api/tags",
method="GET",
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
raise RuntimeError(f"Ollama indisponible ({cfg.base_url}) : {e}") from e
# 2) Vérifier que le modèle est disponible
available = [m.get("name", "") for m in data.get("models", [])]
# Normaliser : "qwen3-vl:8b" matche "qwen3-vl:8b" ou "qwen3-vl:8b-..."
model_found = any(
a == cfg.model or a.startswith(cfg.model.split(":")[0] + ":")
for a in available
)
if not model_found:
raise RuntimeError(
f"Modèle '{cfg.model}' non trouvé dans Ollama. "
f"Disponibles : {', '.join(available) or '(aucun)'}. "
f"Lancez : ollama pull {cfg.model}"
)
self._loaded = True
log.info("VLM prêt : %s via %s", cfg.model, cfg.base_url)
def unload(self) -> None:
self._loaded = False
self._model_name = None
@staticmethod
def models_catalog() -> Dict[str, str]:
return {
"Qwen2.5-VL 7B (Ollama)": "qwen2.5vl:7b",
"Qwen3-VL 8B (Ollama)": "qwen3-vl:8b",
}
# ---- analyse d'une page ----
def analyze_page_image(
self,
image: "Image.Image",
page_number: int = 0,
existing_pii: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Envoie une image de page à Ollama et retourne les entités détectées.
Returns:
Liste de dicts avec clés : categorie, texte, confiance
"""
if not self._loaded:
return []
if Image is None:
log.warning("Pillow non disponible, VLM ignoré")
return []
cfg = self._config
# Redimensionner l'image
img = _resize_image(image, cfg.max_image_size)
# Encoder en base64
img_b64 = _image_to_base64(img)
# Construire le prompt utilisateur
user_prompt = _USER_PROMPT_TEMPLATE
if existing_pii:
user_prompt += (
"\n\nPII déjà détectés (vérifie et cherche ceux qui manquent) : "
+ ", ".join(existing_pii[:20])
)
# Appel API Ollama
payload = {
"model": cfg.model,
"messages": [
{"role": "system", "content": _SYSTEM_PROMPT},
{
"role": "user",
"content": user_prompt,
"images": [img_b64],
},
],
"stream": False,
"options": {
"temperature": cfg.temperature,
"num_predict": cfg.num_predict,
},
}
try:
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
f"{cfg.base_url}/api/chat",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=cfg.timeout) as resp:
result = json.loads(resp.read().decode("utf-8"))
except urllib.error.URLError as e:
log.warning("VLM appel échoué (page %d) : %s", page_number, e)
return []
except Exception as e:
log.warning("VLM erreur inattendue (page %d) : %s", page_number, e)
return []
# Extraire le contenu de la réponse
# Qwen3 peut mettre la réponse dans "content" ou "thinking"
content = ""
msg = result.get("message", {})
if isinstance(msg, dict):
content = msg.get("content", "")
# Fallback : si content vide, chercher dans thinking (mode Qwen3)
if not content.strip():
content = msg.get("thinking", "")
elif isinstance(msg, str):
content = msg
# Parser le JSON de réponse (défensif)
entities = _parse_vlm_response(content)
log.info("VLM page %d : %d entités détectées", page_number, len(entities))
return entities
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resize_image(img: "Image.Image", max_size: int) -> "Image.Image":
"""Redimensionne l'image si un côté dépasse max_size, en conservant le ratio."""
w, h = img.size
if max(w, h) <= max_size:
return img
ratio = max_size / max(w, h)
new_w = int(w * ratio)
new_h = int(h * ratio)
return img.resize((new_w, new_h), Image.LANCZOS)
def _image_to_base64(img: "Image.Image") -> str:
"""Encode une image PIL en base64 (PNG)."""
buf = io.BytesIO()
img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode("ascii")
def _parse_vlm_response(content: str) -> List[Dict[str, Any]]:
"""Parse la réponse du VLM en liste d'entités. Gère JSON brut, markdown code blocks,
et JSON noyé dans du texte de raisonnement (thinking)."""
if not content or not content.strip():
return []
import re
text = content.strip()
# Tentative 1 : JSON direct
try:
data = json.loads(text)
return _extract_entities(data)
except json.JSONDecodeError:
pass
# Tentative 2 : extraire un bloc ```json ... ``` ou ``` ... ```
m = re.search(r"```(?:json)?\s*\n?(.*?)```", text, re.DOTALL)
if m:
try:
data = json.loads(m.group(1).strip())
return _extract_entities(data)
except json.JSONDecodeError:
pass
# Tentative 3 : chercher un bloc JSON contenant "entites" ou "entities"
# Gérer les accolades imbriquées en trouvant le bon bloc
for keyword in ['"entites"', '"entities"']:
idx = text.find(keyword)
if idx < 0:
continue
# Remonter jusqu'au { ouvrant
brace_start = text.rfind("{", 0, idx)
if brace_start < 0:
continue
# Trouver le } fermant correspondant (gestion profondeur)
depth = 0
for i in range(brace_start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
try:
data = json.loads(text[brace_start:i + 1])
return _extract_entities(data)
except json.JSONDecodeError:
break
break
# Tentative 4 : chercher le premier { ... } (fallback)
brace_start = text.find("{")
brace_end = text.rfind("}")
if brace_start >= 0 and brace_end > brace_start:
try:
data = json.loads(text[brace_start:brace_end + 1])
return _extract_entities(data)
except json.JSONDecodeError:
pass
# Tentative 5 : réparation JSON tronqué (num_predict dépassé)
# Le VLM a pu couper la réponse au milieu d'un objet entité
brace_start = text.find("{")
if brace_start >= 0:
fragment = text[brace_start:]
# Trouver la dernière entité complète (se terminant par })
last_complete = fragment.rfind("}")
if last_complete > 0:
truncated = fragment[:last_complete + 1]
# Fermer le tableau et l'objet si nécessaire
for suffix in ["", "]}", "]}}"]:
try:
data = json.loads(truncated + suffix)
entities = _extract_entities(data)
if entities:
log.info("VLM : JSON tronqué réparé (%d entités récupérées)", len(entities))
return entities
except json.JSONDecodeError:
continue
log.warning("VLM : impossible de parser la réponse JSON : %s", text[:200])
return []
def _extract_entities(data: Any) -> List[Dict[str, Any]]:
"""Extrait la liste d'entités depuis la structure JSON parsée."""
raw_list = []
if isinstance(data, dict):
# Structure attendue : {"entites": [...]}
raw_list = data.get("entites") or data.get("entities") or []
if not isinstance(raw_list, list):
raw_list = []
elif isinstance(data, list):
raw_list = data
result = []
for e in raw_list:
if not isinstance(e, dict):
continue
texte = e.get("texte") or e.get("text") or ""
if not texte:
continue
# Accepter les entités sans catégorie (default NOM)
categorie = e.get("categorie") or e.get("category") or "NOM"
result.append({
"categorie": categorie.upper(),
"texte": texte,
"confiance": float(e.get("confiance", e.get("confidence", 0.8))),
})
return result