"""role_mapper — reconstruction de champs ANCRÉS sur l'OCR. Principe cardinal (gate validé le 30/06 sur DPI urgences réel) : le VLM ne fournit QUE des ids de tokens OCR (`value_ids`) ; la valeur est reconstruite ici depuis l'OCR. Aucun texte produit par le VLM ne peut entrer dans une valeur → **0 hallucination par construction**. Ce module est volontairement PUR (pas d'appel réseau/VLM) : il prend les tokens OCR (issus de `core.llm.ocr_extractor.extract_grid_from_image`) et la réponse déjà désérialisée du VLM, et produit des champs ancrés. L'appel VLM lui-même est orchestré ailleurs (et mockable), pour rester testable hors-ligne. """ from __future__ import annotations import json from dataclasses import dataclass from typing import Callable, List, Optional, Sequence, Tuple BBox = Tuple[int, int, int, int] # (x_min, y_min, x_max, y_max) @dataclass class OcrToken: """Un token OCR indexé par un id stable.""" id: int text: str confidence: float = 1.0 bbox: Optional[BBox] = None @dataclass class MappedField: """Un champ {rôle → valeur} dont la valeur est 100% issue de l'OCR.""" label: str value: str value_ids: List[int] confidence: float bbox: Optional[BBox] anchored: bool invalid_ids: List[int] def _norm_bbox(bbox) -> Optional[BBox]: """Normalise une bbox en (x_min, y_min, x_max, y_max). Accepte soit 4 points EasyOCR `[[x,y], ...]`, soit un quadruplet déjà plat. """ if bbox is None: return None if len(bbox) == 4 and all(isinstance(v, (int, float)) for v in bbox): return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])) xs = [p[0] for p in bbox] ys = [p[1] for p in bbox] return (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys))) def tokens_from_grid(grid: Sequence[Sequence[dict]]) -> List[OcrToken]: """Convertit une grille `extract_grid_from_image` en tokens indexés (id séquentiel). L'ordre des ids suit l'ordre de lecture de la grille (lignes top→bottom, colonnes left→right), ce qui donne au VLM un référentiel stable. """ tokens: List[OcrToken] = [] tid = 0 for row in grid: for cell in row: tokens.append(OcrToken( id=tid, text=cell["text"], confidence=float(cell.get("confidence", 1.0)), bbox=_norm_bbox(cell.get("bbox")), )) tid += 1 return tokens def _enclosing_bbox(bboxes: Sequence[Optional[BBox]]) -> Optional[BBox]: present = [b for b in bboxes if b is not None] if not present: return None return ( min(b[0] for b in present), min(b[1] for b in present), max(b[2] for b in present), max(b[3] for b in present), ) def reconstruct_fields( tokens: Sequence[OcrToken], vlm_fields: Sequence[dict], ) -> List[MappedField]: """Reconstruit les champs à partir des tokens OCR et des `value_ids` du VLM. Pour chaque champ VLM `{label, value_ids:[...]}` : - déduplique les ids en préservant l'ordre de lecture donné par le VLM ; - filtre les ids hors OCR (listés dans `invalid_ids`) ; - reconstruit la valeur par concaténation des `text` des tokens valides ; - confidence = min des tokens ancrés (le plus prudent), bbox = englobante. Tout champ `value`/texte fourni par le VLM est IGNORÉ : seule la liste d'ids fait foi (anti-hallucination). """ by_id = {t.id: t for t in tokens} out: List[MappedField] = [] for vf in vlm_fields: label = vf.get("label", "") seen: List[int] = [] for i in (vf.get("value_ids") or []): if i not in seen: seen.append(i) valid = [i for i in seen if i in by_id] invalid = [i for i in seen if i not in by_id] toks = [by_id[i] for i in valid] out.append(MappedField( label=label, value=" ".join(t.text for t in toks), value_ids=valid, confidence=min((t.confidence for t in toks), default=0.0), bbox=_enclosing_bbox([t.bbox for t in toks]), anchored=bool(valid), invalid_ids=invalid, )) return out # --- Orchestration VLM (client injectable pour rester testable hors-ligne) --- # Un client VLM est un callable (image_path, prompt) -> texte de réponse. VlmClient = Callable[[str, str], str] def build_role_prompt( tokens: Sequence[OcrToken], roles: Optional[Sequence[str]] = None, ) -> str: """Construit le prompt d'attribution de rôles (ancrage strict par ids). Mode *guidé* si `roles` est fourni (rôles attendus de l'écran), sinon *libre* (le VLM nomme lui-même les champs). Dans les deux cas le VLM ne renvoie que des `value_ids` — jamais de texte recopié. """ ocr_list = [{"id": t.id, "text": t.text} for t in tokens] if roles: roles_line = ( "Rôles attendus sur cet écran (associe chacun s'il est présent) : " + ", ".join(roles) + ".\n" ) else: roles_line = ( "Identifie librement les champs présents — le 'label' est le rôle du champ.\n" ) return ( "Tu reçois une capture d'écran d'un dossier patient et la liste des tokens " "détectés par OCR (chaque token : id, text).\n" + roles_line + "Pour chaque champ, désigne les tokens OCR qui composent sa VALEUR.\n" "RÈGLES STRICTES :\n" "- Tu ne recopies AUCUN texte. Tu renvoies seulement 'value_ids' : la liste " "des id de tokens OCR (dans l'ordre de lecture) qui forment la valeur.\n" "- 'label' = le rôle du champ. N'invente aucun champ.\n" "- Réponds UNIQUEMENT en JSON PLAT :\n" '{"ecran":"","champs":[{"label":"...","value_ids":[,...]}]}\n\n' "Tokens OCR :\n" + json.dumps(ocr_list, ensure_ascii=False) ) def parse_vlm_json(text: str) -> dict: """Extrait le 1er objet JSON d'une réponse VLM (tolère les fences ```json). Robuste : renvoie `{}` si la réponse n'est pas du JSON exploitable (pas de crash en batch). """ if not text: return {} s = text.strip() if "```" in s: parts = s.split("```") if len(parts) >= 2: s = parts[1] if s.lstrip().lower().startswith("json"): s = s.lstrip()[4:] a, b = s.find("{"), s.rfind("}") if a < 0 or b <= a: return {} try: return json.loads(s[a:b + 1]) except (ValueError, TypeError): return {} def _norm_label(label: str) -> str: """Normalise un label pour comparaison : minuscules + strip espaces.""" return label.strip().lower() def assess_quality( fields: Sequence[MappedField], required_roles: Optional[Sequence[str]] = None, min_confidence: float = 0.6, ) -> str: """Évalue la qualité d'extraction d'un dossier à partir des champs reconstruits. Renvoie l'un des 4 statuts (par priorité décroissante) : - "failed" : aucun champ, OU aucun champ ancré. - "needs_review" : au moins un rôle requis absent ou non ancré. - "partial" : rôles requis ok mais confidence insuffisante OU champs non ancrés. - "complete" : tout ancré, toutes confidences >= min_confidence, aucun non ancré. Le matching required_role ↔ field.label est insensible à la casse et aux espaces. """ # --- failed : aucun champ du tout, ou aucun ancré --- anchored = [f for f in fields if f.anchored] if not fields or not anchored: return "failed" # --- needs_review : rôle requis absent ou non ancré --- if required_roles: anchored_labels = {_norm_label(f.label) for f in anchored} for role in required_roles: if _norm_label(role) not in anchored_labels: return "needs_review" # --- partial : confidence basse sur un champ ancré OU champs non ancrés --- has_low_confidence = any(f.confidence < min_confidence for f in anchored) has_unanchored = any(not f.anchored for f in fields) if has_low_confidence or has_unanchored: return "partial" # --- complete --- return "complete" def map_roles( image_path: str, tokens: Sequence[OcrToken], vlm_client: VlmClient, roles: Optional[Sequence[str]] = None, ) -> List[MappedField]: """Orchestre l'attribution de rôles : prompt → VLM → parse → reconstruction ancrée. `vlm_client` est injecté (testable hors-ligne). Le résultat est toujours ancré sur l'OCR via `reconstruct_fields`. """ prompt = build_role_prompt(tokens, roles) raw = vlm_client(image_path, prompt) data = parse_vlm_json(raw) vlm_fields = data.get("champs", []) if isinstance(data, dict) else [] return reconstruct_fields(tokens, vlm_fields)