complete / partial / needs_review / failed (priorité décroissante), matching rôle requis insensible casse+espaces, seuil min_confidence paramétrable (0.6). 16 tests ajoutés (31 au total, verts). Brique TDD via sous-agent, code révisé. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
250 lines
8.7 KiB
Python
250 lines
8.7 KiB
Python
"""role_mapper — reconstruction de champs ANCRÉS sur l'OCR.
|
|
|
|
Principe cardinal (gate validé le 30/06 sur DPI urgences réel) :
|
|
le VLM ne fournit QUE des ids de tokens OCR (`value_ids`) ; la valeur est
|
|
reconstruite ici depuis l'OCR. Aucun texte produit par le VLM ne peut entrer
|
|
dans une valeur → **0 hallucination par construction**.
|
|
|
|
Ce module est volontairement PUR (pas d'appel réseau/VLM) : il prend les tokens
|
|
OCR (issus de `core.llm.ocr_extractor.extract_grid_from_image`) et la réponse
|
|
déjà désérialisée du VLM, et produit des champs ancrés. L'appel VLM lui-même
|
|
est orchestré ailleurs (et mockable), pour rester testable hors-ligne.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from typing import Callable, List, Optional, Sequence, Tuple
|
|
|
|
BBox = Tuple[int, int, int, int] # (x_min, y_min, x_max, y_max)
|
|
|
|
|
|
@dataclass
|
|
class OcrToken:
|
|
"""Un token OCR indexé par un id stable."""
|
|
id: int
|
|
text: str
|
|
confidence: float = 1.0
|
|
bbox: Optional[BBox] = None
|
|
|
|
|
|
@dataclass
|
|
class MappedField:
|
|
"""Un champ {rôle → valeur} dont la valeur est 100% issue de l'OCR."""
|
|
label: str
|
|
value: str
|
|
value_ids: List[int]
|
|
confidence: float
|
|
bbox: Optional[BBox]
|
|
anchored: bool
|
|
invalid_ids: List[int]
|
|
|
|
|
|
def _norm_bbox(bbox) -> Optional[BBox]:
|
|
"""Normalise une bbox en (x_min, y_min, x_max, y_max).
|
|
|
|
Accepte soit 4 points EasyOCR `[[x,y], ...]`, soit un quadruplet déjà plat.
|
|
"""
|
|
if bbox is None:
|
|
return None
|
|
if len(bbox) == 4 and all(isinstance(v, (int, float)) for v in bbox):
|
|
return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))
|
|
xs = [p[0] for p in bbox]
|
|
ys = [p[1] for p in bbox]
|
|
return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))
|
|
|
|
|
|
def tokens_from_grid(grid: Sequence[Sequence[dict]]) -> List[OcrToken]:
|
|
"""Convertit une grille `extract_grid_from_image` en tokens indexés (id séquentiel).
|
|
|
|
L'ordre des ids suit l'ordre de lecture de la grille (lignes top→bottom,
|
|
colonnes left→right), ce qui donne au VLM un référentiel stable.
|
|
"""
|
|
tokens: List[OcrToken] = []
|
|
tid = 0
|
|
for row in grid:
|
|
for cell in row:
|
|
tokens.append(OcrToken(
|
|
id=tid,
|
|
text=cell["text"],
|
|
confidence=float(cell.get("confidence", 1.0)),
|
|
bbox=_norm_bbox(cell.get("bbox")),
|
|
))
|
|
tid += 1
|
|
return tokens
|
|
|
|
|
|
def _enclosing_bbox(bboxes: Sequence[Optional[BBox]]) -> Optional[BBox]:
|
|
present = [b for b in bboxes if b is not None]
|
|
if not present:
|
|
return None
|
|
return (
|
|
min(b[0] for b in present),
|
|
min(b[1] for b in present),
|
|
max(b[2] for b in present),
|
|
max(b[3] for b in present),
|
|
)
|
|
|
|
|
|
def reconstruct_fields(
|
|
tokens: Sequence[OcrToken],
|
|
vlm_fields: Sequence[dict],
|
|
) -> List[MappedField]:
|
|
"""Reconstruit les champs à partir des tokens OCR et des `value_ids` du VLM.
|
|
|
|
Pour chaque champ VLM `{label, value_ids:[...]}` :
|
|
- déduplique les ids en préservant l'ordre de lecture donné par le VLM ;
|
|
- filtre les ids hors OCR (listés dans `invalid_ids`) ;
|
|
- reconstruit la valeur par concaténation des `text` des tokens valides ;
|
|
- confidence = min des tokens ancrés (le plus prudent), bbox = englobante.
|
|
|
|
Tout champ `value`/texte fourni par le VLM est IGNORÉ : seule la liste
|
|
d'ids fait foi (anti-hallucination).
|
|
"""
|
|
by_id = {t.id: t for t in tokens}
|
|
out: List[MappedField] = []
|
|
for vf in vlm_fields:
|
|
label = vf.get("label", "")
|
|
seen: List[int] = []
|
|
for i in (vf.get("value_ids") or []):
|
|
if i not in seen:
|
|
seen.append(i)
|
|
valid = [i for i in seen if i in by_id]
|
|
invalid = [i for i in seen if i not in by_id]
|
|
toks = [by_id[i] for i in valid]
|
|
out.append(MappedField(
|
|
label=label,
|
|
value=" ".join(t.text for t in toks),
|
|
value_ids=valid,
|
|
confidence=min((t.confidence for t in toks), default=0.0),
|
|
bbox=_enclosing_bbox([t.bbox for t in toks]),
|
|
anchored=bool(valid),
|
|
invalid_ids=invalid,
|
|
))
|
|
return out
|
|
|
|
|
|
# --- Orchestration VLM (client injectable pour rester testable hors-ligne) ---
|
|
|
|
# Un client VLM est un callable (image_path, prompt) -> texte de réponse.
|
|
VlmClient = Callable[[str, str], str]
|
|
|
|
|
|
def build_role_prompt(
|
|
tokens: Sequence[OcrToken],
|
|
roles: Optional[Sequence[str]] = None,
|
|
) -> str:
|
|
"""Construit le prompt d'attribution de rôles (ancrage strict par ids).
|
|
|
|
Mode *guidé* si `roles` est fourni (rôles attendus de l'écran), sinon *libre*
|
|
(le VLM nomme lui-même les champs). Dans les deux cas le VLM ne renvoie que
|
|
des `value_ids` — jamais de texte recopié.
|
|
"""
|
|
ocr_list = [{"id": t.id, "text": t.text} for t in tokens]
|
|
if roles:
|
|
roles_line = (
|
|
"Rôles attendus sur cet écran (associe chacun s'il est présent) : "
|
|
+ ", ".join(roles) + ".\n"
|
|
)
|
|
else:
|
|
roles_line = (
|
|
"Identifie librement les champs présents — le 'label' est le rôle du champ.\n"
|
|
)
|
|
return (
|
|
"Tu reçois une capture d'écran d'un dossier patient et la liste des tokens "
|
|
"détectés par OCR (chaque token : id, text).\n"
|
|
+ roles_line +
|
|
"Pour chaque champ, désigne les tokens OCR qui composent sa VALEUR.\n"
|
|
"RÈGLES STRICTES :\n"
|
|
"- Tu ne recopies AUCUN texte. Tu renvoies seulement 'value_ids' : la liste "
|
|
"des id de tokens OCR (dans l'ordre de lecture) qui forment la valeur.\n"
|
|
"- 'label' = le rôle du champ. N'invente aucun champ.\n"
|
|
"- Réponds UNIQUEMENT en JSON PLAT :\n"
|
|
'{"ecran":"<type en 3 mots>","champs":[{"label":"...","value_ids":[<int>,...]}]}\n\n'
|
|
"Tokens OCR :\n" + json.dumps(ocr_list, ensure_ascii=False)
|
|
)
|
|
|
|
|
|
def parse_vlm_json(text: str) -> dict:
|
|
"""Extrait le 1er objet JSON d'une réponse VLM (tolère les fences ```json).
|
|
|
|
Robuste : renvoie `{}` si la réponse n'est pas du JSON exploitable (pas de
|
|
crash en batch).
|
|
"""
|
|
if not text:
|
|
return {}
|
|
s = text.strip()
|
|
if "```" in s:
|
|
parts = s.split("```")
|
|
if len(parts) >= 2:
|
|
s = parts[1]
|
|
if s.lstrip().lower().startswith("json"):
|
|
s = s.lstrip()[4:]
|
|
a, b = s.find("{"), s.rfind("}")
|
|
if a < 0 or b <= a:
|
|
return {}
|
|
try:
|
|
return json.loads(s[a:b + 1])
|
|
except (ValueError, TypeError):
|
|
return {}
|
|
|
|
|
|
def _norm_label(label: str) -> str:
|
|
"""Normalise un label pour comparaison : minuscules + strip espaces."""
|
|
return label.strip().lower()
|
|
|
|
|
|
def assess_quality(
|
|
fields: Sequence[MappedField],
|
|
required_roles: Optional[Sequence[str]] = None,
|
|
min_confidence: float = 0.6,
|
|
) -> str:
|
|
"""Évalue la qualité d'extraction d'un dossier à partir des champs reconstruits.
|
|
|
|
Renvoie l'un des 4 statuts (par priorité décroissante) :
|
|
- "failed" : aucun champ, OU aucun champ ancré.
|
|
- "needs_review" : au moins un rôle requis absent ou non ancré.
|
|
- "partial" : rôles requis ok mais confidence insuffisante OU champs non ancrés.
|
|
- "complete" : tout ancré, toutes confidences >= min_confidence, aucun non ancré.
|
|
|
|
Le matching required_role ↔ field.label est insensible à la casse et aux espaces.
|
|
"""
|
|
# --- failed : aucun champ du tout, ou aucun ancré ---
|
|
anchored = [f for f in fields if f.anchored]
|
|
if not fields or not anchored:
|
|
return "failed"
|
|
|
|
# --- needs_review : rôle requis absent ou non ancré ---
|
|
if required_roles:
|
|
anchored_labels = {_norm_label(f.label) for f in anchored}
|
|
for role in required_roles:
|
|
if _norm_label(role) not in anchored_labels:
|
|
return "needs_review"
|
|
|
|
# --- partial : confidence basse sur un champ ancré OU champs non ancrés ---
|
|
has_low_confidence = any(f.confidence < min_confidence for f in anchored)
|
|
has_unanchored = any(not f.anchored for f in fields)
|
|
if has_low_confidence or has_unanchored:
|
|
return "partial"
|
|
|
|
# --- complete ---
|
|
return "complete"
|
|
|
|
|
|
def map_roles(
|
|
image_path: str,
|
|
tokens: Sequence[OcrToken],
|
|
vlm_client: VlmClient,
|
|
roles: Optional[Sequence[str]] = None,
|
|
) -> List[MappedField]:
|
|
"""Orchestre l'attribution de rôles : prompt → VLM → parse → reconstruction ancrée.
|
|
|
|
`vlm_client` est injecté (testable hors-ligne). Le résultat est toujours
|
|
ancré sur l'OCR via `reconstruct_fields`.
|
|
"""
|
|
prompt = build_role_prompt(tokens, roles)
|
|
raw = vlm_client(image_path, prompt)
|
|
data = parse_vlm_json(raw)
|
|
vlm_fields = data.get("champs", []) if isinstance(data, dict) else []
|
|
return reconstruct_fields(tokens, vlm_fields)
|