"""Phase 2.5 — Analyseur sémantique post-apprentissage. Module isolé qui prend en entrée un ensemble de screenshots capturés pendant la phase Shadow et produit un payload structuré ``{tables, forms, buttons, text_blocks}`` par écran distinct, stocké dans un fichier ``.semantic.yaml`` séparé. Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md`` Garde-fous : - Wrapper try/except global autour de chaque appel OmniParser. - Fallback OCR-seul (docTR) si OmniParser indisponible ou KO. - Healthcheck OmniParser au démarrage : KO ⇒ bascule auto en dégradé. - Cache disque ``data/cache/omniparser//.json``. - Cap 10 écrans distincts par session. - Aucun import de FastAPI, aucun appel réseau direct. """ from __future__ import annotations import concurrent.futures import hashlib import io import json import logging import re import time import traceback from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterable, List, Optional, Sequence, Tuple try: # pragma: no cover - dépendance externe déjà présente dans le projet import yaml except ImportError as exc: # pragma: no cover raise RuntimeError("PyYAML est requis pour core.semantic.phase25_analyzer") from exc try: # PIL toujours présent côté Linux dev / DGX from PIL import Image _HAS_PIL = True except ImportError: # pragma: no cover Image = None # type: ignore[assignment] _HAS_PIL = False try: import imagehash # type: ignore _HAS_IMAGEHASH = True except ImportError: # pragma: no cover - fallback MD5 thumbnail imagehash = None # type: ignore[assignment] _HAS_IMAGEHASH = False logger = logging.getLogger(__name__) # ---------------------------------------------------------------------------- # Constantes et chemins # ---------------------------------------------------------------------------- REPO_ROOT = Path(__file__).resolve().parents[2] DATA_ROOT = REPO_ROOT / "data" SEMANTIC_DIR = DATA_ROOT / "competences" / "candidate" 
OMNIPARSER_CACHE_ROOT = DATA_ROOT / "cache" / "omniparser"
OMNIPARSER_CACHE_DIR = OMNIPARSER_CACHE_ROOT  # public alias
LOGS_DIR = REPO_ROOT / "logs"
OMNIPARSER_ERROR_LOG = LOGS_DIR / "omniparser_errors.log"

# Perceptual grouping heuristic (see specs §3).
PHASH_HAMMING_THRESHOLD = 8
MAX_SCREENS_PER_SESSION = 10
THUMBNAIL_SIZE = (256, 256)  # used by the MD5 fallback

# Per-screenshot timeout (see specs §2).
OMNIPARSER_TIMEOUT_SEC = 30.0

# Allowed slug (same pattern as the persist layer: a-z0-9_).
SLUG_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,79}$")

# Allowed session_id: harmless characters only.
SESSION_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_\-]{0,127}$")


# ----------------------------------------------------------------------------
# Dataclasses
# ----------------------------------------------------------------------------
@dataclass
class SemanticStructure:
    """Semantic structure of one screen (see specs §2)."""

    tables: List[dict] = field(default_factory=list)
    forms: List[dict] = field(default_factory=list)
    buttons: List[dict] = field(default_factory=list)
    text_blocks: List[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the four categories as a dict of shallow-copied lists."""
        return {
            "tables": list(self.tables),
            "forms": list(self.forms),
            "buttons": list(self.buttons),
            "text_blocks": list(self.text_blocks),
        }


@dataclass
class ScreenAnalysis:
    """Analysis of one representative screen (see specs §3)."""

    index: int
    phash: str
    screen_id: str
    screenshot_path: Optional[str]
    structure: SemanticStructure
    degraded: bool = False
    degraded_reason: Optional[str] = None
    elapsed_sec: float = 0.0
    window_title: Optional[str] = None

    # "Codex contract" snapshot: flattened representation intended for the
    # agent-chat / dashboard. Computed on the fly by to_dict().
    def to_dict(self) -> dict:
        """Serialize the analysis.

        The perceptual hash is emitted under the key ``"hash"`` and the
        flattened element list under ``"elements"``.
        """
        elements = _structure_to_elements(self.structure)
        return {
            "index": self.index,
            "hash": self.phash,
            "screen_id": self.screen_id,
            "window_title": self.window_title,
            "screenshot_path": self.screenshot_path,
            "structure": self.structure.to_dict(),
            "elements": elements,
            "degraded": self.degraded,
            "degraded_reason": self.degraded_reason,
            "elapsed_sec": round(self.elapsed_sec, 3),
        }


@dataclass
class Phase25Result:
    """Overall result of a Phase 2.5 analysis."""

    session_id: str
    generated_at: str
    omniparser_available: bool
    degraded: bool
    too_complex: bool
    screens: List[ScreenAnalysis] = field(default_factory=list)
    healthcheck_passed: bool = True
    healthcheck_reason: Optional[str] = None

    def to_dict(self) -> dict:
        """Serialize the result, including every screen via its ``to_dict``."""
        return {
            "session_id": self.session_id,
            "generated_at": self.generated_at,
            "omniparser_available": self.omniparser_available,
            "degraded": self.degraded,
            "too_complex": self.too_complex,
            "healthcheck_passed": self.healthcheck_passed,
            "healthcheck_reason": self.healthcheck_reason,
            "screens": [s.to_dict() for s in self.screens],
        }


# ----------------------------------------------------------------------------
# Helpers: validation and filesystem
# ----------------------------------------------------------------------------
def _validate_session_id(session_id: Any) -> str:
    """Return the stripped ``session_id`` or raise ``ValueError``.

    The value must be a non-blank string matching ``SESSION_ID_PATTERN``.
    """
    if not isinstance(session_id, str) or not session_id.strip():
        raise ValueError("session_id doit etre une chaine non vide")
    sid = session_id.strip()
    if not SESSION_ID_PATTERN.match(sid):
        raise ValueError(
            "session_id invalide (autorise : [A-Za-z0-9][A-Za-z0-9_-]{0,127})"
        )
    # Belt-and-braces anti path-traversal: explicitly refuse any ../ attempt
    # even though the regex should never let it through.
    if ".." in sid or "/" in sid or "\\" in sid:
        raise ValueError("session_id invalide (path-traversal interdit)")
    return sid


def _validate_slug(slug: Any) -> str:
    """Return the stripped ``slug`` or raise ``ValueError`` if it is invalid."""
    if not isinstance(slug, str):
        raise ValueError("slug doit etre une chaine")
    s = slug.strip()
    if not SLUG_PATTERN.match(s):
        raise ValueError(
            f"slug invalide '{s}' (regle : {SLUG_PATTERN.pattern})"
        )
    return s


def _ensure_dir(path: Path) -> None:
    """Create ``path`` (and its parents) if it does not exist yet."""
    path.mkdir(parents=True, exist_ok=True)


def _log_omniparser_error(session_id: str, frame_index: int, exc: BaseException) -> None:
    """Append one JSON line to ``logs/omniparser_errors.log`` (see specs §7).

    Best-effort: an ``OSError`` while writing is logged as a warning and
    swallowed so that logging never breaks the analysis itself.
    """
    try:
        _ensure_dir(LOGS_DIR)
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": session_id,
            "frame_index": frame_index,
            "error_type": type(exc).__name__,
            "error_message": str(exc),
            "traceback": traceback.format_exception_only(type(exc), exc),
        }
        with OMNIPARSER_ERROR_LOG.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry, ensure_ascii=False) + "\n")
    except OSError as log_exc:  # pragma: no cover - best-effort logging
        logger.warning("[PHASE25] echec ecriture omniparser_errors.log : %s", log_exc)


# ----------------------------------------------------------------------------
# Perceptual hash (with MD5 fallback)
# ----------------------------------------------------------------------------
def compute_phash(image: "Image.Image") -> str:
    """Return a perceptual hash, or an ``md5:``-prefixed thumbnail hash.

    ``imagehash.phash`` is used when the library is importable; if it is
    missing or fails, the image is reduced to ``THUMBNAIL_SIZE``, encoded as
    RGB PNG and hashed with MD5.
    """
    if _HAS_IMAGEHASH and imagehash is not None:
        try:
            return str(imagehash.phash(image))
        except Exception as exc:  # pragma: no cover
            logger.warning("[PHASE25] phash imagehash KO, fallback MD5 : %s", exc)
    # MD5 fallback on a thumbnail (the copy keeps the caller's image intact).
    thumb = image.copy()
    thumb.thumbnail(THUMBNAIL_SIZE)
    buf = io.BytesIO()
    thumb.convert("RGB").save(buf, format="PNG")
    return "md5:" + hashlib.md5(buf.getvalue()).hexdigest()


def _hamming_distance(h1: str, h2: str) -> int:
    """Hamming distance between two hashes produced by ``compute_phash``.

    - imagehash case: both strings are converted back with
      ``imagehash.hex_to_hash`` and subtracted.
    - MD5 case (``md5:`` prefix on either side): 0 when equal, otherwise a
      distance above the threshold so that they are never considered similar
      (conservative heuristic).
    - Any conversion failure also yields ``PHASH_HAMMING_THRESHOLD + 1``.
    """
    if h1.startswith("md5:") or h2.startswith("md5:"):
        return 0 if h1 == h2 else PHASH_HAMMING_THRESHOLD + 1
    if not _HAS_IMAGEHASH or imagehash is None:
        # No imagehash but hex hashes are present (rare): raw XOR bit count.
        try:
            i1 = int(h1, 16)
            i2 = int(h2, 16)
            return bin(i1 ^ i2).count("1")
        except ValueError:
            return PHASH_HAMMING_THRESHOLD + 1
    try:
        return abs(imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2))
    except Exception:
        return PHASH_HAMMING_THRESHOLD + 1


def identify_distinct_screens(
    frames: Sequence[Tuple[int, "Image.Image"]],
    threshold: int = PHASH_HAMMING_THRESHOLD,
) -> List[Tuple[int, "Image.Image", str]]:
    """Group frames by phash similarity and return one representative per group.

    Args:
        frames: sequence of ``(frame_index, PIL.Image)``. Extra trailing tuple
            items (e.g. a screenshot path) are tolerated and ignored.
        threshold: maximum Hamming distance for two frames to be considered
            the same screen.

    Returns:
        List of ``(frame_index, image, phash)`` — one representative per
        group, in order of first appearance (first seen = representative).
    """
    representatives: List[Tuple[int, "Image.Image", str]] = []
    for idx, img, *_extra in frames:
        h = compute_phash(img)
        matched = False
        for ridx, _rimg, rhash in representatives:
            if _hamming_distance(h, rhash) <= threshold:
                matched = True
                logger.debug(
                    "[PHASE25] frame %d regroupee avec representant %d (phash=%s)",
                    idx, ridx, h,
                )
                break
        if not matched:
            representatives.append((idx, img, h))
    return representatives


# ----------------------------------------------------------------------------
# Structure <-> "elements" conversion (Codex contract)
# ----------------------------------------------------------------------------
def _as_confidence(value: Any, default: float = 0.5) -> float:
    """Coerce ``value`` to ``float``; use ``default`` only when it is unusable.

    A genuine ``0`` / ``0.0`` is preserved — only ``None`` or a value that
    ``float()`` rejects falls back to ``default``.
    """
    if value is None:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _structure_to_elements(struct: SemanticStructure) -> List[dict]:
    """Flatten a structure into a list of ``{kind, label, bbox, confidence}``.

    Order is tables, forms (emitted with kind ``"field"``), buttons, then
    text blocks. A missing label defaults to the kind name, except for text
    blocks which default to their ``"text"`` value (or ``""``).
    """
    elements: List[dict] = []
    groups = (
        ("table", struct.tables),
        ("field", struct.forms),
        ("button", struct.buttons),
    )
    for kind, entries in groups:
        for entry in entries:
            elements.append({
                "kind": kind,
                "label": entry.get("label", kind),
                "bbox": entry.get("bbox", []),
                "confidence": _as_confidence(entry.get("confidence")),
            })
    for tb in struct.text_blocks:
        elements.append({
            "kind": "text_block",
            "label": tb.get("label", tb.get("text", "")),
            "bbox": tb.get("bbox", []),
            "confidence": _as_confidence(tb.get("confidence")),
        })
    return elements


# Label keywords matched as plain substrings of the lower-cased label.
_BUTTON_LABEL_KEYWORDS = ("button", "btn", "submit", "valider", "annuler", "close")
_FIELD_LABEL_KEYWORDS = ("input", "field", "saisie", "textbox", "champ")
# "ok" must stand alone (not glued to another letter); otherwise labels such
# as "Booking", "token" or "Facebook" would be classified as buttons.
_OK_TOKEN_RE = re.compile(r"(?<![^\W\d_])ok(?![^\W\d_])")


def _classify_element(label: str, kind_hint: str | None = None) -> str:
    """Heuristically classify an OmniParser element.

    Returns one of ``table | field | button | text_block``. ``kind_hint`` is
    checked first; the label keywords are only used when the hint does not
    decide.

    NOTE(review): the original docstring states this is meant to stay
    consistent with ``OmniParserAdapter._classify_element`` — confirm the
    stricter ``ok`` matching is acceptable on that side too.
    """
    lab = str(label or "").lower()
    if kind_hint:
        # Hints may not be plain strings; presumably enum-like objects expose
        # ``.value`` — fall back to ``str()`` otherwise.
        kh = str(getattr(kind_hint, "value", kind_hint)).lower()
        if "table" in kh:
            return "table"
        if "input" in kh or "field" in kh or "edit" in kh:
            return "field"
        if "button" in kh or "btn" in kh:
            return "button"
    if any(kw in lab for kw in _BUTTON_LABEL_KEYWORDS) or _OK_TOKEN_RE.search(lab):
        return "button"
    if any(kw in lab for kw in _FIELD_LABEL_KEYWORDS):
        return "field"
    if "table" in lab or "grille" in lab:
        return "table"
    return "text_block"


# ----------------------------------------------------------------------------
# Adapter wrappers: OmniParser and docTR (fallback)
# ----------------------------------------------------------------------------
class _OmniParserSafeWrapper:
    """Wrap the fragile ``OmniParserAdapter`` behind exception safeguards.

    - Lazy import so that importing this module does not fail when OmniParser
      is not installed.
    - ``available=False`` => the caller switches to the OCR-only fallback.
    - An effective timeout is applied around every ``detect`` call through a
      ``ThreadPoolExecutor`` + ``future.result(timeout=...)``.
    """

    # Class-level executor shared by all instances, so that no pool is
    # created per call.
    _TIMEOUT_EXECUTOR: Optional[concurrent.futures.ThreadPoolExecutor] = None

    @classmethod
    def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor:
        """Return the shared executor, creating it on first use."""
        if cls._TIMEOUT_EXECUTOR is None:
            cls._TIMEOUT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
                max_workers=2,
                thread_name_prefix="phase25-omniparser-timeout",
            )
        return cls._TIMEOUT_EXECUTOR

    def __init__(self) -> None:
        self._adapter: Any = None
        self._available: bool = False
        self._import_error: Optional[str] = None
        self._try_import()

    def _try_import(self) -> None:
        """Import and instantiate the adapter; record any failure as a string."""
        try:
            from core.detection.omniparser_adapter import OmniParserAdapter  # type: ignore

            self._adapter = OmniParserAdapter()
            self._available = bool(getattr(self._adapter, "available", False))
            if not self._available:
                # The adapter exists but its availability check failed.
                self._import_error = "OmniParser adapter installé mais modèles non disponibles"
        except Exception as exc:
            # Deliberately broad: any import/initialisation failure must end
            # in degraded mode rather than crash the caller.
            self._adapter = None
            self._available = False
            self._import_error = f"{type(exc).__name__}: {exc}"

    @property
    def available(self) -> bool:
        """Whether the adapter was imported and reported itself available."""
        return self._available

    @property
    def import_error(self) -> Optional[str]:
        """Description of the import/availability failure, if any."""
        return self._import_error

    def detect(
        self,
        image: "Image.Image",
        *,
        timeout: Optional[float] = None,
    ) -> List[Any]:
        """Run ``adapter.detect(image)`` under a hard timeout.

        Args:
            image: PIL image to analyze.
            timeout: timeout in seconds (default: ``OMNIPARSER_TIMEOUT_SEC``).

        Returns:
            The adapter's result as a list, or ``[]`` when the adapter is
            unavailable.

        Raises:
            concurrent.futures.TimeoutError: the call did not finish in time;
                the caller is expected to fall back to docTR.
            Exception: anything raised by the adapter is logged and re-raised.
        """
        if not self._available or self._adapter is None:
            return []
        effective_timeout = (
            timeout if timeout is not None else OMNIPARSER_TIMEOUT_SEC
        )
        executor = self._get_executor()
        future = executor.submit(self._adapter.detect, image)
        try:
            return list(future.result(timeout=effective_timeout))
        except concurrent.futures.TimeoutError:
            # A call that is already running keeps going in the background and
            # its result is ignored. cancel() only succeeds for a call still
            # waiting in the queue, which stops abandoned work from piling up
            # behind hung workers.
            future.cancel()
            logger.warning(
                "[PHASE25] OmniParser.detect timeout (%.1fs) -> fallback",
                effective_timeout,
            )
            raise
        except Exception as exc:
            logger.warning("[PHASE25] OmniParser.detect KO : %s", exc)
            raise  # propagated to the caller for logging + fallback


def _detect_via_omniparser(
    wrapper: _OmniParserSafeWrapper,
    image: "Image.Image",
    *,
    timeout: Optional[float] = None,
) -> List[Any]:
    """Thin indirection over ``wrapper.detect``."""
    return wrapper.detect(image, timeout=timeout)


# Module-level docTR predictor cache, created lazily by _detect_via_doctr so
# that the models are not reloaded on every call.
_DOCTR_PREDICTOR: Any = None


def _detect_via_doctr(image: "Image.Image", screenshot_path: Optional[str]) -> List[dict]:
    """OCR-only fallback (docTR). Return a list of raw text blocks.

    No VLM and no fine classification — OCR output only. Returns ``[]`` when
    PIL or docTR is unavailable, or when model initialisation or prediction
    fails.
    """
    global _DOCTR_PREDICTOR

    if not _HAS_PIL or image is None:
        return []
    try:
        from doctr.io import DocumentFile  # type: ignore
        from doctr.models import ocr_predictor  # type: ignore
    except ImportError:
        logger.info("[PHASE25] docTR non disponible pour fallback OCR")
        return []

    try:
        if _DOCTR_PREDICTOR is None:
            _DOCTR_PREDICTOR = ocr_predictor(
                det_arch="db_resnet50",
                reco_arch="crnn_vgg16_bn",
                pretrained=True,
            )
    except Exception as exc:  # pragma: no cover
        logger.warning("[PHASE25] docTR init KO : %s", exc)
        return []

    blocks: List[dict] = []
    try:
        # Prefer the on-disk screenshot when provided; otherwise feed docTR
        # the PNG-encoded bytes of the in-memory image.
        if screenshot_path and Path(screenshot_path).exists():
            doc = DocumentFile.from_images([screenshot_path])
        else:
            buf = io.BytesIO()
            image.convert("RGB").save(buf, format="PNG")
            doc = DocumentFile.from_images([buf.getvalue()])
        result = _DOCTR_PREDICTOR(doc)
        width, height = image.size
        for page in result.pages:
            for block in page.blocks:
                for line_obj in block.lines:
                    text = " ".join(w.value for w in line_obj.words).strip()
                    if not text:
                        continue
                    # Geometry is treated as ((x1, y1), (x2, y2)) normalised
                    # to 0-1 and scaled to pixel coordinates.
                    geom = line_obj.geometry
                    x1 = int(geom[0][0] * width)
                    y1 = int(geom[0][1] * height)
                    x2 = int(geom[1][0] * width)
                    y2 = int(geom[1][1] * height)
                    blocks.append({
                        "label": text,
                        "text": text,
                        "bbox": [x1, y1, x2, y2],
                        # Fixed value: no line-level score is read from docTR.
                        "confidence": 0.6,
                    })
    except Exception as exc:  # pragma: no cover
        logger.warning("[PHASE25] docTR predict KO : %s", exc)
        return []
    return blocks


def _elements_to_structure(elements: Iterable[Any]) -> SemanticStructure:
    """Convert OmniParser elements into a ``SemanticStructure``.

    Accepts both attribute-style objects (anything exposing ``label``) and
    plain dicts; other items are skipped. A missing confidence defaults to
    0.5, while a real confidence of 0 is kept as 0.0.
    """
    struct = SemanticStructure()
    for el in elements:
        if hasattr(el, "label"):
            label = str(getattr(el, "label", "") or "")
            bbox = list(getattr(el, "bbox", ()) or ())
            conf = _as_confidence(getattr(el, "confidence", None))
            kind_hint = getattr(el, "element_type", None)
        elif isinstance(el, dict):
            label = str(el.get("label") or el.get("text") or "")
            bbox = list(el.get("bbox") or [])
            raw_conf = el.get("confidence")
            if raw_conf is None:
                raw_conf = el.get("score")
            conf = _as_confidence(raw_conf)
            kind_hint = el.get("element_type") or el.get("type")
        else:
            continue

        kind = _classify_element(label, kind_hint)
        entry = {"label": label, "bbox": bbox, "confidence": conf}
        if kind == "table":
            struct.tables.append(entry)
        elif kind == "field":
            struct.forms.append(entry)
        elif kind == "button":
            struct.buttons.append(entry)
        else:
            struct.text_blocks.append({**entry, "text": label})
    return struct


# ----------------------------------------------------------------------------
# Disk cache
# ----------------------------------------------------------------------------
def _cache_path(session_id: str, frame_index: int) -> Path:
    """Return ``<cache root>/<session_id>/<frame_index>.json``.

    Raises:
        ValueError: ``session_id`` fails validation.
    """
    sid = _validate_session_id(session_id)
    return OMNIPARSER_CACHE_ROOT / sid / f"{int(frame_index)}.json"


def _cache_read(session_id: str, frame_index: int) -> Optional[dict]:
    """Return the cached payload, or ``None`` when absent or unusable.

    Unreadable files, invalid JSON, undecodable bytes and non-dict payloads
    are all logged and treated as a cache miss.
    """
    path = _cache_path(session_id, frame_index)
    if not path.exists():
        return None
    try:
        with path.open("r", encoding="utf-8") as fh:
            payload = json.load(fh)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("[PHASE25] cache illisible %s : %s", path, exc)
        return None
    if not isinstance(payload, dict):
        logger.warning(
            "[PHASE25] cache illisible %s : %s",
            path, "payload JSON inattendu (dict requis)",
        )
        return None
    return payload


def _cache_write(session_id: str, frame_index: int, payload: dict) -> None:
    """Best-effort atomic write of ``payload`` to the cache.

    The payload is written to a ``.json.tmp`` sibling and then moved over the
    final path. Filesystem and serialisation errors are logged, never raised.

    Raises:
        ValueError: ``session_id`` fails validation (raised by ``_cache_path``).
    """
    path = _cache_path(session_id, frame_index)
    tmp = path.with_suffix(".json.tmp")
    try:
        _ensure_dir(path.parent)
        with tmp.open("w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, indent=2)
        tmp.replace(path)
    except (OSError, TypeError, ValueError) as exc:  # pragma: no cover
        # TypeError / ValueError: json.dump met a value it cannot serialise.
        logger.warning("[PHASE25] cache ecriture KO %s : %s", path, exc)
        try:
            tmp.unlink()  # do not leave a partial temp file behind
        except OSError:
            pass  # nothing to clean up, or cleanup impossible: stay best-effort


# ----------------------------------------------------------------------------
# Main analyzer
# ----------------------------------------------------------------------------
class Phase25Analyzer:
    """Post-learning semantic analyzer.

    Minimal usage:

        analyzer = Phase25Analyzer(session_id="abc123")
        result = analyzer.analyze_frames(frames=[(0, img0), (12, img12), ...])
        path = analyzer.write_semantic_yaml(result, slug="ma_competence")

    ``frames`` is a sequence of ``(frame_index, PIL.Image[, screenshot_path])``.
    """

    def __init__(
        self,
        session_id: str,
        *,
        omniparser: Optional[_OmniParserSafeWrapper] = None,
        max_screens: int = MAX_SCREENS_PER_SESSION,
        timeout_sec: float = OMNIPARSER_TIMEOUT_SEC,
    ) -> None:
        """Validate ``session_id`` and set up the OmniParser wrapper.

        Args:
            session_id: session identifier (validated; ``ValueError`` if bad).
            omniparser: wrapper to use; a new ``_OmniParserSafeWrapper`` is
                created when omitted.
            max_screens: cap on the number of distinct screens analyzed.
            timeout_sec: timeout passed to every OmniParser call.
        """
        self.session_id = _validate_session_id(session_id)
        self.omniparser = omniparser if omniparser is not None else _OmniParserSafeWrapper()
        self.max_screens = max_screens
        self.timeout_sec = timeout_sec
        self._healthcheck_passed = True
        self._healthcheck_reason: Optional[str] = None

    # -- healthcheck -------------------------------------------------------

    def healthcheck(self) -> bool:
        """Check that OmniParser answers on a dummy image (see specs §7).

        - PIL missing or adapter ``available=False`` => healthcheck fails
          (analysis can still run in degraded OCR-only mode).
        - The adapter raising => failure + dedicated error-log entry.

        The outcome is stored on the instance and also returned.
        """
        if not _HAS_PIL:
            self._healthcheck_passed = False
            self._healthcheck_reason = "PIL indisponible"
            return False
        if not self.omniparser.available:
            self._healthcheck_passed = False
            self._healthcheck_reason = (
                self.omniparser.import_error or "OmniParser indisponible"
            )
            return False
        try:
            dummy = Image.new("RGB", (64, 64), color=(255, 255, 255))
            _ = self.omniparser.detect(dummy, timeout=self.timeout_sec)
        except Exception as exc:
            # frame_index -1 marks the healthcheck in the error log.
            _log_omniparser_error(self.session_id, -1, exc)
            self._healthcheck_passed = False
            self._healthcheck_reason = f"{type(exc).__name__}: {exc}"
            return False
        self._healthcheck_passed = True
        self._healthcheck_reason = None
        return True

    # -- screen analysis ---------------------------------------------------

    def analyze_screen(
        self,
        frame_index: int,
        image: "Image.Image",
        phash: str,
        *,
        screenshot_path: Optional[str] = None,
        window_title: Optional[str] = None,
        force_fallback: bool = False,
    ) -> ScreenAnalysis:
        """Analyze one representative screen.

        Strategy:
            1. Disk cache, keyed by session_id + frame_index. A hit is
               returned as-is, whatever ``force_fallback`` says.
            2. OmniParser through the safe wrapper, else OCR-only docTR.
            3. Exception => dedicated log + ``degraded=True`` + docTR result.
        """
        # 1. Cache
        cached = _cache_read(self.session_id, frame_index)
        if isinstance(cached, dict):
            stored = cached.get("structure")
            if not isinstance(stored, dict):
                stored = {}
            struct = SemanticStructure(
                tables=stored.get("tables") or [],
                forms=stored.get("forms") or [],
                buttons=stored.get("buttons") or [],
                text_blocks=stored.get("text_blocks") or [],
            )
            return ScreenAnalysis(
                index=frame_index,
                # ScreenAnalysis.to_dict() stores the hash under "hash";
                # "phash" is still honoured in case an entry uses that key.
                phash=cached.get("hash", cached.get("phash", phash)),
                screen_id=cached.get("screen_id", f"screen_{frame_index:03d}"),
                screenshot_path=cached.get("screenshot_path", screenshot_path),
                structure=struct,
                degraded=bool(cached.get("degraded", False)),
                degraded_reason=cached.get("degraded_reason"),
                elapsed_sec=float(cached.get("elapsed_sec", 0.0)),
                window_title=cached.get("window_title", window_title),
            )

        t0 = time.monotonic()
        degraded = False
        degraded_reason: Optional[str] = None
        structure: SemanticStructure

        use_omniparser = self.omniparser.available and not force_fallback
        if use_omniparser:
            try:
                elements = _detect_via_omniparser(
                    self.omniparser, image, timeout=self.timeout_sec,
                )
                structure = _elements_to_structure(elements)
                if not (structure.tables or structure.forms
                        or structure.buttons or structure.text_blocks):
                    # OmniParser produced nothing: complement with docTR
                    # text blocks (not flagged as degraded).
                    blocks = _detect_via_doctr(image, screenshot_path)
                    structure.text_blocks.extend(blocks)
            except Exception as exc:
                _log_omniparser_error(self.session_id, frame_index, exc)
                degraded = True
                degraded_reason = f"omniparser_exception: {type(exc).__name__}"
                blocks = _detect_via_doctr(image, screenshot_path)
                structure = SemanticStructure(text_blocks=blocks)
        else:
            degraded = True
            if not self.omniparser.available:
                degraded_reason = (
                    "omniparser_unavailable: "
                    + (self.omniparser.import_error or "n/a")
                )
            else:
                degraded_reason = "forced_fallback"
            blocks = _detect_via_doctr(image, screenshot_path)
            structure = SemanticStructure(text_blocks=blocks)

        elapsed = time.monotonic() - t0
        analysis = ScreenAnalysis(
            index=frame_index,
            phash=phash,
            screen_id=f"screen_{frame_index:03d}",
            screenshot_path=screenshot_path,
            structure=structure,
            degraded=degraded,
            degraded_reason=degraded_reason,
            elapsed_sec=elapsed,
            window_title=window_title,
        )
        # Cache write (best-effort).
        _cache_write(self.session_id, frame_index, analysis.to_dict())
        return analysis

    # -- full pipeline -----------------------------------------------------

    def analyze_frames(
        self,
        frames: Sequence[Tuple[int, "Image.Image"]],
        *,
        screenshot_paths: Optional[dict[int, str]] = None,
        window_titles: Optional[dict[int, str]] = None,
        run_healthcheck: bool = True,
    ) -> Phase25Result:
        """Full pipeline: phash grouping -> cap -> analysis -> result.

        Args:
            frames: sequence of ``(frame_index, PIL.Image)``; an optional
                third item is taken as that frame's screenshot path.
            screenshot_paths: mapping ``frame_index -> path`` (optional);
                takes precedence over a path given inside ``frames``.
            window_titles: mapping ``frame_index -> window_title`` (optional).
            run_healthcheck: run the OmniParser healthcheck before analyzing.

        Returns:
            ``Phase25Result`` with ``too_complex=True`` when more than
            ``max_screens`` distinct screens were found (only the first
            ``max_screens`` are analyzed).

        Raises:
            RuntimeError: PIL is not available.
        """
        if not _HAS_PIL:
            raise RuntimeError("PIL est requis pour Phase25Analyzer.analyze_frames")

        if run_healthcheck:
            self.healthcheck()
            if not self._healthcheck_passed:
                logger.warning(
                    "[PHASE25] healthcheck OmniParser KO (%s) -> mode degrade docTR",
                    self._healthcheck_reason,
                )

        force_fallback = not self._healthcheck_passed

        # Reduce every frame to (index, image) and collect inline paths, so
        # the documented 3-tuple form works with the grouping helper.
        pairs: List[Tuple[int, "Image.Image"]] = []
        inline_paths: dict[int, str] = {}
        for frame in frames:
            idx, img, *extra = frame
            pairs.append((idx, img))
            if extra and extra[0]:
                inline_paths[idx] = str(extra[0])

        # 1. Group by perceptual similarity.
        reps = identify_distinct_screens(pairs)

        # 2. Apply the max_screens cap.
        too_complex = len(reps) > self.max_screens
        if too_complex:
            logger.warning(
                "[PHASE25] session %s : %d ecrans distincts > cap %d -> too_complex",
                self.session_id, len(reps), self.max_screens,
            )
            reps = reps[: self.max_screens]

        # 3. Analyze every representative.
        sp = {**inline_paths, **(screenshot_paths or {})}
        wt = window_titles or {}
        screens: List[ScreenAnalysis] = []
        any_degraded = False
        for idx, img, phash in reps:
            analysis = self.analyze_screen(
                idx, img, phash,
                screenshot_path=sp.get(idx),
                window_title=wt.get(idx),
                force_fallback=force_fallback,
            )
            screens.append(analysis)
            any_degraded = any_degraded or analysis.degraded

        return Phase25Result(
            session_id=self.session_id,
            generated_at=datetime.now(timezone.utc).isoformat(),
            omniparser_available=self.omniparser.available and self._healthcheck_passed,
            degraded=any_degraded or not self._healthcheck_passed,
            too_complex=too_complex,
            screens=screens,
            healthcheck_passed=self._healthcheck_passed,
            healthcheck_reason=self._healthcheck_reason,
        )

    # -- YAML output -------------------------------------------------------

    def write_semantic_yaml(
        self,
        result: Phase25Result,
        slug: str,
        *,
        target_dir: Optional[Path] = None,
    ) -> Path:
        """Write ``<slug>.semantic.yaml`` into the candidate directory.

        Args:
            result: Phase 2.5 analysis result.
            slug: competence slug (validated against ``SLUG_PATTERN``).
            target_dir: output directory (default: ``SEMANTIC_DIR``).

        Returns:
            Path of the written file (``target_dir / "<slug>.semantic.yaml"``).

        Raises:
            ValueError: invalid slug, or ``target_dir`` is named
                ``supervised`` / ``stable``.
            OSError: the file could not be written.
        """
        s = _validate_slug(slug)
        out_dir = Path(target_dir) if target_dir is not None else Path(SEMANTIC_DIR)

        # Never overwrite supervised/stable content. Checked BEFORE touching
        # the filesystem so that a refused call does not create the directory.
        forbidden = {"supervised", "stable"}
        if out_dir.name in forbidden:
            raise ValueError(
                f"target_dir interdit '{out_dir.name}' (autorise : candidate uniquement)"
            )
        _ensure_dir(out_dir)

        payload = {
            "competence_id": s,
            "semantic_version": 1,
            "generated_at": result.generated_at,
            "session_id": result.session_id,
            "omniparser_available": result.omniparser_available,
            "degraded": result.degraded,
            "too_complex": result.too_complex,
            "healthcheck_passed": result.healthcheck_passed,
            "healthcheck_reason": result.healthcheck_reason,
            "screens": [
                {
                    "screen_id": sc.screen_id,
                    "phash": sc.phash,
                    "representative_frame_index": sc.index,
                    "screenshot_path": sc.screenshot_path,
                    "window_title": sc.window_title,
                    "degraded": sc.degraded,
                    "degraded_reason": sc.degraded_reason,
                    "elapsed_sec": round(sc.elapsed_sec, 3),
                    "structure": sc.structure.to_dict(),
                    "annotations": [],  # placeholder — later human annotation
                }
                for sc in result.screens
            ],
        }

        target = out_dir / f"{s}.semantic.yaml"
        # Write to a temp sibling, then move it over the final name.
        tmp = target.with_suffix(".yaml.tmp")
        try:
            with tmp.open("w", encoding="utf-8") as fh:
                yaml.safe_dump(payload, fh, allow_unicode=True, sort_keys=False)
            tmp.replace(target)
        except Exception:
            # Drop the partial temp file, then let the original error surface.
            try:
                tmp.unlink()
            except OSError:
                pass  # nothing to clean up, or cleanup impossible
            raise
        logger.info(
            "[PHASE25] semantic yaml ecrit : %s (screens=%d, degraded=%s)",
            target, len(result.screens), result.degraded,
        )
        return target


# ----------------------------------------------------------------------------
# Utility helpers (frame loading)
# ----------------------------------------------------------------------------
def load_frames_from_paths(paths_by_index: dict[int, str]) -> List[Tuple[int, "Image.Image"]]:
    """Load PIL images from a ``frame_index -> path`` mapping.

    Frames are returned in ascending index order. Paths that cannot be opened
    or decoded are skipped with a warning log.

    Raises:
        RuntimeError: PIL is not available.
    """
    if not _HAS_PIL:
        raise RuntimeError("PIL est requis pour load_frames_from_paths")
    frames: List[Tuple[int, "Image.Image"]] = []
    for idx in sorted(paths_by_index):
        p = paths_by_index[idx]
        try:
            img = Image.open(p)
            img.load()  # force decoding now so a broken file is caught here
        except OSError as exc:  # includes FileNotFoundError
            logger.warning("[PHASE25] frame %d illisible (%s) : %s", idx, p, exc)
            continue
        frames.append((int(idx), img))
    return frames


__all__ = [
    "Phase25Analyzer",
    "Phase25Result",
    "ScreenAnalysis",
    "SemanticStructure",
    "SEMANTIC_DIR",
    "OMNIPARSER_CACHE_DIR",
    "OMNIPARSER_CACHE_ROOT",
    "OMNIPARSER_ERROR_LOG",
    "PHASH_HAMMING_THRESHOLD",
    "MAX_SCREENS_PER_SESSION",
    "compute_phash",
    "identify_distinct_screens",
    "load_frames_from_paths",
]