921 lines
34 KiB
Python
921 lines
34 KiB
Python
"""Phase 2.5 — Analyseur sémantique post-apprentissage.
|
|
|
|
Module isolé qui prend en entrée un ensemble de screenshots capturés
|
|
pendant la phase Shadow et produit un payload structuré
|
|
``{tables, forms, buttons, text_blocks}`` par écran distinct,
|
|
stocké dans un fichier ``.semantic.yaml`` séparé.
|
|
|
|
Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md``
|
|
|
|
Garde-fous :
|
|
- Wrapper try/except global autour de chaque appel OmniParser.
|
|
- Fallback OCR-seul (docTR) si OmniParser indisponible ou KO.
|
|
- Healthcheck OmniParser au démarrage : KO ⇒ bascule auto en dégradé.
|
|
- Cache disque ``data/cache/omniparser/<session>/<index>.json``.
|
|
- Cap 10 écrans distincts par session.
|
|
- Aucun import de FastAPI, aucun appel réseau direct.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import concurrent.futures
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
import traceback
|
|
from dataclasses import asdict, dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Iterable, List, Optional, Sequence, Tuple
|
|
|
|
try: # pragma: no cover - dépendance externe déjà présente dans le projet
|
|
import yaml
|
|
except ImportError as exc: # pragma: no cover
|
|
raise RuntimeError("PyYAML est requis pour core.semantic.phase25_analyzer") from exc
|
|
|
|
try: # PIL toujours présent côté Linux dev / DGX
|
|
from PIL import Image
|
|
_HAS_PIL = True
|
|
except ImportError: # pragma: no cover
|
|
Image = None # type: ignore[assignment]
|
|
_HAS_PIL = False
|
|
|
|
try:
|
|
import imagehash # type: ignore
|
|
_HAS_IMAGEHASH = True
|
|
except ImportError: # pragma: no cover - fallback MD5 thumbnail
|
|
imagehash = None # type: ignore[assignment]
|
|
_HAS_IMAGEHASH = False
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Constantes et chemins
|
|
# ----------------------------------------------------------------------------
|
|
|
|
# Filesystem layout, all derived from the third ancestor of this file.
REPO_ROOT = Path(__file__).resolve().parents[2]
DATA_ROOT = REPO_ROOT.joinpath("data")
SEMANTIC_DIR = DATA_ROOT.joinpath("competences", "candidate")
OMNIPARSER_CACHE_ROOT = DATA_ROOT.joinpath("cache", "omniparser")
OMNIPARSER_CACHE_DIR = OMNIPARSER_CACHE_ROOT  # public alias
LOGS_DIR = REPO_ROOT.joinpath("logs")
OMNIPARSER_ERROR_LOG = LOGS_DIR.joinpath("omniparser_errors.log")

# Perceptual grouping heuristic (see specs section 3).
PHASH_HAMMING_THRESHOLD = 8
MAX_SCREENS_PER_SESSION = 10
THUMBNAIL_SIZE = (256, 256)  # used by the MD5 fallback hash

# Per-screenshot timeout, in seconds (see specs section 2).
OMNIPARSER_TIMEOUT_SEC = 30.0

# Allowed competence slug: lowercase letter, then 2 to 79 of a-z, 0-9, "_".
SLUG_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,79}$")
# Allowed session id: harmless characters only (alphanumerics, "_" and "-").
SESSION_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_\-]{0,127}$")
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Dataclasses
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class SemanticStructure:
    """Semantic structure of one screen (see specs section 2).

    Each attribute is a list of plain dicts describing one category of UI
    element. No schema is enforced by this class.
    """

    tables: List[dict] = field(default_factory=list)
    forms: List[dict] = field(default_factory=list)
    buttons: List[dict] = field(default_factory=list)
    text_blocks: List[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a dict holding a shallow copy of each category list."""
        categories = ("tables", "forms", "buttons", "text_blocks")
        return {name: list(getattr(self, name)) for name in categories}
|
|
|
|
|
|
@dataclass
class ScreenAnalysis:
    """Analysis of one representative screen (see specs section 3)."""

    index: int
    phash: str
    screen_id: str
    screenshot_path: Optional[str]
    structure: "SemanticStructure"
    degraded: bool = False
    degraded_reason: Optional[str] = None
    elapsed_sec: float = 0.0
    window_title: Optional[str] = None

    def to_dict(self) -> dict:
        """Serialize the analysis to a plain dict.

        Besides the nested ``structure``, the payload carries ``elements``,
        a flattened view of the same structure computed on the fly. The
        perceptual hash is exported under the key ``"hash"`` and
        ``elapsed_sec`` is rounded to three decimals.
        """
        flattened = _structure_to_elements(self.structure)
        payload: dict = {
            "index": self.index,
            "hash": self.phash,
            "screen_id": self.screen_id,
            "window_title": self.window_title,
            "screenshot_path": self.screenshot_path,
        }
        payload["structure"] = self.structure.to_dict()
        payload["elements"] = flattened
        payload["degraded"] = self.degraded
        payload["degraded_reason"] = self.degraded_reason
        payload["elapsed_sec"] = round(self.elapsed_sec, 3)
        return payload
|
|
|
|
|
|
@dataclass
class Phase25Result:
    """Overall result of one Phase 2.5 analysis run."""

    session_id: str
    generated_at: str
    omniparser_available: bool
    degraded: bool
    too_complex: bool
    screens: List["ScreenAnalysis"] = field(default_factory=list)
    healthcheck_passed: bool = True
    healthcheck_reason: Optional[str] = None

    def to_dict(self) -> dict:
        """Serialize the result; each screen is serialized via its ``to_dict``."""
        scalar_fields = (
            "session_id",
            "generated_at",
            "omniparser_available",
            "degraded",
            "too_complex",
            "healthcheck_passed",
            "healthcheck_reason",
        )
        payload = {name: getattr(self, name) for name in scalar_fields}
        payload["screens"] = [screen.to_dict() for screen in self.screens]
        return payload
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Helpers : validation et FS
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
def _validate_session_id(session_id: Any) -> str:
    """Return the stripped session id, or raise ``ValueError`` if unusable.

    The value must be a non-blank string matching ``SESSION_ID_PATTERN``.
    A second, explicit check rejects ``..``, ``/`` and ``\\`` as a
    belt-and-braces guard against path traversal, even though the pattern
    should already exclude them.
    """
    candidate = session_id.strip() if isinstance(session_id, str) else ""
    if not candidate:
        raise ValueError("session_id doit etre une chaine non vide")
    if SESSION_ID_PATTERN.match(candidate) is None:
        raise ValueError(
            "session_id invalide (autorise : [A-Za-z0-9][A-Za-z0-9_-]{0,127})"
        )
    if any(token in candidate for token in ("..", "/", "\\")):
        raise ValueError("session_id invalide (path-traversal interdit)")
    return candidate
|
|
|
|
|
|
def _validate_slug(slug: Any) -> str:
    """Return the stripped slug if it matches ``SLUG_PATTERN``.

    Raises:
        ValueError: if *slug* is not a string or does not match the pattern.
    """
    if isinstance(slug, str):
        cleaned = slug.strip()
        if SLUG_PATTERN.match(cleaned) is not None:
            return cleaned
        raise ValueError(
            f"slug invalide '{cleaned}' (regle : {SLUG_PATTERN.pattern})"
        )
    raise ValueError("slug doit etre une chaine")
|
|
|
|
|
|
def _ensure_dir(path: Path) -> None:
    """Create *path* with any missing parents; a no-op if it already exists."""
    path.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def _log_omniparser_error(session_id: str, frame_index: int, exc: BaseException) -> None:
    """Append one JSON line describing *exc* to ``logs/omniparser_errors.log``.

    Logging is best-effort (see specs section 7): an ``OSError`` while
    creating the directory or writing the file is reported as a warning on
    the module logger and otherwise ignored.
    """
    try:
        _ensure_dir(LOGS_DIR)
        record = dict(
            timestamp=datetime.now(timezone.utc).isoformat(),
            session_id=session_id,
            frame_index=frame_index,
            error_type=type(exc).__name__,
            error_message=str(exc),
            traceback=traceback.format_exception_only(type(exc), exc),
        )
        with OMNIPARSER_ERROR_LOG.open("a", encoding="utf-8") as log_file:
            serialized = json.dumps(record, ensure_ascii=False)
            log_file.write(serialized + "\n")
    except OSError as log_exc:  # pragma: no cover - best-effort logging
        logger.warning("[PHASE25] echec ecriture omniparser_errors.log : %s", log_exc)
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Hash perceptuel (avec fallback MD5)
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
def compute_phash(image: "Image.Image") -> str:
    """Return a perceptual hash of *image*, or an MD5 thumbnail hash as fallback.

    When ``imagehash`` is importable the result is ``str(imagehash.phash(image))``.
    Otherwise, or if that call raises, the image is copied, shrunk to fit
    ``THUMBNAIL_SIZE``, encoded as an RGB PNG and hashed with MD5. The
    fallback value carries the prefix ``md5:``.
    """
    if imagehash is not None and _HAS_IMAGEHASH:
        try:
            return str(imagehash.phash(image))
        except Exception as exc:  # pragma: no cover
            logger.warning("[PHASE25] phash imagehash KO, fallback MD5 : %s", exc)

    # MD5 fallback computed on a thumbnail.
    thumbnail = image.copy()
    thumbnail.thumbnail(THUMBNAIL_SIZE)
    png_bytes = io.BytesIO()
    thumbnail.convert("RGB").save(png_bytes, format="PNG")
    digest = hashlib.md5(png_bytes.getvalue()).hexdigest()
    return f"md5:{digest}"
|
|
|
|
|
|
def _hamming_distance(h1: str, h2: str) -> int:
    """Return the Hamming distance between two hashes from ``compute_phash``.

    - If either hash carries the ``md5:`` prefix, the result is 0 when both
      strings are equal and ``PHASH_HAMMING_THRESHOLD + 1`` otherwise, so MD5
      hashes are never treated as "similar" (conservative heuristic).
    - With ``imagehash`` available, both values go through
      ``imagehash.hex_to_hash`` and are subtracted.
    - Without ``imagehash``, both values are parsed as hexadecimal integers
      and the set bits of their XOR are counted.

    Any parsing failure yields ``PHASH_HAMMING_THRESHOLD + 1``.
    """
    too_far = PHASH_HAMMING_THRESHOLD + 1

    if any(value.startswith("md5:") for value in (h1, h2)):
        return 0 if h1 == h2 else too_far

    if _HAS_IMAGEHASH and imagehash is not None:
        try:
            return abs(imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2))
        except Exception:
            return too_far

    # No imagehash but hex hashes present (rare): raw XOR bit count.
    try:
        difference = int(h1, 16) ^ int(h2, 16)
    except ValueError:
        return too_far
    return bin(difference).count("1")
|
|
|
|
|
|
def identify_distinct_screens(
    frames: Sequence[Tuple[int, "Image.Image"]],
    threshold: int = PHASH_HAMMING_THRESHOLD,
) -> List[Tuple[int, "Image.Image", str]]:
    """Group frames by hash similarity and keep one representative per group.

    Args:
        frames: sequence of ``(frame_index, PIL.Image)`` pairs.
        threshold: maximum Hamming distance at which a frame is considered
            identical to an existing representative.

    Returns:
        A list of ``(frame_index, image, phash)`` tuples, one per group, in
        order of first appearance (the first frame seen is the representative).
    """
    no_match = object()
    kept: List[Tuple[int, "Image.Image", str]] = []
    for frame_index, frame in frames:
        frame_hash = compute_phash(frame)
        # First representative within the threshold, if any.
        twin_index = next(
            (
                rep_index
                for rep_index, _rep_image, rep_hash in kept
                if _hamming_distance(frame_hash, rep_hash) <= threshold
            ),
            no_match,
        )
        if twin_index is no_match:
            kept.append((frame_index, frame, frame_hash))
            continue
        logger.debug(
            "[PHASE25] frame %d regroupee avec representant %d (phash=%s)",
            frame_index, twin_index, frame_hash,
        )
    return kept
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Conversion structure ⇄ "elements" (contrat Codex)
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
def _structure_to_elements(struct: "SemanticStructure") -> List[dict]:
    """Flatten a structure into a list of ``{kind, label, bbox, confidence}`` dicts.

    Categories are emitted in the fixed order tables, forms, buttons,
    text blocks. Form entries are exported with kind ``"field"``.

    Per-entry defaults:
        - ``label``: the kind name for tables, fields and buttons; for text
          blocks, the entry's ``"text"`` value, or ``""`` if absent.
        - ``bbox``: ``[]``.
        - ``confidence``: ``0.5`` when the key is missing or explicitly
          ``None``; any other value is converted with ``float``.

    Args:
        struct: object exposing ``tables``, ``forms``, ``buttons`` and
            ``text_blocks`` lists of dicts.

    Returns:
        The flattened list of element dicts.
    """
    # (attribute on the structure, emitted kind). The kind doubles as the
    # default label for every category except text blocks.
    categories = (
        ("tables", "table"),
        ("forms", "field"),
        ("buttons", "button"),
        ("text_blocks", "text_block"),
    )

    elements: List[dict] = []
    for attribute, kind in categories:
        for entry in getattr(struct, attribute):
            if kind == "text_block":
                label = entry.get("label", entry.get("text", ""))
            else:
                label = entry.get("label", kind)
            raw_confidence = entry.get("confidence")
            elements.append({
                "kind": kind,
                "label": label,
                "bbox": entry.get("bbox", []),
                # An explicit null must not crash the serialization.
                "confidence": 0.5 if raw_confidence is None else float(raw_confidence),
            })
    return elements
|
|
|
|
|
|
# "ok" only counts as a button keyword when it is not glued to other letters,
# so that labels such as "Bookmarks" or "Token" are not mistaken for buttons.
_OK_WORD_RE = re.compile(r"(?<![^\W\d_])ok(?![^\W\d_])")


def _classify_element(label: str, kind_hint: Optional[str] = None) -> str:
    """Classify a detected element into one of our semantic categories.

    The type hint, when given, is examined first: a hint containing
    ``table`` wins, then ``input``/``field``/``edit`` (field), then
    ``button``/``btn``. If the hint is absent or inconclusive, the label is
    searched for button keywords, then field keywords, then table keywords.
    All comparisons are case-insensitive.

    Keywords are matched as substrings, except ``ok`` which must stand alone
    (not directly preceded or followed by a letter).

    Args:
        label: element label; ``None`` or empty is treated as ``""``.
        kind_hint: optional element type reported by the detector. Non-string
            values are converted with ``str``.

    Returns:
        One of ``"table"``, ``"field"``, ``"button"`` or ``"text_block"``.
    """
    text = str(label or "").lower()

    if kind_hint:
        hint = str(kind_hint).lower()
        if "table" in hint:
            return "table"
        if any(token in hint for token in ("input", "field", "edit")):
            return "field"
        if "button" in hint or "btn" in hint:
            return "button"

    button_keywords = ("button", "btn", "submit", "valider", "annuler", "close")
    if any(keyword in text for keyword in button_keywords) or _OK_WORD_RE.search(text):
        return "button"

    field_keywords = ("input", "field", "saisie", "textbox", "champ")
    if any(keyword in text for keyword in field_keywords):
        return "field"

    if "table" in text or "grille" in text:
        return "table"
    return "text_block"
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Adapter wrappers : OmniParser et docTR (fallback)
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
class _OmniParserSafeWrapper:
    """Defensive wrapper around the project's ``OmniParserAdapter``.

    - The adapter is imported lazily inside ``_try_import`` so that importing
      this module does not fail when OmniParser is not installed.
    - ``available`` is ``False`` when the import or instantiation failed, or
      when the adapter reports itself unavailable; callers then switch to the
      OCR-only fallback.
    - Every ``detect`` call is submitted to a shared ``ThreadPoolExecutor``
      and bounded with ``future.result(timeout=...)``.
    """

    # Shared, lazily created pool so that no pool is built per call.
    _TIMEOUT_EXECUTOR: Optional[concurrent.futures.ThreadPoolExecutor] = None

    @classmethod
    def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor:
        """Return the shared executor, creating it on first use."""
        if cls._TIMEOUT_EXECUTOR is None:
            cls._TIMEOUT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
                max_workers=2, thread_name_prefix="phase25-omniparser-timeout",
            )
        return cls._TIMEOUT_EXECUTOR

    def __init__(self) -> None:
        self._adapter: Any = None
        self._available: bool = False
        self._import_error: Optional[str] = None
        self._try_import()

    def _try_import(self) -> None:
        """Import and instantiate the adapter, recording any failure.

        Never raises: any exception is captured in ``import_error`` as
        ``"<ExceptionType>: <message>"`` and leaves the wrapper unavailable.
        """
        try:
            from core.detection.omniparser_adapter import OmniParserAdapter  # type: ignore

            self._adapter = OmniParserAdapter()
            self._available = bool(getattr(self._adapter, "available", False))
            if not self._available:
                # The adapter exists but its own availability check failed.
                self._import_error = "OmniParser adapter installé mais modèles non disponibles"
        except Exception as exc:
            self._adapter = None
            self._available = False
            self._import_error = f"{type(exc).__name__}: {exc}"

    @property
    def available(self) -> bool:
        """Whether the adapter was loaded and reports itself available."""
        return self._available

    @property
    def import_error(self) -> Optional[str]:
        """Reason the adapter is unavailable, or ``None``."""
        return self._import_error

    def detect(
        self,
        image: "Image.Image",
        *,
        timeout: Optional[float] = None,
    ) -> List[Any]:
        """Run ``adapter.detect(image)`` under a hard timeout.

        Args:
            image: PIL image to analyze.
            timeout: timeout in seconds (default: ``OMNIPARSER_TIMEOUT_SEC``).

        Returns:
            The adapter's result converted to a list, or ``[]`` when the
            wrapper is unavailable.

        Raises:
            concurrent.futures.TimeoutError: the call did not finish in time.
            Exception: anything raised by the adapter is logged and re-raised
                so the caller can log it and fall back.
        """
        if not self._available or self._adapter is None:
            return []

        effective_timeout = OMNIPARSER_TIMEOUT_SEC if timeout is None else timeout
        future = self._get_executor().submit(self._adapter.detect, image)
        try:
            return list(future.result(timeout=effective_timeout))
        except concurrent.futures.TimeoutError:
            # A call that already started cannot be interrupted and keeps
            # running in the background (its result is ignored). A call still
            # waiting in the queue is dropped here so it does not run later
            # for nothing and delay subsequent calls.
            future.cancel()
            logger.warning(
                "[PHASE25] OmniParser.detect timeout (%.1fs) -> fallback",
                effective_timeout,
            )
            raise
        except Exception as exc:
            logger.warning("[PHASE25] OmniParser.detect KO : %s", exc)
            raise  # propagated to the caller for logging + fallback
|
|
|
|
|
|
def _detect_via_omniparser(
    wrapper: "_OmniParserSafeWrapper",
    image: "Image.Image",
    *,
    timeout: Optional[float] = None,
) -> List[Any]:
    """Delegate to ``wrapper.detect`` and return its result unchanged."""
    detected = wrapper.detect(image, timeout=timeout)
    return detected
|
|
|
|
|
|
def _detect_via_doctr(image: "Image.Image", screenshot_path: Optional[str]) -> List[dict]:
    """OCR-only fallback (docTR): return one raw text block per recognized line.

    No VLM and no fine-grained classification are involved. Each returned
    dict has ``label`` and ``text`` (the line's words joined by spaces),
    ``bbox`` as ``[x1, y1, x2, y2]`` scaled by ``image.size``, and a fixed
    ``confidence`` of 0.6.

    Returns ``[]`` when PIL or docTR is unavailable, when *image* is ``None``,
    or when predictor creation or prediction raises.
    """
    # The predictor is cached at module level to avoid reloading the models.
    global _DOCTR_PREDICTOR

    if image is None or not _HAS_PIL:
        return []

    try:
        from doctr.io import DocumentFile  # type: ignore
        from doctr.models import ocr_predictor  # type: ignore
    except ImportError:
        logger.info("[PHASE25] docTR non disponible pour fallback OCR")
        return []

    if "_DOCTR_PREDICTOR" not in globals():
        _DOCTR_PREDICTOR = None  # type: ignore[assignment]

    try:
        if _DOCTR_PREDICTOR is None:  # type: ignore[has-type]
            _DOCTR_PREDICTOR = ocr_predictor(  # type: ignore[assignment]
                det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True,
            )
    except Exception as exc:  # pragma: no cover
        logger.warning("[PHASE25] docTR init KO : %s", exc)
        return []

    found: List[dict] = []
    try:
        # Prefer the file on disk when it exists, else feed PNG bytes.
        if screenshot_path and Path(screenshot_path).exists():
            sources = [screenshot_path]
        else:
            png_buffer = io.BytesIO()
            image.convert("RGB").save(png_buffer, format="PNG")
            sources = [png_buffer.getvalue()]
        document = DocumentFile.from_images(sources)
        prediction = _DOCTR_PREDICTOR(document)  # type: ignore[misc]

        width, height = image.size
        recognized_lines = (
            line
            for page in prediction.pages
            for block in page.blocks
            for line in block.lines
        )
        for line in recognized_lines:
            text = " ".join(word.value for word in line.words).strip()
            if not text:
                continue
            # The code treats geometry as ((x1, y1), (x2, y2)) in 0-1 range.
            corner_a, corner_b = line.geometry[0], line.geometry[1]
            bbox = [
                int(corner_a[0] * width),
                int(corner_a[1] * height),
                int(corner_b[0] * width),
                int(corner_b[1] * height),
            ]
            found.append({
                "label": text,
                "text": text,
                "bbox": bbox,
                "confidence": 0.6,  # fixed value; no line-level score is read here
            })
    except Exception as exc:  # pragma: no cover
        logger.warning("[PHASE25] docTR predict KO : %s", exc)
        return []

    return found
|
|
|
|
|
|
def _elements_to_structure(elements: Iterable[Any]) -> "SemanticStructure":
    """Convert detected elements into a ``SemanticStructure``.

    Two element shapes are accepted; anything else is skipped:

    - objects exposing a ``label`` attribute, from which ``label``, ``bbox``,
      ``confidence`` and ``element_type`` are read;
    - dicts, read through the keys ``label``/``text``, ``bbox``,
      ``confidence``/``score`` and ``element_type``/``type``.

    A missing or ``None`` confidence defaults to 0.5. A real score of 0.0 is
    kept as is rather than being replaced by the default.

    Each element is routed by ``_classify_element`` to tables, forms,
    buttons or text blocks; text blocks also get a ``"text"`` key equal to
    the label.
    """

    def _confidence(raw: Any) -> float:
        # Only "no value" falls back to the default; 0.0 is a valid score.
        return 0.5 if raw is None else float(raw)

    struct = SemanticStructure()
    buckets = {
        "table": struct.tables,
        "field": struct.forms,
        "button": struct.buttons,
    }

    for el in elements:
        if hasattr(el, "label"):
            label = getattr(el, "label", "") or ""
            bbox = list(getattr(el, "bbox", ()) or ())
            conf = _confidence(getattr(el, "confidence", None))
            kind_hint = getattr(el, "element_type", None)
        elif isinstance(el, dict):
            label = str(el.get("label") or el.get("text") or "")
            bbox = list(el.get("bbox") or [])
            raw_conf = el.get("confidence")
            if raw_conf is None:
                # An explicit null confidence must not hide an available score.
                raw_conf = el.get("score")
            conf = _confidence(raw_conf)
            kind_hint = el.get("element_type") or el.get("type")
        else:
            continue

        entry = {"label": label, "bbox": bbox, "confidence": conf}
        bucket = buckets.get(_classify_element(label, kind_hint))
        if bucket is None:
            struct.text_blocks.append({**entry, "text": label})
        else:
            bucket.append(entry)
    return struct
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Cache disque
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
def _cache_path(session_id: str, frame_index: int) -> Path:
    """Return ``<cache root>/<session id>/<frame index>.json``.

    The session id is validated first; ``ValueError`` propagates if invalid.
    """
    safe_session = _validate_session_id(session_id)
    file_name = f"{int(frame_index)}.json"
    return OMNIPARSER_CACHE_ROOT / safe_session / file_name
|
|
|
|
|
|
def _cache_read(session_id: str, frame_index: int) -> Optional[dict]:
    """Load the cached analysis for a frame, or return ``None`` on any miss.

    A missing file is a silent miss. An unreadable file (I/O error, invalid
    UTF-8, invalid JSON) or a JSON document whose root is not an object is
    reported as a warning and also treated as a miss, so a damaged cache
    entry never aborts the analysis.

    Raises:
        ValueError: if *session_id* is invalid (raised by ``_cache_path``).
    """
    path = _cache_path(session_id, frame_index)
    try:
        with path.open("r", encoding="utf-8") as fh:
            payload = json.load(fh)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("[PHASE25] cache illisible %s : %s", path, exc)
        return None

    if not isinstance(payload, dict):
        logger.warning(
            "[PHASE25] cache ignore %s : racine JSON inattendue (%s)",
            path, type(payload).__name__,
        )
        return None
    return payload
|
|
|
|
|
|
def _cache_write(session_id: str, frame_index: int, payload: dict) -> None:
    """Write *payload* to the frame's cache file (best-effort).

    The JSON is written to a sibling ``.json.tmp`` file which then replaces
    the final file. I/O errors and payloads that cannot be serialized to
    JSON are logged as warnings and swallowed; the temporary file is removed
    in that case.

    Raises:
        ValueError: if *session_id* is invalid (raised by ``_cache_path``).
    """
    path = _cache_path(session_id, frame_index)
    tmp = path.with_suffix(".json.tmp")
    try:
        _ensure_dir(path.parent)
        with tmp.open("w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, indent=2)
        tmp.replace(path)
    except (OSError, TypeError, ValueError) as exc:  # pragma: no cover
        # TypeError/ValueError: payload not JSON-serializable. Caching is an
        # optimization, so this must not abort the caller.
        logger.warning("[PHASE25] cache ecriture KO %s : %s", path, exc)
        try:
            tmp.unlink()
        except OSError:
            # Nothing to clean up, or cleanup impossible: stay best-effort.
            pass
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Analyseur principal
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
class Phase25Analyzer:
    """Post-learning semantic analyzer.

    Minimal usage::

        analyzer = Phase25Analyzer(session_id="abc123")
        result = analyzer.analyze_frames(frames=[(0, img0), (12, img12), ...])
        path = analyzer.write_semantic_yaml(result, slug="ma_competence")

    ``frames`` is a sequence of ``(frame_index, PIL.Image)`` pairs. Optional
    screenshot paths and window titles are passed separately, through the
    ``screenshot_paths`` and ``window_titles`` mappings of ``analyze_frames``.
    """

    def __init__(
        self,
        session_id: str,
        *,
        omniparser: Optional["_OmniParserSafeWrapper"] = None,
        max_screens: int = MAX_SCREENS_PER_SESSION,
        timeout_sec: float = OMNIPARSER_TIMEOUT_SEC,
    ) -> None:
        """Validate *session_id* and set up the OmniParser wrapper.

        Args:
            session_id: session identifier; validated, ``ValueError`` if bad.
            omniparser: wrapper to use; a new one is created when ``None``.
            max_screens: cap on distinct screens analyzed per session.
            timeout_sec: timeout passed to every OmniParser call.
        """
        self.session_id = _validate_session_id(session_id)
        self.omniparser = omniparser if omniparser is not None else _OmniParserSafeWrapper()
        self.max_screens = max_screens
        self.timeout_sec = timeout_sec
        self._healthcheck_passed = True
        self._healthcheck_reason: Optional[str] = None

    # -- healthcheck -------------------------------------------------------

    def healthcheck(self) -> bool:
        """Check that OmniParser answers on a dummy image (see specs section 7).

        - PIL missing or wrapper unavailable: the check fails (analysis can
          still proceed in degraded OCR-only mode).
        - The detection call raises: the check fails and the error is written
          to the dedicated log with frame index ``-1``.

        The outcome is stored on the instance and also returned.
        """
        if not _HAS_PIL:
            self._healthcheck_passed = False
            self._healthcheck_reason = "PIL indisponible"
            return False
        if not self.omniparser.available:
            self._healthcheck_passed = False
            self._healthcheck_reason = (
                self.omniparser.import_error or "OmniParser indisponible"
            )
            return False
        try:
            dummy = Image.new("RGB", (64, 64), color=(255, 255, 255))
            _ = self.omniparser.detect(dummy, timeout=self.timeout_sec)
            self._healthcheck_passed = True
            self._healthcheck_reason = None
            return True
        except Exception as exc:
            _log_omniparser_error(self.session_id, -1, exc)
            self._healthcheck_passed = False
            self._healthcheck_reason = f"{type(exc).__name__}: {exc}"
            return False

    # -- single screen analysis -------------------------------------------

    @staticmethod
    def _screen_from_cache(
        cached: dict,
        frame_index: int,
        phash: str,
        screenshot_path: Optional[str],
        window_title: Optional[str],
    ) -> "ScreenAnalysis":
        """Rebuild a ``ScreenAnalysis`` from a cached payload.

        Missing or null values fall back to the arguments or to empty
        defaults, so a partial cache entry does not raise.
        """
        raw_structure = cached.get("structure")
        if not isinstance(raw_structure, dict):
            raw_structure = {}
        struct = SemanticStructure(
            tables=raw_structure.get("tables") or [],
            forms=raw_structure.get("forms") or [],
            buttons=raw_structure.get("buttons") or [],
            text_blocks=raw_structure.get("text_blocks") or [],
        )
        return ScreenAnalysis(
            index=frame_index,
            # ScreenAnalysis.to_dict stores the hash under "hash"; "phash"
            # is still honored for entries written with that key.
            phash=cached.get("hash", cached.get("phash", phash)),
            screen_id=cached.get("screen_id", f"screen_{frame_index:03d}"),
            screenshot_path=cached.get("screenshot_path", screenshot_path),
            structure=struct,
            degraded=bool(cached.get("degraded", False)),
            degraded_reason=cached.get("degraded_reason"),
            elapsed_sec=float(cached.get("elapsed_sec") or 0.0),
            window_title=cached.get("window_title", window_title),
        )

    def analyze_screen(
        self,
        frame_index: int,
        image: "Image.Image",
        phash: str,
        *,
        screenshot_path: Optional[str] = None,
        window_title: Optional[str] = None,
        force_fallback: bool = False,
    ) -> "ScreenAnalysis":
        """Analyze one representative screen.

        Strategy:
          1. Disk cache, keyed by session id + frame index.
          2. OmniParser through the safe wrapper; if it returns nothing,
             docTR text blocks are added as a complement.
          3. On exception: dedicated log, ``degraded=True`` and a docTR-only
             structure. The same docTR-only path is used when the wrapper is
             unavailable or *force_fallback* is set.

        A freshly computed analysis is written back to the cache (best-effort).
        """
        # 1. Cache
        cached = _cache_read(self.session_id, frame_index)
        if cached is not None:
            return self._screen_from_cache(
                cached, frame_index, phash, screenshot_path, window_title,
            )

        t0 = time.monotonic()
        degraded = False
        degraded_reason: Optional[str] = None
        structure: "SemanticStructure"

        if self.omniparser.available and not force_fallback:
            try:
                elements = _detect_via_omniparser(
                    self.omniparser, image, timeout=self.timeout_sec,
                )
                structure = _elements_to_structure(elements)
                if not (
                    structure.tables
                    or structure.forms
                    or structure.buttons
                    or structure.text_blocks
                ):
                    # OmniParser produced nothing: complement with docTR.
                    structure.text_blocks.extend(
                        _detect_via_doctr(image, screenshot_path)
                    )
            except Exception as exc:
                _log_omniparser_error(self.session_id, frame_index, exc)
                degraded = True
                degraded_reason = f"omniparser_exception: {type(exc).__name__}"
                structure = SemanticStructure(
                    text_blocks=_detect_via_doctr(image, screenshot_path)
                )
        else:
            degraded = True
            degraded_reason = (
                "omniparser_unavailable: " + (self.omniparser.import_error or "n/a")
                if not self.omniparser.available
                else "forced_fallback"
            )
            structure = SemanticStructure(
                text_blocks=_detect_via_doctr(image, screenshot_path)
            )

        analysis = ScreenAnalysis(
            index=frame_index,
            phash=phash,
            screen_id=f"screen_{frame_index:03d}",
            screenshot_path=screenshot_path,
            structure=structure,
            degraded=degraded,
            degraded_reason=degraded_reason,
            elapsed_sec=time.monotonic() - t0,
            window_title=window_title,
        )

        # Cache write (best-effort).
        _cache_write(self.session_id, frame_index, analysis.to_dict())
        return analysis

    # -- full pipeline ----------------------------------------------------

    def analyze_frames(
        self,
        frames: Sequence[Tuple[int, "Image.Image"]],
        *,
        screenshot_paths: Optional[dict[int, str]] = None,
        window_titles: Optional[dict[int, str]] = None,
        run_healthcheck: bool = True,
    ) -> "Phase25Result":
        """Full pipeline: hash grouping, screen cap, per-screen analysis.

        Args:
            frames: list of ``(frame_index, PIL.Image)`` pairs.
            screenshot_paths: optional mapping ``frame_index -> path``.
            window_titles: optional mapping ``frame_index -> window_title``.
            run_healthcheck: run the OmniParser healthcheck before analysis.
                When it fails, every screen uses the docTR fallback.

        Returns:
            A ``Phase25Result``; ``too_complex`` is ``True`` when more than
            ``max_screens`` distinct screens were found, in which case only
            the first ``max_screens`` are analyzed.

        Raises:
            RuntimeError: if PIL is not available.
        """
        if not _HAS_PIL:
            raise RuntimeError("PIL est requis pour Phase25Analyzer.analyze_frames")

        if run_healthcheck:
            self.healthcheck()
            if not self._healthcheck_passed:
                logger.warning(
                    "[PHASE25] healthcheck OmniParser KO (%s) -> mode degrade docTR",
                    self._healthcheck_reason,
                )

        force_fallback = not self._healthcheck_passed

        # 1. Group by perceptual similarity.
        reps = identify_distinct_screens(frames)

        # 2. Apply the per-session cap.
        too_complex = len(reps) > self.max_screens
        if too_complex:
            logger.warning(
                "[PHASE25] session %s : %d ecrans distincts > cap %d -> too_complex",
                self.session_id, len(reps), self.max_screens,
            )
            reps = reps[: self.max_screens]

        # 3. Analyze each representative.
        sp = screenshot_paths or {}
        wt = window_titles or {}
        screens: List["ScreenAnalysis"] = []
        any_degraded = False
        for idx, img, phash in reps:
            analysis = self.analyze_screen(
                idx,
                img,
                phash,
                screenshot_path=sp.get(idx),
                window_title=wt.get(idx),
                force_fallback=force_fallback,
            )
            screens.append(analysis)
            any_degraded = any_degraded or analysis.degraded

        return Phase25Result(
            session_id=self.session_id,
            generated_at=datetime.now(timezone.utc).isoformat(),
            omniparser_available=self.omniparser.available and self._healthcheck_passed,
            degraded=any_degraded or not self._healthcheck_passed,
            too_complex=too_complex,
            screens=screens,
            healthcheck_passed=self._healthcheck_passed,
            healthcheck_reason=self._healthcheck_reason,
        )

    # -- YAML output -------------------------------------------------------

    def write_semantic_yaml(
        self,
        result: "Phase25Result",
        slug: str,
        *,
        target_dir: Optional[Path] = None,
    ) -> Path:
        """Write ``<slug>.semantic.yaml`` into the candidate competence directory.

        Args:
            result: Phase 2.5 analysis result.
            slug: competence slug (validated against ``SLUG_PATTERN``).
            target_dir: output directory (default: ``SEMANTIC_DIR``).

        Returns:
            Path of the written file, built from the output directory as
            given (it is not resolved).

        Raises:
            ValueError: invalid slug, or *target_dir* is named ``supervised``
                or ``stable``.
            OSError: the file could not be written.
        """
        s = _validate_slug(slug)
        out_dir = Path(target_dir) if target_dir is not None else Path(SEMANTIC_DIR)

        # Never overwrite supervised/stable data. The check runs before any
        # directory is created so a rejected call leaves no trace on disk.
        forbidden = {"supervised", "stable"}
        if out_dir.name in forbidden:
            raise ValueError(
                f"target_dir interdit '{out_dir.name}' (autorise : candidate uniquement)"
            )
        _ensure_dir(out_dir)

        payload = {
            "competence_id": s,
            "semantic_version": 1,
            "generated_at": result.generated_at,
            "session_id": result.session_id,
            "omniparser_available": result.omniparser_available,
            "degraded": result.degraded,
            "too_complex": result.too_complex,
            "healthcheck_passed": result.healthcheck_passed,
            "healthcheck_reason": result.healthcheck_reason,
            "screens": [
                {
                    "screen_id": sc.screen_id,
                    "phash": sc.phash,
                    "representative_frame_index": sc.index,
                    "screenshot_path": sc.screenshot_path,
                    "window_title": sc.window_title,
                    "degraded": sc.degraded,
                    "degraded_reason": sc.degraded_reason,
                    "elapsed_sec": round(sc.elapsed_sec, 3),
                    "structure": sc.structure.to_dict(),
                    "annotations": [],  # placeholder for later human annotation
                }
                for sc in result.screens
            ],
        }

        target = out_dir / f"{s}.semantic.yaml"
        tmp = target.with_suffix(".yaml.tmp")
        try:
            with tmp.open("w", encoding="utf-8") as fh:
                yaml.safe_dump(payload, fh, allow_unicode=True, sort_keys=False)
            tmp.replace(target)
        except Exception:
            # Do not leave a half-written temporary file behind.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise
        logger.info(
            "[PHASE25] semantic yaml ecrit : %s (screens=%d, degraded=%s)",
            target, len(result.screens), result.degraded,
        )
        return target
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Helpers utilitaires (chargement frames)
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
def load_frames_from_paths(paths_by_index: dict[int, str]) -> List[Tuple[int, "Image.Image"]]:
    """Load PIL images from a ``frame_index -> path`` mapping.

    Frames are processed in ascending index order. A path that cannot be
    opened or decoded (``OSError``) is skipped after a warning is logged.

    Raises:
        RuntimeError: if PIL is not available.
    """
    if not _HAS_PIL:
        raise RuntimeError("PIL est requis pour load_frames_from_paths")

    loaded: List[Tuple[int, "Image.Image"]] = []
    for frame_index in sorted(paths_by_index):
        source = paths_by_index[frame_index]
        try:
            picture = Image.open(source)
            picture.load()  # force decoding now so errors surface here
        except OSError as exc:  # includes FileNotFoundError
            logger.warning("[PHASE25] frame %d illisible (%s) : %s", frame_index, source, exc)
            continue
        loaded.append((int(frame_index), picture))
    return loaded
|
|
|
|
|
|
__all__ = [
|
|
"Phase25Analyzer",
|
|
"Phase25Result",
|
|
"ScreenAnalysis",
|
|
"SemanticStructure",
|
|
"SEMANTIC_DIR",
|
|
"OMNIPARSER_CACHE_DIR",
|
|
"OMNIPARSER_CACHE_ROOT",
|
|
"OMNIPARSER_ERROR_LOG",
|
|
"PHASH_HAMMING_THRESHOLD",
|
|
"MAX_SCREENS_PER_SESSION",
|
|
"compute_phash",
|
|
"identify_distinct_screens",
|
|
"load_frames_from_paths",
|
|
]
|