diff --git a/agent_v0/server_v1/replay_engine.py b/agent_v0/server_v1/replay_engine.py index 8b2bdf43a..6a3cb9220 100644 --- a/agent_v0/server_v1/replay_engine.py +++ b/agent_v0/server_v1/replay_engine.py @@ -687,6 +687,7 @@ def _extract_required_apps_from_events( - launch_result_target: dict optionnel (vrai clic SearchHost -> app) """ app_counts: Dict[str, int] = defaultdict(int) + app_titles: Dict[str, List[str]] = defaultdict(list) first_app = None first_window_title = None @@ -702,6 +703,8 @@ def _extract_required_apps_from_events( title = to_info.get("title", "") if app_name: app_counts[app_name] += 1 + if title: + app_titles[app_name].append(title) if first_app is None and app_name.lower() not in _SETUP_IGNORE_APPS: first_app = app_name first_window_title = title @@ -741,6 +744,10 @@ def _extract_required_apps_from_events( "primary_launch_cmd": primary_launch_cmd, "first_window_title": first_window_title or "", "apps": dict(app_counts), + "has_neutral_window_title": any( + _is_neutral_window_title(title) + for title in app_titles.get(primary_app, []) + ), } if start_menu_target: result["start_menu_target"] = start_menu_target @@ -927,6 +934,9 @@ def _extract_required_apps_from_workflow(workflow) -> Dict[str, Any]: "primary_launch_cmd": primary_launch_cmd, "first_window_title": first_title, "apps": {}, + "has_neutral_window_title": any( + _is_neutral_window_title(title) for title in window_titles + ), "source_session_id": source_session_id, "machine_id": machine_id, } @@ -1113,6 +1123,50 @@ def _generate_run_dialog_setup_actions( }, ] + needs_fresh_notepad_document = ( + primary_app.lower() == "notepad.exe" + and ( + bool(app_info.get("has_neutral_window_title")) + or _is_neutral_window_title(first_title) + ) + ) + if needs_fresh_notepad_document: + if title_patterns or first_title: + actions.append({ + "action_id": f"act_{setup_id_prefix}_verify_before_fresh_document", + "type": "verify_screen", + "expected_node": "setup_initial_before_fresh_document", + 
"timeout_ms": 5000, + "_setup_phase": True, + "_setup_step": "verify_app_ready_before_fresh_document", + "_setup_strategy": "run_dialog", + "expected_window_title_contains": title_patterns or [first_title], + "intention": ( + "vérifier que Bloc-notes est la scène active avant " + "d'ouvrir un document vierge" + ), + }) + actions.extend([ + { + "action_id": f"act_{setup_id_prefix}_ensure_fresh_document", + "type": "key_combo", + "keys": ["ctrl", "n"], + "_setup_phase": True, + "_setup_step": "ensure_fresh_document", + "_setup_strategy": "run_dialog", + "expected_window_before": first_title, + "intention": "ouvrir un document Bloc-notes vierge non nommé", + }, + { + "action_id": f"act_{setup_id_prefix}_wait_fresh_document", + "type": "wait", + "duration_ms": 400, + "_setup_phase": True, + "_setup_step": "wait_fresh_document", + "_setup_strategy": "run_dialog", + }, + ]) + if title_patterns or first_title: actions.append({ "action_id": f"act_{setup_id_prefix}_verify", @@ -1688,6 +1742,63 @@ def _is_learned_workflow(workflow) -> bool: return has_prototype +_TARGET_SEMANTIC_KEYS = ( + "by_text", + "by_role", + "anchor_id", + "target_text", + "ocr_description", + "description", + "vlm_description", + "anchor_image_base64", + "by_text_source", + "window_title", + "anchor_bbox", + "original_size", +) + + +def _first_non_empty_text(*values: Any) -> str: + for value in values: + text = str(value or "").strip() + if text and text.casefold() not in {"none", "null"}: + return text + return "" + + +def _target_attr(target: Any, key: str, default: Any = None) -> Any: + if isinstance(target, dict): + return target.get(key, default) + return getattr(target, key, default) + + +def _copy_semantic_target_fields( + target_spec: Dict[str, Any], + *sources: Optional[Dict[str, Any]], +) -> None: + for source in sources: + if not isinstance(source, dict): + continue + for key in _TARGET_SEMANTIC_KEYS: + value = source.get(key) + if value and not target_spec.get(key): + target_spec[key] = 
value + + if not target_spec.get("by_text"): + target_text = _first_non_empty_text(target_spec.get("target_text")) + if target_text: + target_spec["by_text"] = target_text + target_spec.setdefault("by_text_source", "visual_anchor") + + if not target_spec.get("vlm_description"): + description = _first_non_empty_text( + target_spec.get("description"), + target_spec.get("ocr_description"), + ) + if description: + target_spec["vlm_description"] = description + + def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, Any]]: """ Convertir un WorkflowEdge en liste d'actions normalisées pour l'Agent V1. @@ -1705,8 +1816,9 @@ def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, # Extraire les coordonnées normalisées depuis TargetSpec.by_position x_pct = 0.0 y_pct = 0.0 - if target and target.by_position: - px, py = target.by_position + by_position = _target_attr(target, "by_position") + if target and by_position: + px, py = by_position if px <= 1.0 and py <= 1.0: x_pct = px y_pct = py @@ -1769,10 +1881,15 @@ def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, elif action_type == "extract_table": normalized["type"] = "extract_table" normalized["parameters"] = { - "output_var": action_params.get("output_var", "table_rows"), + "output_var": ( + action_params.get("variable_name") + or action_params.get("output_var") + or "table_rows" + ), "pattern": action_params.get("pattern"), "limit": action_params.get("limit"), "region": action_params.get("region"), + "engine": action_params.get("engine", "easyocr"), } return [normalized] @@ -1833,14 +1950,33 @@ def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, # Ajouter le target_spec complet pour la résolution visuelle target_spec = {} - if target and target.by_role: - target_spec["by_role"] = target.by_role - normalized["target_role"] = target.by_role # Compat debug - if target and target.by_text: - target_spec["by_text"] = 
target.by_text - normalized["target_text"] = target.by_text # Compat debug - if target and hasattr(target, 'context_hints') and target.context_hints: - target_spec["context_hints"] = target.context_hints + by_role = _target_attr(target, "by_role", "") + by_text = _target_attr(target, "by_text", "") + context_hints = _target_attr(target, "context_hints", {}) or {} + if target and by_role: + target_spec["by_role"] = by_role + normalized["target_role"] = by_role # Compat debug + if target and by_text: + target_spec["by_text"] = by_text + normalized["target_text"] = by_text # Compat debug + if target and context_hints: + target_spec["context_hints"] = context_hints + _copy_semantic_target_fields( + target_spec, + action_params, + action_params.get("target_spec") if isinstance(action_params, dict) else None, + context_hints, + ) + semantic_label = _first_non_empty_text( + target_spec.get("by_text"), + target_spec.get("target_text"), + target_spec.get("description"), + target_spec.get("ocr_description"), + target_spec.get("vlm_description"), + ) + if semantic_label: + normalized.setdefault("target_text", target_spec.get("target_text") or semantic_label) + normalized.setdefault("target_description", semantic_label) if target_spec: normalized["target_spec"] = target_spec normalized["visual_mode"] = True # Signal à l'agent d'utiliser la résolution visuelle @@ -2004,6 +2140,7 @@ def _handle_extract_table_action( output_var : nom de variable runtime (default "table_rows") pattern : regex à matcher sur chaque token OCR (ex : r"^25\\d{6}$") limit : nb max d'entrées à retourner + engine : easyocr (défaut) ou tesseract/digits/ipp pour chiffres region : (x, y, w, h) en pixels pour cropper avant OCR (None = image entière) @@ -2014,6 +2151,7 @@ def _handle_extract_table_action( output_var = (params.get("output_var") or params.get("variable_name") or "table_rows").strip() pattern = params.get("pattern") or None limit = params.get("limit") + engine = params.get("engine") or "easyocr" 
region = params.get("region") or None if isinstance(limit, str): try: @@ -2058,6 +2196,7 @@ def _handle_extract_table_action( region=tuple(region) if region else None, pattern=pattern, limit=limit, + engine=engine, ) except Exception as e: logger.warning( @@ -2071,8 +2210,8 @@ def _handle_extract_table_action( replay_state.setdefault("variables", {})[output_var] = rows logger.info( - "extract_table → variable '%s' (%d entrées, pattern=%r, limit=%s) replay %s", - output_var, len(rows), pattern, limit, replay_state.get("replay_id", "?"), + "extract_table → variable '%s' (%d entrées, pattern=%r, limit=%s, engine=%s) replay %s", + output_var, len(rows), pattern, limit, engine, replay_state.get("replay_id", "?"), ) return bool(rows) @@ -2410,6 +2549,29 @@ def _expand_compound_steps( action["x_pct"] = step.get("x_pct", 0.0) action["y_pct"] = step.get("y_pct", 0.0) action["button"] = step.get("button", "left") + target_spec: Dict[str, Any] = {} + _copy_semantic_target_fields( + target_spec, + step, + step.get("target_spec") if isinstance(step, dict) else None, + step.get("visual_anchor") if isinstance(step, dict) else None, + ) + semantic_label = _first_non_empty_text( + target_spec.get("by_text"), + target_spec.get("target_text"), + target_spec.get("description"), + target_spec.get("ocr_description"), + target_spec.get("vlm_description"), + ) + if semantic_label: + action.setdefault( + "target_text", + target_spec.get("target_text") or semantic_label, + ) + action.setdefault("target_description", semantic_label) + if target_spec: + action["target_spec"] = target_spec + action["visual_mode"] = True else: logger.debug(f"Step compound inconnu : {step_type}") @@ -2659,6 +2821,8 @@ def _create_replay_state( a_copy = { "action_id": a.get("action_id"), "type": a.get("type"), + "keys": a.get("keys"), + "button": a.get("button"), "x_pct": a.get("x_pct"), "y_pct": a.get("y_pct"), # Contrôle strict des étapes (Dom, matin 10 avril 2026) @@ -2667,6 +2831,9 @@ def 
_create_replay_state( "expected_window_title": a.get("expected_window_title", ""), # Contexte métier utile pour logs et apprentissage "intention": a.get("intention", ""), + "target_text": a.get("target_text", ""), + "target_description": a.get("target_description", ""), + "description": a.get("description", ""), } ts = a.get("target_spec") if isinstance(ts, dict): diff --git a/agent_v0/server_v1/replay_memory.py b/agent_v0/server_v1/replay_memory.py index 65df58e54..c242d655a 100644 --- a/agent_v0/server_v1/replay_memory.py +++ b/agent_v0/server_v1/replay_memory.py @@ -43,6 +43,22 @@ logger = logging.getLogger(__name__) _MEMORY_SINGLETON: Optional[Any] = None _MEMORY_DISABLED = False +_GENERIC_BUTTON_TEXTS = { + "annuler", + "cancel", + "enregistrer", + "non", + "no", + "ok", + "oui", + "ouvrir", + "open", + "remplacer", + "replace", + "save", + "yes", +} + def get_memory_store(): """Retourne le `TargetMemoryStore` partagé, ou None si indisponible. @@ -91,6 +107,44 @@ def _norm_text(s: str) -> str: return " ".join(s.split()) +def _memory_lookup_skip_reason(target_spec: Dict[str, Any]) -> str: + """Retourne la raison pour laquelle la mémoire ne doit pas court-circuiter. + + Les clics qui changent de fenêtre doivent être résolus visuellement à + l'instant T : une coordonnée apprise peut être une bonne piste, mais pas + une décision finale. Pour les boutons très génériques, on exige au moins + un contexte de fenêtre/interaction dans la clé mémoire afin d'éviter les + collisions entre « Enregistrer », « OK », « Oui », etc. 
+ """ + if not isinstance(target_spec, dict): + return "" + + hints = target_spec.get("context_hints") or {} + if bool(hints.get("requires_window_transition")): + return "window_transition_requires_visual_confirmation" + + button_text = _norm_text(str(target_spec.get("by_text") or "")) + if button_text not in _GENERIC_BUTTON_TEXTS: + return "" + + before = ( + hints.get("expected_window_before") + or hints.get("button_expected_before_window") + or hints.get("window_title") + or target_spec.get("window_title") + ) + after = ( + hints.get("expected_window_after") + or hints.get("button_expected_after_window") + or hints.get("expected_after_window") + ) + interaction = hints.get("interaction") or hints.get("foreground_dialog_id") + role = target_spec.get("by_role") + if not (before and role and (after or interaction)): + return "generic_button_missing_context" + return "" + + def compute_screen_sig(window_title: str) -> str: """Calcule la signature d'écran V4 à partir du titre de fenêtre. @@ -203,6 +257,11 @@ def memory_lookup( (resolved, method, x_pct, y_pct, score, ...) si une entrée fiable est trouvée. None sinon. """ + skip_reason = _memory_lookup_skip_reason(target_spec) + if skip_reason: + logger.info("memory_lookup SKIP : %s", skip_reason) + return None + store = get_memory_store() if store is None: return None diff --git a/agent_v0/server_v1/resolve_engine.py b/agent_v0/server_v1/resolve_engine.py index c38d54fe4..6789ec73b 100644 --- a/agent_v0/server_v1/resolve_engine.py +++ b/agent_v0/server_v1/resolve_engine.py @@ -988,7 +988,9 @@ def _resolve_by_grounding( {"role": "user", "content": prompt, "images": [shot_b64]}, ], "stream": False, - "options": {"temperature": 0.1, "num_predict": 100}, + # D5-v3a (2026-05-25) num_ctx=4096 explicite : éviter fuite 8192 + # via Modelfile qwen2.5vl:7b-rpa (PARAMETER num_ctx 8192). 
+ "options": {"temperature": 0.1, "num_predict": 100, "num_ctx": 4096}, }, timeout=60) content = resp.json().get("message", {}).get("content", "") except Exception as e: @@ -1016,7 +1018,9 @@ def _resolve_by_grounding( {"role": "user", "content": prompt_mi, "images": [shot_b64, anchor_b64]}, ], "stream": False, - "options": {"temperature": 0.1, "num_predict": 50}, + # D5-v3a (2026-05-25) num_ctx=4096 explicite : éviter fuite + # 8192 via Modelfile qwen2.5vl:7b-rpa. + "options": {"temperature": 0.1, "num_predict": 50, "num_ctx": 4096}, }, timeout=60) content2 = resp2.json().get("message", {}).get("content", "") elapsed = time.time() - t0 @@ -2482,10 +2486,15 @@ def _get_validation_ocr_reader(): if _VALIDATION_OCR_READER is None and not _VALIDATION_OCR_FAILED: try: import easyocr # type: ignore + from core.llm.ocr_extractor import easyocr_gpu_enabled + gpu = easyocr_gpu_enabled(default=False) _VALIDATION_OCR_READER = easyocr.Reader( - ['fr', 'en'], gpu=True, verbose=False + ['fr', 'en'], gpu=gpu, verbose=False + ) + logger.info( + "[REPLAY] EasyOCR validator chargé (fr+en, %s)", + "GPU" if gpu else "CPU", ) - logger.info("[REPLAY] EasyOCR validator chargé (fr+en, GPU)") except Exception as e: logger.warning("[REPLAY] EasyOCR validator indisponible (%s) — pré-check désactivé", e) _VALIDATION_OCR_FAILED = True @@ -2507,8 +2516,15 @@ def _normalize_for_match(s: str) -> str: def _text_match_fuzzy(expected: str, observed: str, min_token_ratio: float = 0.60) -> bool: """Match tolérant aux imperfections OCR. - 1. Substring exacte → match. - 2. Sinon : split en tokens ≥3 caractères, retourne True si au moins + 1. Substring exacte (expected ⊂ observed) → match. + 2. C-P1 (2026-05-25) : tolérance préfixe — observed est un préfixe + d'expected avec longueur ≥ 4 chars ET ≥ 50% de la longueur expected. + Couvre le cas OCR partiel "Enregi" / "Enregistrer" (6 chars sur 11 + = 54%, préfixe strict) où l'OCR coupe une ligne longue. 
Garde-fous : + - len ≥ 4 évite "Sa" / "Save" (faux positif probable) + - 50% évite "Bo" / "Bouton" et "Enregi" / "Enregistrer sous" (qui + serait 37%, rejet correct). + 3. Sinon : split en tokens ≥3 caractères, retourne True si au moins `min_token_ratio` des tokens attendus apparaissent dans observed. Ex : "Coller ou saisir le dossier patient" → tokens ['coller', 'saisir', 'dossier', 'patient'] ; si OCR voit "u saisir @@ -2523,6 +2539,13 @@ def _text_match_fuzzy(expected: str, observed: str, min_token_ratio: float = 0.6 return True if nexp in nobs: return True + # C-P1 : tolérance préfixe sur OCR partiel + if ( + len(nobs) >= 4 + and len(nobs) * 2 >= len(nexp) + and nexp.startswith(nobs) + ): + return True tokens = [t for t in nexp.split() if len(t) >= 3] if not tokens: return False @@ -3010,7 +3033,9 @@ def _locate_popup_button( "model": "qwen2.5vl:7b", "messages": [{"role": "user", "content": prompt, "images": [screenshot_b64]}], "stream": False, - "options": {"temperature": 0.1, "num_predict": 50}, + # D5-v3a (2026-05-25) num_ctx=4096 explicite : éviter fuite 8192 + # via Modelfile qwen2.5vl:7b/-rpa (PARAMETER num_ctx 8192). 
+ "options": {"temperature": 0.1, "num_predict": 50, "num_ctx": 4096}, }, timeout=15, ) diff --git a/agent_v0/server_v1/workflow_replay.py b/agent_v0/server_v1/workflow_replay.py index eea261117..4b475a051 100644 --- a/agent_v0/server_v1/workflow_replay.py +++ b/agent_v0/server_v1/workflow_replay.py @@ -126,6 +126,25 @@ def build_workflow_replay( "x_relative": "", }, } + _merge_semantic_target_fields( + step_action["target_spec"], + target, + params, + step, + ) + target_label = _first_non_empty_text( + step_action["target_spec"].get("by_text"), + step_action["target_spec"].get("target_text"), + step_action["target_spec"].get("description"), + step_action["target_spec"].get("ocr_description"), + step_action["target_spec"].get("vlm_description"), + ) + if target_label: + step_action.setdefault( + "target_text", + step_action["target_spec"].get("target_text") or target_label, + ) + step_action.setdefault("target_description", target_label) # Ajouter le crop anchor si disponible _attach_anchor(step_action, step, session_dir) @@ -171,6 +190,58 @@ def _map_action_type(step_type: str) -> str: return mapping.get(step_type, step_type) +_TARGET_SEMANTIC_KEYS = ( + "by_text", + "by_role", + "anchor_id", + "target_text", + "ocr_description", + "description", + "vlm_description", + "by_text_source", + "anchor_bbox", + "original_size", +) + + +def _first_non_empty_text(*values: Any) -> str: + for value in values: + text = str(value or "").strip() + if text and text.casefold() not in {"none", "null"}: + return text + return "" + + +def _merge_semantic_target_fields( + target_spec: Dict[str, Any], + *sources: Dict[str, Any], +) -> None: + for source in sources: + if not isinstance(source, dict): + continue + visual_anchor = source.get("visual_anchor") or {} + if isinstance(visual_anchor, dict): + _merge_semantic_target_fields(target_spec, visual_anchor) + for key in _TARGET_SEMANTIC_KEYS: + value = source.get(key) + if value and not target_spec.get(key): + target_spec[key] = 
value + + if not target_spec.get("by_text"): + target_text = _first_non_empty_text(target_spec.get("target_text")) + if target_text: + target_spec["by_text"] = target_text + target_spec.setdefault("by_text_source", "visual_anchor") + + if not target_spec.get("vlm_description"): + description = _first_non_empty_text( + target_spec.get("description"), + target_spec.get("ocr_description"), + ) + if description: + target_spec["vlm_description"] = description + + def _attach_anchor(action: dict, step: dict, session_dir: str) -> None: """Attacher le crop anchor au target_spec si disponible.""" import base64 diff --git a/core/competences/persist.py b/core/competences/persist.py new file mode 100644 index 000000000..1dc226f12 --- /dev/null +++ b/core/competences/persist.py @@ -0,0 +1,518 @@ +"""Helpers de persistance pour les competences candidates (POC Lea-first). + +Couvre : +- slugification stricte (ASCII, regex ^[a-z][a-z0-9_]{2,79}$) +- detection PII (regex MVP, paramétrable) +- atomic write + rename POSIX +- append-only audit JSONL avec verrou fcntl +- detection de collision cross-states (candidate / supervised / stable) + +Le module est volontairement minimal : il n'importe pas FastAPI ni le pipeline +VWB, il ne fait pas de logique reseau. Il est consomme depuis +``agent_v0/server_v1/api_stream.py`` endpoint ``/persist``. 
+""" + +from __future__ import annotations + +import json +import os +import re +import time +import unicodedata +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable, Optional + +try: # pragma: no cover - dependance externe deja presente dans le projet + import yaml +except ImportError as exc: # pragma: no cover + raise RuntimeError("PyYAML est requis pour core.competences.persist") from exc + +try: + import fcntl # POSIX uniquement + _HAS_FCNTL = True +except ImportError: # pragma: no cover - Windows + fcntl = None # type: ignore[assignment] + _HAS_FCNTL = False + + +REPO_ROOT = Path(__file__).resolve().parents[2] +COMPETENCES_ROOT = REPO_ROOT / "data" / "competences" +CANDIDATE_DIR = COMPETENCES_ROOT / "candidate" +SUPERVISED_DIR = COMPETENCES_ROOT / "supervised" +STABLE_DIR = COMPETENCES_ROOT / "stable" +AUDIT_PATH = COMPETENCES_ROOT / "persist_audit.jsonl" +INCOMPLETE_PATH = COMPETENCES_ROOT / "incomplete_learnings.jsonl" + +# Pattern final autorise pour un slug de competence. +SLUG_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,79}$") + +# Detection PII MVP — regex parametrable via env RPA_PII_PATTERNS +# (separes par |). Defaut : couvre patterns simples (IPP, NIR, email, tel FR). 
+_DEFAULT_PII_PATTERNS = [ + r"\b\d{13}\b", # NIR FR (13 chiffres) + r"\b\d{15}\b", # NIR FR + cle + r"\bIPP[\s:_-]*\d{6,}\b", # IPP hospitalier + r"[\w\.-]+@[\w\.-]+\.\w{2,}", # email + r"\b0[1-9](?:[ .-]?\d{2}){4}\b", # telephone FR +] + + +def _compile_pii_patterns() -> list[re.Pattern[str]]: + raw = os.environ.get("RPA_PII_PATTERNS") + patterns = raw.split("|") if raw else _DEFAULT_PII_PATTERNS + compiled: list[re.Pattern[str]] = [] + for pat in patterns: + pat = pat.strip() + if not pat: + continue + try: + compiled.append(re.compile(pat, re.IGNORECASE)) + except re.error: + continue + return compiled + + +# ---------------------------------------------------------------------------- +# Slugification +# ---------------------------------------------------------------------------- + + +def slugify(name: str) -> str: + """Convertir un nom libre en slug ASCII strict. + + Regle : + - translitteration NFKD (suppression accents) + - lowercase, espaces / tirets / points -> '_' + - chars hors [a-z0-9_] retires + - underscores multiples reduits a 1 + - troncature a 80 chars max + - doit matcher SLUG_PATTERN + + Leve ValueError si le slug final ne matche pas le pattern. 
+ """ + if not isinstance(name, str): + raise ValueError("name doit etre une chaine non vide") + raw = name.strip() + if not raw: + raise ValueError("name est vide") + + # NFKD pour decomposer les accents puis suppression des combinaisons + normalized = unicodedata.normalize("NFKD", raw) + ascii_only = normalized.encode("ascii", "ignore").decode("ascii") + # Espaces / tirets / points / slashes -> underscore + cleaned = re.sub(r"[\s\-./\\]+", "_", ascii_only.lower()) + # Tout ce qui n'est pas [a-z0-9_] -> supprime + cleaned = re.sub(r"[^a-z0-9_]+", "", cleaned) + # Reduire underscores multiples + cleaned = re.sub(r"_+", "_", cleaned).strip("_") + # Forcer commencement par une lettre (si commence par chiffre, prefixer) + if cleaned and cleaned[0].isdigit(): + cleaned = f"c_{cleaned}" + # Tronquer + if len(cleaned) > 80: + cleaned = cleaned[:80].rstrip("_") + + if not SLUG_PATTERN.match(cleaned): + raise ValueError( + f"slug invalide '{cleaned}' (regle : {SLUG_PATTERN.pattern})" + ) + return cleaned + + +# ---------------------------------------------------------------------------- +# Collisions cross-states +# ---------------------------------------------------------------------------- + + +def detect_cross_state_collision( + slug: str, + *, + competences_root: Path = COMPETENCES_ROOT, +) -> Optional[str]: + """Retourne le sous-dossier ou un YAML .yaml existe deja, sinon None. + + Verifie candidate/, supervised/, stable/. + """ + for sub in ("candidate", "supervised", "stable"): + target = competences_root / sub / f"{slug}.yaml" + if target.exists(): + return sub + return None + + +# ---------------------------------------------------------------------------- +# Detection PII +# ---------------------------------------------------------------------------- + + +def detect_pii(payload: Any) -> list[str]: + """Parcourt recursivement un payload (dict/list/str) et retourne la liste + des patterns PII matches. Liste vide = pas de PII detecte. 
+ + L'appelant decide quoi en faire (HTTP 400 + log non-sensible). + """ + matches: list[str] = [] + patterns = _compile_pii_patterns() + if not patterns: + return matches + + def _walk(node: Any) -> None: + if isinstance(node, str): + for pat in patterns: + if pat.search(node): + matches.append(pat.pattern) + elif isinstance(node, dict): + for v in node.values(): + _walk(v) + elif isinstance(node, (list, tuple)): + for v in node: + _walk(v) + + _walk(payload) + # dedoublonner en preservant l'ordre + seen = set() + out: list[str] = [] + for p in matches: + if p not in seen: + seen.add(p) + out.append(p) + return out + + +# ---------------------------------------------------------------------------- +# Atomic write +# ---------------------------------------------------------------------------- + + +def atomic_write_yaml( + target_path: Path, + data: dict[str, Any], + *, + persist_id: str, +) -> Path: + """Ecrire un dict en YAML de maniere atomique. + + 1. Ecrit dans /..tmp. + 2. os.rename vers target_path (POSIX atomic) + 3. En cas d'echec, supprime le .tmp si possible. + + Retourne le chemin final (target_path). + """ + target_path = Path(target_path) + target_dir = target_path.parent + target_dir.mkdir(parents=True, exist_ok=True) + tmp_name = f".{target_path.name}.tmp.{persist_id}" + tmp_path = target_dir / tmp_name + + try: + with tmp_path.open("w", encoding="utf-8") as handle: + yaml.safe_dump( + data, + handle, + allow_unicode=True, + sort_keys=False, + default_flow_style=False, + ) + handle.flush() + try: + os.fsync(handle.fileno()) + except OSError: + pass + # rename atomique (POSIX). Echoue si target existe deja sur Windows, + # mais Linux (POSIX) ecrase silencieusement. On a verifie la collision + # avant l'appel. 
+ os.rename(tmp_path, target_path) + except Exception: + if tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + raise + + return target_path + + +# ---------------------------------------------------------------------------- +# Audit append (JSONL + verrou) +# ---------------------------------------------------------------------------- + + +def audit_append( + entry: dict[str, Any], + *, + audit_path: Path = AUDIT_PATH, +) -> int: + """Append une ligne JSON dans le fichier audit, retourne audit_entry_id. + + L'audit_entry_id est un compteur monotone derive du nombre de lignes + avant l'append. La concurrence est serialisee via fcntl.flock (POSIX). + Sur les systemes sans fcntl (Windows), l'ecriture est best-effort. + """ + audit_path = Path(audit_path) + audit_path.parent.mkdir(parents=True, exist_ok=True) + + if "timestamp" not in entry: + entry["timestamp"] = ( + datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + ) + + # Open en append + lecture pour compter les lignes existantes (audit_entry_id). 
+ flags = "a+" + with open(audit_path, flags, encoding="utf-8") as handle: + if _HAS_FCNTL: + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_EX) # type: ignore[union-attr] + except OSError: + pass + try: + handle.seek(0) + line_count = sum(1 for _ in handle) + audit_entry_id = line_count + 1 + entry["audit_entry_id"] = audit_entry_id + handle.write(json.dumps(entry, ensure_ascii=False) + "\n") + handle.flush() + try: + os.fsync(handle.fileno()) + except OSError: + pass + finally: + if _HAS_FCNTL: + try: + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) # type: ignore[union-attr] + except OSError: + pass + return audit_entry_id + + +def find_existing_audit_entry( + persist_id: str, + *, + audit_path: Path = AUDIT_PATH, +) -> Optional[dict[str, Any]]: + """Recherche une entree existante par persist_id pour l'idempotence.""" + if not persist_id: + return None + audit_path = Path(audit_path) + if not audit_path.exists(): + return None + try: + with audit_path.open("r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + if record.get("persist_id") == persist_id: + return record + except OSError: + return None + return None + + +# ---------------------------------------------------------------------------- +# YAML body construction +# ---------------------------------------------------------------------------- + + +REQUIRED_YAML_FIELDS = ( + "schema_version", + "id", + "name", + "version", + "learning_state", + "intent", + "parameters", + "preconditions", + "methods", + "success_marker", + "failure_message_template", + "promotion", + "generalisation", + "failure_log", + "created_at", + "last_updated_at", + "methods_execution", +) + + +def build_competence_yaml( + *, + slug: str, + name: str, + workflow_ir: dict[str, Any], + parameters: Optional[list[dict[str, Any]]], + intent_fr: str, + learning_state: str, + session_id: Optional[str], + machine_id: 
Optional[str], + external_agent_id: Optional[str] = None, +) -> dict[str, Any]: + """Construit le dict YAML conforme au schema de reference. + + Aligne sur ``data/competences/candidate/key_win_r_wait_explorer_exe.yaml``. + """ + now_iso = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + steps = list(workflow_ir.get("steps") or []) + preconditions = list(workflow_ir.get("preconditions") or []) + success_marker = workflow_ir.get("success_marker") or { + "mode": "all_of", + "timeout_ms": 5000, + "markers": [], + } + + methods: list[dict[str, Any]] = [] + for idx, step in enumerate(steps, start=1): + if not isinstance(step, dict): + continue + method = dict(step) + method.setdefault("id", f"step_{idx}_{step.get('kind') or 'action'}") + if "primitive_ref" not in method and method.get("kind"): + method["primitive_ref"] = method["kind"] + method.setdefault("observed", False) + methods.append(method) + + params_dict: dict[str, Any] = {} + for p in (parameters or []): + if isinstance(p, dict) and p.get("name"): + params_dict[str(p["name"])] = { + "type": p.get("type", "string"), + "required": bool(p.get("required", False)), + "description": p.get("description", ""), + } + + yaml_body: dict[str, Any] = { + "schema_version": 1, + "id": slug, + "name": name, + "version": 1, + "learning_state": learning_state, + "intent": {"fr": intent_fr or name}, + "parameters": params_dict, + "preconditions": preconditions, + "methods": methods, + "success_marker": success_marker, + "failure_message_template": workflow_ir.get("failure_message_template") + or { + "intention": intent_fr or name, + "attendu": "", + "vu": "{observed_human_state}", + "demande": "indiquer la correction attendue", + }, + "promotion": { + "history": [ + { + "at": now_iso, + "from": "observed", + "to": learning_state, + "by": "lea_persist_endpoint", + "reason": "persisted via /api/v1/lea/competences/candidate/persist", + } + ], + "candidate_requires": [ + "method_trace_present", + 
"success_marker_defined", + "failure_message_template_valid", + ], + "supervised_requires": ["replay_verified_once", "human_validation"], + "stable_requires": { + "min_successes": 3, + "distinct_contexts": 3, + "max_unexplained_failures": 0, + }, + "t2_known_gaps": [], + }, + "generalisation": { + "seen_contexts": [], + "method_success_rate": {}, + "variance_log": [], + }, + "failure_log": [], + "created_at": now_iso, + "last_updated_at": now_iso, + "methods_execution": "sequence", + } + + if session_id or machine_id or external_agent_id: + yaml_body["chain_refs"] = { + "source_session": session_id, + "machine_id": machine_id, + "external_agent_id": external_agent_id, + } + return yaml_body + + +def validate_yaml_schema(data: dict[str, Any]) -> list[str]: + """Verifie la presence des champs obligatoires. Retourne la liste des manquants.""" + return [field for field in REQUIRED_YAML_FIELDS if field not in data] + + +# ---------------------------------------------------------------------------- +# Rate limit token-bucket simple (en memoire, par machine_id) +# ---------------------------------------------------------------------------- + + +class PersistRateLimiter: + """Token-bucket minimal pour /persist. + + Par defaut : 10 requetes / minute / machine_id (cf. specs §6). + Instance unique attendue ; thread-safe via lock minimal. + """ + + def __init__(self, *, max_per_minute: int = 10, window_seconds: int = 60) -> None: + self.max_per_minute = max_per_minute + self.window_seconds = window_seconds + self._timestamps: dict[str, list[float]] = {} + + def allow(self, machine_id: str) -> tuple[bool, int]: + """Renvoie (allowed, retry_after_seconds). + + retry_after_seconds = 0 si autorise. 
+ """ + if not machine_id: + return True, 0 + now = time.time() + bucket = self._timestamps.setdefault(machine_id, []) + # Purger les entrees hors fenetre + bucket[:] = [ts for ts in bucket if now - ts < self.window_seconds] + if len(bucket) >= self.max_per_minute: + oldest = bucket[0] + retry_after = max(1, int(self.window_seconds - (now - oldest))) + return False, retry_after + bucket.append(now) + return True, 0 + + def reset(self, machine_id: Optional[str] = None) -> None: + if machine_id is None: + self._timestamps.clear() + else: + self._timestamps.pop(machine_id, None) + + +# Instance partagee importable depuis api_stream +persist_rate_limiter = PersistRateLimiter() + + +__all__ = [ + "SLUG_PATTERN", + "COMPETENCES_ROOT", + "CANDIDATE_DIR", + "AUDIT_PATH", + "INCOMPLETE_PATH", + "REQUIRED_YAML_FIELDS", + "slugify", + "detect_cross_state_collision", + "detect_pii", + "atomic_write_yaml", + "audit_append", + "find_existing_audit_entry", + "build_competence_yaml", + "validate_yaml_schema", + "PersistRateLimiter", + "persist_rate_limiter", +] diff --git a/core/detection/ollama_client.py b/core/detection/ollama_client.py index 73dcafe08..aeff89bd7 100644 --- a/core/detection/ollama_client.py +++ b/core/detection/ollama_client.py @@ -16,6 +16,48 @@ import io logger = logging.getLogger(__name__) +def _extract_first_json_object(text: str) -> Optional[Dict[str, Any]]: + """Extrait le premier objet JSON racine d'un texte qui peut contenir + du contenu parasite après (typique des modèles VLM qui ajoutent une + explication post-JSON). + + Retourne None si aucun JSON valide n'est trouvé. 
+ """ + if not text: + return None + # Trouver la première '{' au niveau racine + start = text.find("{") + if start < 0: + return None + depth = 0 + in_string = False + escape = False + for i in range(start, len(text)): + c = text[i] + if escape: + escape = False + continue + if c == "\\" and in_string: + escape = True + continue + if c == '"': + in_string = not in_string + continue + if in_string: + continue + if c == "{": + depth += 1 + elif c == "}": + depth -= 1 + if depth == 0: + candidate = text[start : i + 1] + try: + return json.loads(candidate) + except json.JSONDecodeError: + return None + return None + + class OllamaClient: """ Client Ollama pour VLM @@ -219,7 +261,93 @@ class OllamaClient: "success": False, "error": str(e) } - + + def generate_grounding( + self, + prompt: str, + image_path: Optional[str] = None, + image: Optional[Image.Image] = None, + extra_images_b64: Optional[List[str]] = None, + profile: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """D5-v2 (2026-05-25) : appel grounding VLM centralisé, prefill-aware. + + Utilise le profil dédié `vlm_config.get_grounding_profile()` pour + garantir num_ctx pinned (défaut 4096), prefill JSON, think=false, + temperature=0, num_predict court. Évite les chemins qui retomberaient + sur qwen2.5vl en ctx 8192. + + Le profile peut être surchargé via param explicite (utile tests). + + Reconstitue le JSON complet via prefill : la réponse Ollama est + complétée par le préfixe `{"x_pct":` avant parsing, pour que + `json.loads()` voit le JSON natif. + + Args: + prompt: prompt textuel (typiquement "Find element X") + image_path / image / extra_images_b64: cf. generate() + profile: override du profile grounding (sinon get_grounding_profile()) + + Returns: + Dict avec `response` (texte complet incluant prefill), `success`, + `error`, `parsed_json` (dict {x_pct, y_pct, confidence, ...} ou + None si non parsable), `profile_used` (dict). + + Notes: + - Pas de fallback automatique sur fallback_model ici. 
Le caller + décide de retry avec un autre modèle si besoin. + - `keep_alive` du profile n'est PAS envoyé en payload (Ollama + accepte mais non standard). À gérer côté pull/keep si critique. + """ + if profile is None: + from core.detection.vlm_config import get_grounding_profile + profile = get_grounding_profile(endpoint=self.endpoint) + + # Préserver le modèle courant, switcher temporairement. + original_model = self.model + self.model = profile["model"] + try: + result = self.generate( + prompt=prompt, + image_path=image_path, + image=image, + extra_images_b64=extra_images_b64, + temperature=profile["temperature"], + max_tokens=profile["num_predict"], + assistant_prefill=profile["prefill"], + num_ctx=profile["num_ctx"], + force_json=False, # prefill suffit, format=json ralentit qwen3.5 + ) + finally: + self.model = original_model + + # Logging non-bruyant : 1 ligne par appel grounding + elapsed_hint = "" # caller mesure via time.perf_counter si besoin + logger.info( + "[PERF] vlm.grounding model=%s ctx=%d prefill=%s success=%s", + profile["model"], profile["num_ctx"], + "yes" if profile["prefill"] else "no", + result.get("success", False), + ) + + # Parse JSON prefill-aware. Le contenu complet inclut déjà le prefill + # (reconstitué par generate()) sauf si prefill=None. Si pas de prefill, + # tenter parse direct (le modèle peut avoir produit du JSON pur). + parsed = None + content = (result.get("response") or "").strip() + if content: + try: + # Le JSON peut être suivi de texte parasite (qwen termine + # parfois par des explications). Couper à la 1ère accolade + # fermante au niveau racine. 
+ parsed = _extract_first_json_object(content) + except Exception as e: + logger.debug("[PERF] vlm.grounding parse failed: %s — content=%r", e, content[:160]) + + result["parsed_json"] = parsed + result["profile_used"] = dict(profile) + return result + def detect_ui_elements(self, image_path: str) -> Dict[str, Any]: """ Détecter les éléments UI dans une image diff --git a/core/detection/vlm_config.py b/core/detection/vlm_config.py index e6bdd673d..4c10f76a6 100644 --- a/core/detection/vlm_config.py +++ b/core/detection/vlm_config.py @@ -134,13 +134,13 @@ def reset_vlm_model_cache(): def is_thinking_model(model_name: str) -> bool: - """Détermine si un modèle est un modèle 'thinking' (qwen3). + """Détermine si un modèle est un modèle 'thinking' (qwen3, qwen3.5). Les modèles thinking nécessitent un assistant prefill pour éviter le mode réflexion interne qui peut durer >180s avec des images. Args: - model_name: Nom du modèle (ex: "qwen3-vl:8b", "gemma4:e4b") + model_name: Nom du modèle (ex: "qwen3-vl:8b", "qwen3.5:9b", "gemma4:e4b") Returns: True si le modèle est de type thinking (nécessite prefill workaround) @@ -148,6 +148,92 @@ def is_thinking_model(model_name: str) -> bool: return "qwen3" in model_name.lower() +# ──────────────────────────────────────────────────────────────────────────── +# D5-v2 (2026-05-25) : profil grounding dédié, centralisé, env-overridable +# ──────────────────────────────────────────────────────────────────────────── + +# Profil grounding par défaut — qwen3.5:9b avec ctx 4096 et prefill JSON. +# Cohérent avec décision Codex après revue Gemini : empêcher rechauffe +# qwen2.5vl en ctx 8192 et garantir un chemin grounding reproductible. 
def get_grounding_profile(endpoint: str = DEFAULT_OLLAMA_ENDPOINT) -> dict:
    """Return the VLM profile used for JSON-format grounding calls.

    The profile targets calls whose answer is consumed as
    ``{"x_pct": ..., "y_pct": ..., "confidence": ...}`` through a JSON
    prefill. The original note on this function warns that it is not meant
    for the native ``bbox_2d`` grounding path, and that the legacy bbox code
    reads the same ``RPA_GROUNDING_MODEL`` variable while expecting a
    bbox-capable model, so overriding that variable affects both paths.
    NOTE(review): the legacy call sites are not visible here. Verify before
    changing the variable in production.

    Environment variables:
        RPA_GROUNDING_MODEL: main model. A blank value falls back to
            ``DEFAULT_GROUNDING_MODEL``.
        RPA_GROUNDING_CTX: context window. A non-integer or non-positive
            value falls back to ``DEFAULT_GROUNDING_CTX``.
        RPA_GROUNDING_FALLBACK: fallback model name.
        RPA_VLM_PREFILL: set to 0/false/no/off to disable the JSON prefill.

    Args:
        endpoint: accepted for interface compatibility; not read here.

    Returns:
        dict with keys ``model`` (str), ``num_ctx`` (int), ``prefill``
        (str or None), ``temperature`` (float), ``num_predict`` (int),
        ``think`` (bool), ``keep_alive`` (str), ``fallback_model`` (str).
    """
    # An empty model name can never be served, so treat blank as "unset".
    model = (
        os.environ.get("RPA_GROUNDING_MODEL", DEFAULT_GROUNDING_MODEL).strip()
        or DEFAULT_GROUNDING_MODEL
    )
    try:
        num_ctx = int(os.environ.get("RPA_GROUNDING_CTX", str(DEFAULT_GROUNDING_CTX)))
    except (TypeError, ValueError):
        num_ctx = DEFAULT_GROUNDING_CTX
    if num_ctx <= 0:
        # A zero or negative context window is not usable.
        num_ctx = DEFAULT_GROUNDING_CTX
    fallback = os.environ.get(
        "RPA_GROUNDING_FALLBACK", DEFAULT_GROUNDING_FALLBACK
    ).strip()
    prefill_enabled = os.environ.get("RPA_VLM_PREFILL", "true").strip().lower() not in (
        "0", "false", "no", "off"
    )
    prefill = DEFAULT_GROUNDING_PREFILL if prefill_enabled else None

    # Models flagged by either helper must run with thinking disabled.
    think_false = is_thinking_model(model) or needs_think_false(model)

    return {
        "model": model,
        "num_ctx": num_ctx,
        "prefill": prefill,
        "temperature": DEFAULT_GROUNDING_TEMPERATURE,
        "num_predict": DEFAULT_GROUNDING_NUM_PREDICT,
        "think": not think_false,  # False when thinking must be disabled
        "keep_alive": DEFAULT_GROUNDING_KEEP_ALIVE,
        "fallback_model": fallback,
    }
def easyocr_gpu_enabled(default: bool = False) -> bool:
    """Tell whether EasyOCR is allowed to run on the GPU.

    The decision comes from the ``RPA_EASYOCR_GPU`` environment variable.
    When the variable is unset or empty, ``default`` is returned. Otherwise
    the value is trimmed and lower-cased, and only ``1``, ``true``, ``yes``
    or ``on`` enable the GPU.

    Args:
        default: value returned when the variable is unset or empty.

    Returns:
        True when GPU use is enabled, False otherwise.
    """
    setting = os.environ.get("RPA_EASYOCR_GPU")
    if setting is None or setting == "":
        return default
    normalized = setting.strip().lower()
    return normalized in ("1", "true", "yes", "on")
+ En cas d'erreur, retourne une liste vide et log un warning. + """ + path = Path(image_path) + if not path.exists(): + logger.warning("extract_digits_tesseract: fichier introuvable %s", image_path) + return [] + + try: + from PIL import Image + import pytesseract + + with Image.open(path) as img: + if region: + x, y, w, h = region + img = img.crop((x, y, x + w, y + h)) + if img.mode not in {"L", "RGB"}: + img = img.convert("RGB") + + config_parts = ["--psm", str(psm)] + if whitelist: + config_parts.extend(["-c", f"tessedit_char_whitelist={whitelist}"]) + text = pytesseract.image_to_string( + img, + lang=lang, + config=" ".join(config_parts), + ) + + values = re.findall(r"\d+", text) + if pattern: + compiled = re.compile(pattern) + values = [v for v in values if compiled.match(v)] + if limit: + values = values[:limit] + return values + except Exception as e: + logger.warning("extract_digits_tesseract échoué sur %s : %s", image_path, e) + return [] + + def extract_table_from_image( image_path: str, region: Optional[Tuple[int, int, int, int]] = None, pattern: Optional[str] = None, limit: Optional[int] = None, + engine: str = "easyocr", ) -> List[str]: """Extrait une liste de valeurs d'un tableau via OCR. Cas d'usage principal : lire la liste des IPP d'un tableau de patients - pour boucler dessus. EasyOCR retourne tous les tokens avec leur bbox, - on filtre par regex puis on trie par position (y croissant). + pour boucler dessus. Par défaut, EasyOCR retourne tous les tokens avec + leur bbox, on filtre par regex puis on trie par position (y croissant). + Pour des champs chiffres/IPP, `engine="tesseract"` active le chemin + spécialisé Tesseract validé sur captures Easily. Args: image_path: chemin du PNG sur disque. @@ -92,6 +177,7 @@ def extract_table_from_image( Si None : tous les tokens non vides sont retournés. Exemple IPP : r"^\\d{8,10}$" ou r"^25\\d{6}$" limit: nombre maximal d'entrées à retourner (None = sans limite). 
+ engine: "easyocr" (defaut) ou "tesseract" / "digits" / "ipp". Returns: Liste de strings dans l'ordre top → bottom (par y de bbox). @@ -102,6 +188,15 @@ def extract_table_from_image( logger.warning("extract_table: fichier introuvable %s", image_path) return [] + engine_name = (engine or "easyocr").strip().lower() + if engine_name in {"tesseract", "digits", "ipp"}: + return extract_digits_tesseract_from_image( + image_path, + region=region, + pattern=pattern, + limit=limit, + ) + try: from PIL import Image import numpy as np diff --git a/core/pipeline/workflow_pipeline.py b/core/pipeline/workflow_pipeline.py index fba64a79b..a88f15efd 100644 --- a/core/pipeline/workflow_pipeline.py +++ b/core/pipeline/workflow_pipeline.py @@ -99,10 +99,17 @@ class WorkflowPipeline: logger.info("✓ Fusion Engine initialized") # 3. State Embedding Builder + clip_embedders = { + "image": self.clip_embedder, + "text": self.clip_embedder, + "title": self.clip_embedder, + "ui": self.clip_embedder, + } self.embedding_builder = StateEmbeddingBuilder( fusion_engine=self.fusion_engine, + embedders=clip_embedders, output_dir=self.embeddings_dir, - use_clip=True + use_clip=False ) logger.info("✓ State Embedding Builder initialized") diff --git a/core/semantic/__init__.py b/core/semantic/__init__.py new file mode 100644 index 000000000..22658a946 --- /dev/null +++ b/core/semantic/__init__.py @@ -0,0 +1,38 @@ +"""Phase 2.5 — Analyse sémantique post-apprentissage. + +Module dédié à l'analyse sémantique des écrans capturés en phase Shadow, +**après** ``/api/v1/shadow/stop`` et **avant** restitution Option C. + +Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md`` + +Principes (arbitrage Plato 2026-06-01) : +- Post-apprentissage uniquement, **jamais en hot path replay**. +- OmniParser encapsulé derrière garde-fou anti-fragilité. +- Fallback OCR-seul (docTR) systématique en cas d'exception. +- Stockage ``.semantic.yaml`` séparé du YAML compétence principal. 
+- Opt-in par compétence (rétrocompat totale). +""" + +from .phase25_analyzer import ( + Phase25Analyzer, + Phase25Result, + ScreenAnalysis, + SemanticStructure, + SEMANTIC_DIR, + OMNIPARSER_CACHE_DIR, + OMNIPARSER_ERROR_LOG, + PHASH_HAMMING_THRESHOLD, + MAX_SCREENS_PER_SESSION, +) + +__all__ = [ + "Phase25Analyzer", + "Phase25Result", + "ScreenAnalysis", + "SemanticStructure", + "SEMANTIC_DIR", + "OMNIPARSER_CACHE_DIR", + "OMNIPARSER_ERROR_LOG", + "PHASH_HAMMING_THRESHOLD", + "MAX_SCREENS_PER_SESSION", +] diff --git a/core/semantic/phase25_analyzer.py b/core/semantic/phase25_analyzer.py new file mode 100644 index 000000000..b371a167f --- /dev/null +++ b/core/semantic/phase25_analyzer.py @@ -0,0 +1,920 @@ +"""Phase 2.5 — Analyseur sémantique post-apprentissage. + +Module isolé qui prend en entrée un ensemble de screenshots capturés +pendant la phase Shadow et produit un payload structuré +``{tables, forms, buttons, text_blocks}`` par écran distinct, +stocké dans un fichier ``.semantic.yaml`` séparé. + +Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md`` + +Garde-fous : +- Wrapper try/except global autour de chaque appel OmniParser. +- Fallback OCR-seul (docTR) si OmniParser indisponible ou KO. +- Healthcheck OmniParser au démarrage : KO ⇒ bascule auto en dégradé. +- Cache disque ``data/cache/omniparser//.json``. +- Cap 10 écrans distincts par session. +- Aucun import de FastAPI, aucun appel réseau direct. 
+""" + +from __future__ import annotations + +import concurrent.futures +import hashlib +import io +import json +import logging +import re +import time +import traceback +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable, List, Optional, Sequence, Tuple + +try: # pragma: no cover - dépendance externe déjà présente dans le projet + import yaml +except ImportError as exc: # pragma: no cover + raise RuntimeError("PyYAML est requis pour core.semantic.phase25_analyzer") from exc + +try: # PIL toujours présent côté Linux dev / DGX + from PIL import Image + _HAS_PIL = True +except ImportError: # pragma: no cover + Image = None # type: ignore[assignment] + _HAS_PIL = False + +try: + import imagehash # type: ignore + _HAS_IMAGEHASH = True +except ImportError: # pragma: no cover - fallback MD5 thumbnail + imagehash = None # type: ignore[assignment] + _HAS_IMAGEHASH = False + + +logger = logging.getLogger(__name__) + + +# ---------------------------------------------------------------------------- +# Constantes et chemins +# ---------------------------------------------------------------------------- + +REPO_ROOT = Path(__file__).resolve().parents[2] +DATA_ROOT = REPO_ROOT / "data" +SEMANTIC_DIR = DATA_ROOT / "competences" / "candidate" +OMNIPARSER_CACHE_ROOT = DATA_ROOT / "cache" / "omniparser" +OMNIPARSER_CACHE_DIR = OMNIPARSER_CACHE_ROOT # alias public +LOGS_DIR = REPO_ROOT / "logs" +OMNIPARSER_ERROR_LOG = LOGS_DIR / "omniparser_errors.log" + +# Heuristique de regroupement perceptuel (cf. specs §3). +PHASH_HAMMING_THRESHOLD = 8 +MAX_SCREENS_PER_SESSION = 10 +THUMBNAIL_SIZE = (256, 256) # fallback MD5 + +# Timeout par screenshot (cf. specs §2). +OMNIPARSER_TIMEOUT_SEC = 30.0 + +# Slug autorisé (réutilisation du pattern persist : a-z0-9_). +SLUG_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,79}$") +# session_id autorisé : caractères inoffensifs uniquement. 
@dataclass
class SemanticStructure:
    """Semantic content of one screen, split into four element families."""

    tables: List[dict] = field(default_factory=list)
    forms: List[dict] = field(default_factory=list)
    buttons: List[dict] = field(default_factory=list)
    text_blocks: List[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain dict holding a shallow copy of each element list."""
        families = ("tables", "forms", "buttons", "text_blocks")
        return {family: list(getattr(self, family)) for family in families}
@dataclass
class Phase25Result:
    """Overall outcome of one Phase 2.5 analysis run."""

    session_id: str
    generated_at: str
    omniparser_available: bool
    degraded: bool
    too_complex: bool
    screens: List[ScreenAnalysis] = field(default_factory=list)
    healthcheck_passed: bool = True
    healthcheck_reason: Optional[str] = None

    def to_dict(self) -> dict:
        """Return a plain dict; each screen is converted with its ``to_dict()``."""
        scalar_keys = (
            "session_id",
            "generated_at",
            "omniparser_available",
            "degraded",
            "too_complex",
            "healthcheck_passed",
            "healthcheck_reason",
        )
        payload = {key: getattr(self, key) for key in scalar_keys}
        # Screens go last so the key order of the output stays stable.
        payload["screens"] = [screen.to_dict() for screen in self.screens]
        return payload
def _validate_slug(slug: Any) -> str:
    """Return the trimmed slug, or raise ValueError if it is not acceptable.

    Args:
        slug: candidate value; must be a string matching ``SLUG_PATTERN``
            once surrounding whitespace is removed.

    Returns:
        The slug without surrounding whitespace.

    Raises:
        ValueError: if ``slug`` is not a string or does not match the pattern.
    """
    if isinstance(slug, str):
        candidate = slug.strip()
        if SLUG_PATTERN.match(candidate):
            return candidate
        raise ValueError(
            f"slug invalide '{candidate}' (regle : {SLUG_PATTERN.pattern})"
        )
    raise ValueError("slug doit etre une chaine")
def _hamming_distance(h1: str, h2: str) -> int:
    """Return the Hamming distance between two screen hashes.

    - If either hash carries the ``md5:`` prefix, the result is 0 when both
      strings are equal and ``PHASH_HAMMING_THRESHOLD + 1`` otherwise.
    - Without imagehash, both hashes are read as hexadecimal integers and
      the number of differing bits is returned.
    - With imagehash, both hashes are rebuilt with ``hex_to_hash`` and
      subtracted.

    Any hash that cannot be decoded yields ``PHASH_HAMMING_THRESHOLD + 1``.
    """
    md5_involved = any(h.startswith("md5:") for h in (h1, h2))
    if md5_involved:
        if h1 == h2:
            return 0
        return PHASH_HAMMING_THRESHOLD + 1

    if _HAS_IMAGEHASH and imagehash is not None:
        try:
            left = imagehash.hex_to_hash(h1)
            right = imagehash.hex_to_hash(h2)
            return abs(left - right)
        except Exception:
            return PHASH_HAMMING_THRESHOLD + 1

    # No imagehash available: raw XOR of the hexadecimal values.
    try:
        differing_bits = int(h1, 16) ^ int(h2, 16)
    except ValueError:
        return PHASH_HAMMING_THRESHOLD + 1
    return bin(differing_bits).count("1")
def _classify_element(label: str, kind_hint: Optional[str] = None) -> str:
    """Classify a detected element as table, field, button or text_block.

    The optional ``kind_hint`` is checked first (table, then field, then
    button). If it does not decide, keywords are searched in the lower-cased
    label, in the order button, field, table. Anything else is a
    ``text_block``.

    The short keyword ``ok`` is matched as a whole word only, so labels such
    as "Facebook" or "Token" are not mistaken for an OK button. The other
    keywords keep substring matching.

    Args:
        label: visible label of the element; None or empty is accepted.
        kind_hint: optional type name reported by the detector.

    Returns:
        One of ``"table"``, ``"field"``, ``"button"``, ``"text_block"``.
    """
    lab = (label or "").lower()
    if kind_hint:
        kh = kind_hint.lower()
        if "table" in kh:
            return "table"
        if "input" in kh or "field" in kh or "edit" in kh:
            return "field"
        if "button" in kh or "btn" in kh:
            return "button"

    button_keywords = ("button", "btn", "submit", "valider", "annuler", "close")
    if any(kw in lab for kw in button_keywords):
        return "button"
    # "ok" must stand alone: not glued to another letter or digit.
    if re.search(r"(?<![a-z0-9])ok(?![a-z0-9])", lab):
        return "button"
    if any(kw in lab for kw in ("input", "field", "saisie", "textbox", "champ")):
        return "field"
    if "table" in lab or "grille" in lab:
        return "table"
    return "text_block"
+ _TIMEOUT_EXECUTOR: Optional[concurrent.futures.ThreadPoolExecutor] = None + + @classmethod + def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor: + if cls._TIMEOUT_EXECUTOR is None: + cls._TIMEOUT_EXECUTOR = concurrent.futures.ThreadPoolExecutor( + max_workers=2, thread_name_prefix="phase25-omniparser-timeout", + ) + return cls._TIMEOUT_EXECUTOR + + def __init__(self) -> None: + self._adapter: Any = None + self._available: bool = False + self._import_error: Optional[str] = None + self._try_import() + + def _try_import(self) -> None: + try: + from core.detection.omniparser_adapter import OmniParserAdapter # type: ignore + self._adapter = OmniParserAdapter() + self._available = bool(getattr(self._adapter, "available", False)) + if not self._available: + # L'adapter existe mais le check de disponibilité a échoué. + self._import_error = "OmniParser adapter installé mais modèles non disponibles" + except Exception as exc: + self._adapter = None + self._available = False + self._import_error = f"{type(exc).__name__}: {exc}" + + @property + def available(self) -> bool: + return self._available + + @property + def import_error(self) -> Optional[str]: + return self._import_error + + def detect( + self, + image: "Image.Image", + *, + timeout: Optional[float] = None, + ) -> List[Any]: + """Appel sécurisé : enrobé d'un timeout dur, lève en cas d'exception. + + Args: + image: image PIL à analyser. + timeout: timeout en secondes (défaut : ``OMNIPARSER_TIMEOUT_SEC``). + Si dépassé ⇒ ``concurrent.futures.TimeoutError`` propagée au + caller, qui bascule en fallback docTR + ``degraded=True``. 
+ """ + if not self._available or self._adapter is None: + return [] + effective_timeout = ( + timeout if timeout is not None else OMNIPARSER_TIMEOUT_SEC + ) + executor = self._get_executor() + future = executor.submit(self._adapter.detect, image) + try: + return list(future.result(timeout=effective_timeout)) + except concurrent.futures.TimeoutError as exc: + # Le thread OmniParser continue son travail en arrière-plan mais + # le résultat est ignoré ; le caller bascule en fallback docTR. + logger.warning( + "[PHASE25] OmniParser.detect timeout (%.1fs) -> fallback", + effective_timeout, + ) + raise + except Exception as exc: + logger.warning("[PHASE25] OmniParser.detect KO : %s", exc) + raise # remonté au caller pour log + fallback + + +def _detect_via_omniparser( + wrapper: _OmniParserSafeWrapper, + image: "Image.Image", + *, + timeout: Optional[float] = None, +) -> List[Any]: + return wrapper.detect(image, timeout=timeout) + + +def _detect_via_doctr(image: "Image.Image", screenshot_path: Optional[str]) -> List[dict]: + """Fallback OCR-seul (docTR). Retourne une liste de text_blocks bruts. + + Aucun VLM, aucune classification fine — juste OCR ⇒ ``text_blocks``. + """ + if not _HAS_PIL or image is None: + return [] + try: + from doctr.io import DocumentFile # type: ignore + from doctr.models import ocr_predictor # type: ignore + except ImportError: + logger.info("[PHASE25] docTR non disponible pour fallback OCR") + return [] + + # Cache predictor module-level pour éviter rechargement. 
+ global _DOCTR_PREDICTOR + try: + _DOCTR_PREDICTOR # type: ignore[used-before-def] + except NameError: + _DOCTR_PREDICTOR = None # type: ignore[assignment] + + try: + if _DOCTR_PREDICTOR is None: # type: ignore[has-type] + _DOCTR_PREDICTOR = ocr_predictor( # type: ignore[assignment] + det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True, + ) + except Exception as exc: # pragma: no cover + logger.warning("[PHASE25] docTR init KO : %s", exc) + return [] + + # docTR prend un fichier ou un array numpy ; on privilégie le chemin si fourni. + blocks: List[dict] = [] + try: + if screenshot_path and Path(screenshot_path).exists(): + doc = DocumentFile.from_images([screenshot_path]) + else: + buf = io.BytesIO() + image.convert("RGB").save(buf, format="PNG") + buf.seek(0) + doc = DocumentFile.from_images([buf.getvalue()]) + result = _DOCTR_PREDICTOR(doc) # type: ignore[misc] + W, H = image.size + for page in result.pages: + for block in page.blocks: + for line_obj in block.lines: + text = " ".join(w.value for w in line_obj.words).strip() + if not text: + continue + geom = line_obj.geometry # ((x1,y1), (x2,y2)) norm 0-1 + x1 = int(geom[0][0] * W) + y1 = int(geom[0][1] * H) + x2 = int(geom[1][0] * W) + y2 = int(geom[1][1] * H) + blocks.append({ + "label": text, + "text": text, + "bbox": [x1, y1, x2, y2], + "confidence": 0.6, # docTR ne donne pas de score line-level facilement + }) + except Exception as exc: # pragma: no cover + logger.warning("[PHASE25] docTR predict KO : %s", exc) + return [] + + return blocks + + +def _elements_to_structure(elements: Iterable[Any]) -> SemanticStructure: + """Convertit la liste OmniParser ``DetectedElement`` en SemanticStructure.""" + struct = SemanticStructure() + for el in elements: + # Compatible avec DetectedElement (dataclass) et dict. 
+ if hasattr(el, "label"): + label = getattr(el, "label", "") or "" + bbox = list(getattr(el, "bbox", ()) or ()) + conf = float(getattr(el, "confidence", 0.5) or 0.5) + kind_hint = getattr(el, "element_type", None) + elif isinstance(el, dict): + label = str(el.get("label") or el.get("text") or "") + bbox = list(el.get("bbox") or []) + conf = float(el.get("confidence", el.get("score", 0.5)) or 0.5) + kind_hint = el.get("element_type") or el.get("type") + else: + continue + + kind = _classify_element(label, kind_hint) + entry = {"label": label, "bbox": bbox, "confidence": conf} + if kind == "table": + struct.tables.append(entry) + elif kind == "field": + struct.forms.append(entry) + elif kind == "button": + struct.buttons.append(entry) + else: + struct.text_blocks.append({**entry, "text": label}) + return struct + + +# ---------------------------------------------------------------------------- +# Cache disque +# ---------------------------------------------------------------------------- + + +def _cache_path(session_id: str, frame_index: int) -> Path: + sid = _validate_session_id(session_id) + return OMNIPARSER_CACHE_ROOT / sid / f"{int(frame_index)}.json" + + +def _cache_read(session_id: str, frame_index: int) -> Optional[dict]: + path = _cache_path(session_id, frame_index) + if not path.exists(): + return None + try: + with path.open("r", encoding="utf-8") as fh: + return json.load(fh) + except (OSError, json.JSONDecodeError) as exc: + logger.warning("[PHASE25] cache illisible %s : %s", path, exc) + return None + + +def _cache_write(session_id: str, frame_index: int, payload: dict) -> None: + path = _cache_path(session_id, frame_index) + try: + _ensure_dir(path.parent) + tmp = path.with_suffix(".json.tmp") + with tmp.open("w", encoding="utf-8") as fh: + json.dump(payload, fh, ensure_ascii=False, indent=2) + tmp.replace(path) + except OSError as exc: # pragma: no cover + logger.warning("[PHASE25] cache ecriture KO %s : %s", path, exc) + + +# 
---------------------------------------------------------------------------- +# Analyseur principal +# ---------------------------------------------------------------------------- + + +class Phase25Analyzer: + """Analyseur sémantique post-apprentissage. + + Usage minimal : + + analyzer = Phase25Analyzer(session_id="abc123") + result = analyzer.analyze_frames(frames=[(0, img0), (12, img12), ...]) + path = analyzer.write_semantic_yaml(result, slug="ma_competence") + + ``frames`` est une séquence ``(frame_index, PIL.Image[, screenshot_path])``. + """ + + def __init__( + self, + session_id: str, + *, + omniparser: Optional[_OmniParserSafeWrapper] = None, + max_screens: int = MAX_SCREENS_PER_SESSION, + timeout_sec: float = OMNIPARSER_TIMEOUT_SEC, + ) -> None: + self.session_id = _validate_session_id(session_id) + self.omniparser = omniparser if omniparser is not None else _OmniParserSafeWrapper() + self.max_screens = max_screens + self.timeout_sec = timeout_sec + self._healthcheck_passed = True + self._healthcheck_reason: Optional[str] = None + + # -- healthcheck ------------------------------------------------------- + + def healthcheck(self) -> bool: + """Vérifie qu'OmniParser répond sur une image bidon (cf. specs §7). + + - Si l'adapter est ``available=False`` ⇒ healthcheck KO (mais on + continuera quand même en mode dégradé OCR-seul). + - Si l'adapter lève une exception ⇒ KO + log dédié. 
+ """ + if not _HAS_PIL: + self._healthcheck_passed = False + self._healthcheck_reason = "PIL indisponible" + return False + if not self.omniparser.available: + self._healthcheck_passed = False + self._healthcheck_reason = ( + self.omniparser.import_error or "OmniParser indisponible" + ) + return False + try: + dummy = Image.new("RGB", (64, 64), color=(255, 255, 255)) + _ = self.omniparser.detect(dummy, timeout=self.timeout_sec) + self._healthcheck_passed = True + self._healthcheck_reason = None + return True + except Exception as exc: + _log_omniparser_error(self.session_id, -1, exc) + self._healthcheck_passed = False + self._healthcheck_reason = f"{type(exc).__name__}: {exc}" + return False + + # -- analyse écran ---------------------------------------------------- + + def analyze_screen( + self, + frame_index: int, + image: "Image.Image", + phash: str, + *, + screenshot_path: Optional[str] = None, + window_title: Optional[str] = None, + force_fallback: bool = False, + ) -> ScreenAnalysis: + """Analyse un écran représentatif. + + Stratégie : + 1. Cache disque (idempotence par session_id+frame_index). + 2. OmniParser via wrapper safe → sinon fallback OCR-seul docTR. + 3. Exception ⇒ log dédié + ``degraded=True`` + structure docTR. + """ + # 1. 
Cache + cached = _cache_read(self.session_id, frame_index) + if cached is not None: + struct = SemanticStructure( + tables=cached.get("structure", {}).get("tables", []), + forms=cached.get("structure", {}).get("forms", []), + buttons=cached.get("structure", {}).get("buttons", []), + text_blocks=cached.get("structure", {}).get("text_blocks", []), + ) + return ScreenAnalysis( + index=frame_index, + phash=cached.get("phash", phash), + screen_id=cached.get("screen_id", f"screen_{frame_index:03d}"), + screenshot_path=cached.get("screenshot_path", screenshot_path), + structure=struct, + degraded=bool(cached.get("degraded", False)), + degraded_reason=cached.get("degraded_reason"), + elapsed_sec=float(cached.get("elapsed_sec", 0.0)), + window_title=cached.get("window_title", window_title), + ) + + t0 = time.monotonic() + degraded = False + degraded_reason: Optional[str] = None + structure: SemanticStructure + + use_omniparser = self.omniparser.available and not force_fallback + if use_omniparser: + try: + elements = _detect_via_omniparser( + self.omniparser, image, timeout=self.timeout_sec, + ) + structure = _elements_to_structure(elements) + if not (structure.tables or structure.forms or structure.buttons or structure.text_blocks): + # OmniParser n'a rien produit : on ajoute en complément docTR text_blocks. 
+ blocks = _detect_via_doctr(image, screenshot_path) + structure.text_blocks.extend(blocks) + except Exception as exc: + _log_omniparser_error(self.session_id, frame_index, exc) + degraded = True + degraded_reason = f"omniparser_exception: {type(exc).__name__}" + blocks = _detect_via_doctr(image, screenshot_path) + structure = SemanticStructure(text_blocks=blocks) + else: + degraded = True + degraded_reason = ( + "omniparser_unavailable: " + (self.omniparser.import_error or "n/a") + if not self.omniparser.available + else "forced_fallback" + ) + blocks = _detect_via_doctr(image, screenshot_path) + structure = SemanticStructure(text_blocks=blocks) + + elapsed = time.monotonic() - t0 + analysis = ScreenAnalysis( + index=frame_index, + phash=phash, + screen_id=f"screen_{frame_index:03d}", + screenshot_path=screenshot_path, + structure=structure, + degraded=degraded, + degraded_reason=degraded_reason, + elapsed_sec=elapsed, + window_title=window_title, + ) + + # Cache écriture (best-effort). + _cache_write(self.session_id, frame_index, analysis.to_dict()) + return analysis + + # -- pipeline complet ------------------------------------------------- + + def analyze_frames( + self, + frames: Sequence[Tuple[int, "Image.Image"]], + *, + screenshot_paths: Optional[dict[int, str]] = None, + window_titles: Optional[dict[int, str]] = None, + run_healthcheck: bool = True, + ) -> Phase25Result: + """Pipeline complet : grouping phash → analyse → cap → résultat. + + Args: + frames: liste ``(frame_index, PIL.Image)``. + screenshot_paths: mapping ``frame_index -> path`` (optionnel). + window_titles: mapping ``frame_index -> window_title`` (optionnel). + run_healthcheck: lancer le healthcheck OmniParser avant analyse. + + Returns: + ``Phase25Result`` avec ``too_complex=True`` si > max_screens. 
+ """ + if not _HAS_PIL: + raise RuntimeError("PIL est requis pour Phase25Analyzer.analyze_frames") + + if run_healthcheck: + self.healthcheck() + if not self._healthcheck_passed: + logger.warning( + "[PHASE25] healthcheck OmniParser KO (%s) -> mode degrade docTR", + self._healthcheck_reason, + ) + + force_fallback = not self._healthcheck_passed + + # 1. Regrouper par similarité perceptuelle. + reps = identify_distinct_screens(frames) + + # 2. Cap MAX_SCREENS_PER_SESSION. + too_complex = len(reps) > self.max_screens + if too_complex: + logger.warning( + "[PHASE25] session %s : %d ecrans distincts > cap %d -> too_complex", + self.session_id, len(reps), self.max_screens, + ) + reps = reps[: self.max_screens] + + # 3. Analyser chaque représentant. + sp = screenshot_paths or {} + wt = window_titles or {} + screens: List[ScreenAnalysis] = [] + any_degraded = False + for idx, img, phash in reps: + analysis = self.analyze_screen( + idx, + img, + phash, + screenshot_path=sp.get(idx), + window_title=wt.get(idx), + force_fallback=force_fallback, + ) + screens.append(analysis) + any_degraded = any_degraded or analysis.degraded + + return Phase25Result( + session_id=self.session_id, + generated_at=datetime.now(timezone.utc).isoformat(), + omniparser_available=self.omniparser.available and self._healthcheck_passed, + degraded=any_degraded or not self._healthcheck_passed, + too_complex=too_complex, + screens=screens, + healthcheck_passed=self._healthcheck_passed, + healthcheck_reason=self._healthcheck_reason, + ) + + # -- écriture YAML ----------------------------------------------------- + + def write_semantic_yaml( + self, + result: Phase25Result, + slug: str, + *, + target_dir: Optional[Path] = None, + ) -> Path: + """Écrit le ``.semantic.yaml`` à côté du YAML compétence candidate. + + Args: + result: Résultat d'analyse Phase 2.5. + slug: slug compétence (validé contre SLUG_PATTERN). + target_dir: répertoire cible (défaut : ``data/competences/candidate/``). 
+ + Returns: + Path absolu du fichier écrit. + + Raises: + ValueError: slug invalide. + OSError: écriture impossible. + """ + s = _validate_slug(slug) + out_dir = target_dir if target_dir is not None else SEMANTIC_DIR + out_dir = Path(out_dir) + _ensure_dir(out_dir) + + # Anti écrasement supervised/stable : on refuse explicitement. + forbidden = {"supervised", "stable"} + if out_dir.name in forbidden: + raise ValueError( + f"target_dir interdit '{out_dir.name}' (autorise : candidate uniquement)" + ) + + payload = { + "competence_id": s, + "semantic_version": 1, + "generated_at": result.generated_at, + "session_id": result.session_id, + "omniparser_available": result.omniparser_available, + "degraded": result.degraded, + "too_complex": result.too_complex, + "healthcheck_passed": result.healthcheck_passed, + "healthcheck_reason": result.healthcheck_reason, + "screens": [], + } + for sc in result.screens: + payload["screens"].append({ + "screen_id": sc.screen_id, + "phash": sc.phash, + "representative_frame_index": sc.index, + "screenshot_path": sc.screenshot_path, + "window_title": sc.window_title, + "degraded": sc.degraded, + "degraded_reason": sc.degraded_reason, + "elapsed_sec": round(sc.elapsed_sec, 3), + "structure": sc.structure.to_dict(), + "annotations": [], # placeholder — annotation humaine ultérieure + }) + + target = out_dir / f"{s}.semantic.yaml" + tmp = target.with_suffix(".yaml.tmp") + with tmp.open("w", encoding="utf-8") as fh: + yaml.safe_dump(payload, fh, allow_unicode=True, sort_keys=False) + tmp.replace(target) + logger.info( + "[PHASE25] semantic yaml ecrit : %s (screens=%d, degraded=%s)", + target, len(result.screens), result.degraded, + ) + return target + + +# ---------------------------------------------------------------------------- +# Helpers utilitaires (chargement frames) +# ---------------------------------------------------------------------------- + + +def load_frames_from_paths(paths_by_index: dict[int, str]) -> List[Tuple[int, 
"Image.Image"]]: + """Charge des images PIL à partir d'un mapping ``frame_index -> path``. + + Ignore silencieusement les chemins inexistants (avec log warning). + """ + if not _HAS_PIL: + raise RuntimeError("PIL est requis pour load_frames_from_paths") + frames: List[Tuple[int, Image.Image]] = [] + for idx in sorted(paths_by_index.keys()): + p = paths_by_index[idx] + try: + img = Image.open(p) + img.load() + frames.append((int(idx), img)) + except (FileNotFoundError, OSError) as exc: + logger.warning("[PHASE25] frame %d illisible (%s) : %s", idx, p, exc) + return frames + + +__all__ = [ + "Phase25Analyzer", + "Phase25Result", + "ScreenAnalysis", + "SemanticStructure", + "SEMANTIC_DIR", + "OMNIPARSER_CACHE_DIR", + "OMNIPARSER_CACHE_ROOT", + "OMNIPARSER_ERROR_LOG", + "PHASH_HAMMING_THRESHOLD", + "MAX_SCREENS_PER_SESSION", + "compute_phash", + "identify_distinct_screens", + "load_frames_from_paths", +] diff --git a/tests/integration/test_phase25_semantic_integration.py b/tests/integration/test_phase25_semantic_integration.py new file mode 100644 index 000000000..3fc2c57ab --- /dev/null +++ b/tests/integration/test_phase25_semantic_integration.py @@ -0,0 +1,476 @@ +"""Tests d'intégration Phase 2.5 sémantique. + +Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md``. + +Vérifie le flux complet : +- Charger des screenshots disque -> identifier écrans distincts -> + analyser avec OmniParser mocké -> écrire ``.semantic.yaml`` valide. +- Tester aussi le fallback OmniParser KO -> degraded YAML. +- L'endpoint FastAPI est testé via ``TestClient`` (auth Bearer désactivée + via ``RPA_AUTH_DISABLED=true`` pour ce test). +- Correctifs P1-SEMANTIQUE GO conditionnel Qwen : + * non-blocage event loop pendant analyze_frames (run_in_executor). + * timeout effectif autour de chaque appel OmniParser. 
+""" + +from __future__ import annotations + +import asyncio +import os +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +import yaml +from PIL import Image, ImageDraw + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.semantic import phase25_analyzer as P # noqa: E402 + + +def _make_screenshot(path: Path, seed: int) -> None: + import random + rng = random.Random(seed) + img = Image.new("RGB", (320, 240), color=(255, 255, 255)) + d = ImageDraw.Draw(img) + for _ in range(30): + x = rng.randint(0, 300) + y = rng.randint(0, 220) + w = rng.randint(10, 30) + h = rng.randint(10, 30) + col = (rng.randint(0, 255), rng.randint(0, 255), rng.randint(0, 255)) + d.rectangle([x, y, x + w, y + h], fill=col) + path.parent.mkdir(parents=True, exist_ok=True) + img.save(path, format="PNG") + + +def _build_mock_omniparser(elements): + w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + w._adapter = MagicMock() + w._available = True + w._import_error = None + w._adapter.detect.return_value = elements + return w + + +# --------------------------------------------------------------------------- +# Integration : flux complet load -> analyze -> write YAML +# --------------------------------------------------------------------------- + + +class TestFullFlow: + def test_load_analyze_write_yaml(self, tmp_path, monkeypatch): + # Préparer 4 screenshots : 2 visuels identiques + 1 différent + 1 répété. 
+ shots_dir = tmp_path / "shots" + _make_screenshot(shots_dir / "0.png", seed=1) + _make_screenshot(shots_dir / "1.png", seed=1) # même seed = même image + _make_screenshot(shots_dir / "2.png", seed=2) + _make_screenshot(shots_dir / "3.png", seed=2) + paths = {i: str(shots_dir / f"{i}.png") for i in range(4)} + + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + + mock_op = _build_mock_omniparser([ + {"label": "Valider", "bbox": [10, 10, 80, 40], "confidence": 0.9, "element_type": "button"}, + {"label": "Champ texte", "bbox": [100, 10, 300, 40], "confidence": 0.85, "element_type": "input"}, + ]) + analyzer = P.Phase25Analyzer(session_id="integ_sess", omniparser=mock_op) + frames = P.load_frames_from_paths(paths) + assert len(frames) == 4 + result = analyzer.analyze_frames( + frames=frames, + screenshot_paths=paths, + window_titles={0: "Easily Login", 2: "Easily Patient"}, + ) + # Doit avoir regroupé en 2 écrans distincts. + assert len(result.screens) == 2 + assert result.too_complex is False + assert result.degraded is False + # Le window_title du représentant doit être propagé. + rep_indexes = {s.index for s in result.screens} + assert 0 in rep_indexes or 1 in rep_indexes + assert 2 in rep_indexes or 3 in rep_indexes + + # Écrire le YAML. + target = analyzer.write_semantic_yaml( + result, slug="competence_demo", target_dir=tmp_path / "out", + ) + assert target.exists() + data = yaml.safe_load(target.read_text(encoding="utf-8")) + assert data["competence_id"] == "competence_demo" + assert len(data["screens"]) == 2 + # Au moins 1 button + 1 field doivent apparaître dans chaque structure. 
+ for sc in data["screens"]: + assert len(sc["structure"]["buttons"]) >= 1 + assert len(sc["structure"]["forms"]) >= 1 + + def test_omniparser_ko_fallback_yaml_degraded(self, tmp_path, monkeypatch): + shots_dir = tmp_path / "shots" + _make_screenshot(shots_dir / "0.png", seed=10) + paths = {0: str(shots_dir / "0.png")} + + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") + monkeypatch.setattr( + P, "OMNIPARSER_ERROR_LOG", tmp_path / "logs" / "omniparser_errors.log" + ) + # docTR stub léger. + monkeypatch.setattr( + P, "_detect_via_doctr", + lambda image, screenshot_path: [ + {"label": "FAKE_OCR", "text": "FAKE_OCR", "bbox": [0, 0, 50, 20], "confidence": 0.6}, + ], + ) + + # OmniParser disponible mais qui lève. + op = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + op._adapter = MagicMock() + op._available = True + op._import_error = None + op._adapter.detect.side_effect = RuntimeError("OmniParser HS") + + analyzer = P.Phase25Analyzer(session_id="integ_fb", omniparser=op) + frames = P.load_frames_from_paths(paths) + result = analyzer.analyze_frames(frames=frames, screenshot_paths=paths) + assert result.degraded is True + assert len(result.screens) == 1 + assert result.screens[0].degraded is True + + # Le YAML doit être marqué dégradé mais valide. + target = analyzer.write_semantic_yaml( + result, slug="comp_fallback", target_dir=tmp_path / "out", + ) + data = yaml.safe_load(target.read_text(encoding="utf-8")) + assert data["degraded"] is True + assert data["screens"][0]["degraded"] is True + + +# --------------------------------------------------------------------------- +# Integration : endpoint FastAPI +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def app_client(tmp_path_factory): + """TestClient FastAPI avec auth Bearer désactivée.""" + # IMPORTANT : désactiver l'auth AVANT l'import du module api_stream. 
+ os.environ["RPA_AUTH_DISABLED"] = "true" + os.environ.setdefault("RPA_API_TOKEN", "test-token-phase25") + + # Cache root pointe vers un tmp partagé. + cache_root = tmp_path_factory.mktemp("op_cache") + + from fastapi.testclient import TestClient + from agent_v0.server_v1 import api_stream as api # noqa: E402 + + # Redirige le cache et logs Phase 2.5 vers tmp. + from core.semantic import phase25_analyzer as PA + PA.OMNIPARSER_CACHE_ROOT = cache_root + PA.LOGS_DIR = cache_root / "logs" + PA.OMNIPARSER_ERROR_LOG = cache_root / "logs" / "omniparser_errors.log" + + client = TestClient(api.app) + yield client, cache_root + + +class TestEndpoint: + def test_endpoint_returns_empty_result_when_no_paths(self, app_client): + client, _cache = app_client + # Aucun frame réel n'existe -> on attend 200 avec screens=[]. + resp = client.post( + "/api/v1/lea/screen/analyze", + json={ + "session_id": "non_existent_session_xyz", + "screenshot_indexes": [0, 1, 2], + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["session_id"] == "non_existent_session_xyz" + assert data["screens"] == [] + assert data["degraded"] is True + + def test_endpoint_invalid_session_id(self, app_client): + client, _cache = app_client + resp = client.post( + "/api/v1/lea/screen/analyze", + json={ + "session_id": "../etc/passwd", + "screenshot_indexes": [], + }, + ) + assert resp.status_code == 400 + assert resp.json()["detail"]["error"] == "invalid_session_id" + + def test_endpoint_full_flow_with_explicit_paths(self, app_client, tmp_path): + client, _cache = app_client + # Préparer 3 screenshots et passer les chemins explicitement. 
+ shots = tmp_path / "endpoint_shots" + _make_screenshot(shots / "10.png", seed=5) + _make_screenshot(shots / "11.png", seed=5) + _make_screenshot(shots / "12.png", seed=99) + paths = { + "10": str(shots / "10.png"), + "11": str(shots / "11.png"), + "12": str(shots / "12.png"), + } + + # Patcher OmniParserSafeWrapper pour qu'il soit toujours indisponible + # (fallback OCR-seul docTR) -> on stubbe docTR aussi. + from core.semantic import phase25_analyzer as PA + + def _stub_doctr(image, screenshot_path): + return [ + {"label": "TXT", "text": "TXT", "bbox": [0, 0, 30, 10], "confidence": 0.6}, + ] + + original_doctr = PA._detect_via_doctr + PA._detect_via_doctr = _stub_doctr + + # Force fallback : on patche le wrapper pour qu'il soit unavailable. + original_wrapper_init = PA._OmniParserSafeWrapper._try_import + + def _no_op_import(self): + self._adapter = None + self._available = False + self._import_error = "stubbed_unavailable" + + PA._OmniParserSafeWrapper._try_import = _no_op_import + + try: + resp = client.post( + "/api/v1/lea/screen/analyze", + json={ + "session_id": "endpoint_test", + "screenshot_indexes": [10, 11, 12], + "screenshot_paths": paths, + "window_titles": {"10": "Win A", "12": "Win B"}, + "write_yaml": False, + }, + ) + finally: + PA._detect_via_doctr = original_doctr + PA._OmniParserSafeWrapper._try_import = original_wrapper_init + + assert resp.status_code == 200, resp.text + data = resp.json() + assert data["session_id"] == "endpoint_test" + # 2 écrans distincts attendus (idx 10/11 groupés + idx 12). + assert 1 <= len(data["screens"]) <= 3 + # Mode dégradé -> text_blocks docTR. + assert data["degraded"] is True + # Contrat snapshot : présence de "elements" aplatis. + for sc in data["screens"]: + assert "elements" in sc + assert "structure" in sc + assert "hash" in sc + + +# --------------------------------------------------------------------------- +# CORRECTIFS P1-SEMANTIQUE GO conditionnel Qwen : +# 1. 
Non-blocage event loop pendant analyze_frames (run_in_executor). +# 2. Timeout effectif autour de chaque appel OmniParser + log dédié. +# --------------------------------------------------------------------------- + + +class TestEventLoopNotBlocked: + """L'endpoint POST /api/v1/lea/screen/analyze ne doit pas bloquer + l'event loop pendant l'analyse synchrone (analyze_frames). + + Méthode : monkeypatch ``analyze_frames`` pour qu'il dorme 2s, lancer + 2 appels en parallèle via httpx.AsyncClient + ASGITransport, vérifier + que la durée totale est sensiblement < 4s (donc < ~3s) car les deux + analyses doivent progresser en concurrent dans le ThreadPoolExecutor. + """ + + @pytest.mark.asyncio + async def test_endpoint_does_not_block_event_loop(self, tmp_path, monkeypatch): + import httpx + from httpx import ASGITransport + + os.environ["RPA_AUTH_DISABLED"] = "true" + os.environ.setdefault("RPA_API_TOKEN", "test-token-phase25-loop") + + from agent_v0.server_v1 import api_stream as api + from core.semantic import phase25_analyzer as PA + + # Préparer 1 screenshot pour passer la validation paths. + shots = tmp_path / "shots_loop" + _make_screenshot(shots / "0.png", seed=7) + + # Monkeypatch analyze_frames pour dormir 2s (simule analyse longue + # CPU-bound côté analyzer). Si l'endpoint bloque l'event loop, + # 2 appels concurrents prendront ~4s ; sinon ~2s (executor en //). 
+ sleep_sec = 2.0 + + class _DummyResult: + def __init__(self, sid): + self.session_id = sid + self.generated_at = "2026-06-01T00:00:00Z" + self.omniparser_available = False + self.degraded = False + self.too_complex = False + self.screens = [] + + def to_dict(self): + return { + "session_id": self.session_id, + "generated_at": self.generated_at, + "omniparser_available": self.omniparser_available, + "degraded": self.degraded, + "too_complex": self.too_complex, + "healthcheck_passed": True, + "healthcheck_reason": None, + "screens": [], + } + + def _slow_analyze(self, *args, **kwargs): + time.sleep(sleep_sec) + return _DummyResult(self.session_id) + + monkeypatch.setattr(PA.Phase25Analyzer, "analyze_frames", _slow_analyze) + + transport = ASGITransport(app=api.app) + async with httpx.AsyncClient( + transport=transport, base_url="http://test" + ) as client: + paths = {"0": str(shots / "0.png")} + payload = { + "session_id": "loop_sess_a", + "screenshot_indexes": [0], + "screenshot_paths": paths, + } + payload2 = {**payload, "session_id": "loop_sess_b"} + + t0 = time.monotonic() + r1, r2 = await asyncio.gather( + client.post("/api/v1/lea/screen/analyze", json=payload), + client.post("/api/v1/lea/screen/analyze", json=payload2), + ) + elapsed = time.monotonic() - t0 + + assert r1.status_code == 200, r1.text + assert r2.status_code == 200, r2.text + # Marge confortable : si event loop bloqué -> ~4s, sinon ~2s. + # On accepte jusqu'à 3.5s pour absorber overhead CI / GC. + assert elapsed < 3.5, ( + f"event loop semble bloque : 2 appels paralleles ont pris {elapsed:.2f}s " + f"(attendu < 3.5s avec sleep={sleep_sec}s)" + ) + + +class TestOmniParserTimeout: + """Le ``OMNIPARSER_TIMEOUT_SEC`` doit être appliqué comme timeout dur + autour de chaque appel ``_adapter.detect`` via ThreadPoolExecutor + + ``future.result(timeout=...)``. + + Si OmniParser hang -> ``concurrent.futures.TimeoutError`` -> log dédié + + ``degraded=True`` + fallback docTR. Pas de 500 vers le caller. 
+ """ + + def test_omniparser_timeout_triggers_fallback(self, tmp_path, monkeypatch): + # Forcer un timeout court (0.5s) pour ne pas bloquer la suite de tests. + monkeypatch.setattr(P, "OMNIPARSER_TIMEOUT_SEC", 0.5) + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") + monkeypatch.setattr( + P, "OMNIPARSER_ERROR_LOG", tmp_path / "logs" / "omniparser_errors.log" + ) + + # Stub docTR : retourne 1 text_block pour confirmer le fallback. + monkeypatch.setattr( + P, "_detect_via_doctr", + lambda image, screenshot_path: [ + {"label": "FALLBACK_OCR", "text": "FALLBACK_OCR", + "bbox": [0, 0, 30, 10], "confidence": 0.6}, + ], + ) + + # OmniParser disponible mais qui hang (dort > timeout). + op = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + op._adapter = MagicMock() + op._available = True + op._import_error = None + + def _hang(image): + time.sleep(5.0) # >> 0.5s timeout + return [] + + op._adapter.detect.side_effect = _hang + + analyzer = P.Phase25Analyzer( + session_id="timeout_sess", + omniparser=op, + timeout_sec=0.5, + ) + img = Image.new("RGB", (64, 64), color=(255, 255, 255)) + + t0 = time.monotonic() + # On contourne le healthcheck (qui consommerait aussi un timeout). + result = analyzer.analyze_screen( + frame_index=0, image=img, phash="ab", force_fallback=False, + ) + elapsed = time.monotonic() - t0 + + # Timeout effectif : doit terminer en ~0.5s (large marge à 3s + # pour CPU lent). + assert elapsed < 3.0, ( + f"timeout pas applique : analyze_screen a pris {elapsed:.2f}s " + f"(attendu ~0.5s, hang stub = 5s)" + ) + # Fallback déclenché. + assert result.degraded is True + assert result.degraded_reason and "omniparser_exception" in result.degraded_reason + assert "TimeoutError" in result.degraded_reason + # docTR a pris la main. 
+ assert len(result.structure.text_blocks) == 1 + assert result.structure.text_blocks[0]["label"] == "FALLBACK_OCR" + + def test_omniparser_timeout_logged(self, tmp_path, monkeypatch): + """Vérifie qu'une ligne JSON est ajoutée dans + ``logs/omniparser_errors.log`` avec motif timeout. + """ + monkeypatch.setattr(P, "OMNIPARSER_TIMEOUT_SEC", 0.3) + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") + log_path = tmp_path / "logs" / "omniparser_errors.log" + monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", log_path) + monkeypatch.setattr( + P, "_detect_via_doctr", + lambda image, screenshot_path: [], + ) + + op = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + op._adapter = MagicMock() + op._available = True + op._import_error = None + + def _hang(image): + time.sleep(3.0) + return [] + + op._adapter.detect.side_effect = _hang + + analyzer = P.Phase25Analyzer( + session_id="timeout_log_sess", + omniparser=op, + timeout_sec=0.3, + ) + img = Image.new("RGB", (64, 64), color=(255, 255, 255)) + result = analyzer.analyze_screen( + frame_index=42, image=img, phash="cd", force_fallback=False, + ) + + assert result.degraded is True + assert log_path.exists(), "le log d'erreur omniparser doit etre cree" + log_content = log_path.read_text(encoding="utf-8") + # Le log est append-only JSON-lines. On vérifie qu'au moins une + # ligne contient TimeoutError + session_id. + assert "TimeoutError" in log_content + assert "timeout_log_sess" in log_content + assert '"frame_index": 42' in log_content diff --git a/tests/integration/test_shadow_full_cycle.py b/tests/integration/test_shadow_full_cycle.py new file mode 100644 index 000000000..46565c097 --- /dev/null +++ b/tests/integration/test_shadow_full_cycle.py @@ -0,0 +1,245 @@ +"""Tests integration HTTP — cycle complet Lea-first jusqu'au /persist. 
+ +Specs : docs/POC/SPECS_ENDPOINT_PERSIST_2026-06-01.md + +Le focus est sur l'endpoint /persist : on mocke les phases amont (Shadow +build) en construisant directement le workflow_ir. Pas de lancement reel +de session Shadow ici. +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +pytestmark = pytest.mark.integration + + +_TEST_API_TOKEN = "test_persist_endpoint_token_xyz" + + +@pytest.fixture +def persist_client(monkeypatch, tmp_path): + """TestClient FastAPI + redirection des chemins persist vers tmp_path.""" + monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN) + monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False) + + # DB agents isolee pour eviter le guard fleet + monkeypatch.setenv("RPA_AGENTS_DB_PATH", str(tmp_path / "agents.db")) + + from fastapi.testclient import TestClient + from agent_v0.server_v1 import api_stream + from agent_v0.server_v1.agent_registry import AgentRegistry + from core.competences import persist as P + + monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN) + + # api_stream peut deja etre importe par un autre test avant que + # RPA_AGENTS_DB_PATH soit pose. Isoler explicitement le registre global. 
+ test_registry = AgentRegistry(db_path=str(tmp_path / "agents.db")) + monkeypatch.setattr(api_stream, "agent_registry", test_registry) + + # Rediriger toutes les ecritures vers tmp_path + candidate_dir = tmp_path / "competences" / "candidate" + candidate_dir.mkdir(parents=True, exist_ok=True) + audit_path = tmp_path / "competences" / "persist_audit.jsonl" + incomplete_path = tmp_path / "competences" / "incomplete_learnings.jsonl" + + monkeypatch.setattr(P, "COMPETENCES_ROOT", tmp_path / "competences") + monkeypatch.setattr(P, "CANDIDATE_DIR", candidate_dir) + monkeypatch.setattr(P, "AUDIT_PATH", audit_path) + monkeypatch.setattr(P, "INCOMPLETE_PATH", incomplete_path) + + # Reset du rate limiter pour eviter les fuites entre tests + P.persist_rate_limiter.reset() + + client = TestClient(api_stream.app, raise_server_exceptions=False) + return client, tmp_path + + +def _auth_headers(): + return {"Authorization": f"Bearer {_TEST_API_TOKEN}"} + + +def _minimal_workflow_ir(): + return { + "steps": [ + { + "kind": "click", + "primitive_ref": "click", + "parameters": {"target": "Bouton OK"}, + "description": "Clic sur Bouton OK", + } + ], + "preconditions": [], + "success_marker": { + "mode": "all_of", + "timeout_ms": 5000, + "markers": [], + }, + } + + +class TestPersistEndpointSuccess: + """Cas nominal : payload valide -> 201 + YAML cree + audit ecrit.""" + + def test_persist_returns_201_and_writes_yaml(self, persist_client): + client, tmp_path = persist_client + resp = client.post( + "/api/v1/lea/competences/candidate/persist", + json={ + "name": "Test Cycle Complet", + "machine_id": "machine_test_x", + "session_id": "sess_xyz", + "workflow_ir": _minimal_workflow_ir(), + "parameters": [], + "annotations_semantiques": {"intent_fr": "tester cycle"}, + "learning_metadata": {"persist_id": "uuid-cycle-1"}, + }, + headers=_auth_headers(), + ) + assert resp.status_code == 201, resp.text + body = resp.json() + assert body["competence_id"] == "test_cycle_complet" + assert 
body["learning_state"] == "candidate" + assert body["persist_id"] == "uuid-cycle-1" + assert body["audit_entry_id"] >= 1 + + # Le YAML doit etre present sur disque (chemin redirige par fixture) + yaml_file = tmp_path / "competences" / "candidate" / "test_cycle_complet.yaml" + assert yaml_file.exists() + # Audit enrichi + audit = tmp_path / "competences" / "persist_audit.jsonl" + assert audit.exists() + lines = audit.read_text(encoding="utf-8").strip().splitlines() + assert any(json.loads(li).get("persist_id") == "uuid-cycle-1" for li in lines) + + def test_persist_idempotence_returns_previous(self, persist_client): + client, _ = persist_client + payload = { + "name": "Idempotent Test", + "machine_id": "machine_test_x", + "workflow_ir": _minimal_workflow_ir(), + "learning_metadata": {"persist_id": "uuid-idem-1"}, + } + r1 = client.post( + "/api/v1/lea/competences/candidate/persist", + json=payload, + headers=_auth_headers(), + ) + assert r1.status_code == 201 + r2 = client.post( + "/api/v1/lea/competences/candidate/persist", + json=payload, + headers=_auth_headers(), + ) + # Idempotent : 200 ou 201 avec idempotent_replay=True + assert r2.status_code in (200, 201) + body2 = r2.json() + assert body2.get("idempotent_replay") is True + assert body2["competence_id"] == r1.json()["competence_id"] + + +class TestPersistEndpointEdgeCases: + """Cas particuliers documentes dans specs §5.""" + + def test_partial_true_force_incomplete_state(self, persist_client): + client, tmp_path = persist_client + resp = client.post( + "/api/v1/lea/competences/candidate/persist", + json={ + "name": "Partial Demo", + "machine_id": "machine_test_x", + "workflow_ir": {"steps": []}, # vide accepte si partial + "learning_metadata": { + "persist_id": "uuid-partial-1", + "partial": True, + "dropout_reason": "utilisateur a abandonne", + }, + }, + headers=_auth_headers(), + ) + assert resp.status_code == 201, resp.text + assert resp.json()["learning_state"] == "incomplete" + # Double entree : audit 
principal + incomplete + incomplete = tmp_path / "competences" / "incomplete_learnings.jsonl" + assert incomplete.exists() + + def test_empty_workflow_without_partial_returns_400(self, persist_client): + client, _ = persist_client + resp = client.post( + "/api/v1/lea/competences/candidate/persist", + json={ + "name": "Empty Demo", + "machine_id": "machine_test_x", + "workflow_ir": {"steps": []}, + "learning_metadata": {"persist_id": "uuid-empty-1"}, + }, + headers=_auth_headers(), + ) + assert resp.status_code == 400 + assert resp.json()["detail"]["error"] == "empty_workflow_ir" + + def test_partial_without_dropout_reason_returns_400(self, persist_client): + client, _ = persist_client + resp = client.post( + "/api/v1/lea/competences/candidate/persist", + json={ + "name": "Partial Sans Raison", + "machine_id": "machine_test_x", + "workflow_ir": {"steps": []}, + "learning_metadata": { + "persist_id": "uuid-partial-noreason", + "partial": True, + }, + }, + headers=_auth_headers(), + ) + assert resp.status_code == 400 + assert resp.json()["detail"]["error"] == "dropout_reason_required" + + def test_stable_request_forced_to_candidate(self, persist_client): + client, _ = persist_client + resp = client.post( + "/api/v1/lea/competences/candidate/persist", + json={ + "name": "Force Candidate Demo", + "machine_id": "machine_test_x", + "workflow_ir": _minimal_workflow_ir(), + "learning_metadata": { + "persist_id": "uuid-stable-attempt", + "learning_state": "stable", + }, + }, + headers=_auth_headers(), + ) + assert resp.status_code == 201 + # Regle d'or HDS : jamais stable par persist direct + assert resp.json()["learning_state"] == "candidate" + + def test_slug_collision_returns_409(self, persist_client): + client, tmp_path = persist_client + # Pre-creer un YAML candidate pour declencher la collision + candidate = tmp_path / "competences" / "candidate" / "collision_demo.yaml" + candidate.write_text("id: collision_demo\nname: x\n", encoding="utf-8") + + resp = client.post( + 
"/api/v1/lea/competences/candidate/persist", + json={ + "name": "Collision Demo", + "machine_id": "machine_test_x", + "workflow_ir": _minimal_workflow_ir(), + "learning_metadata": {"persist_id": "uuid-coll-1"}, + }, + headers=_auth_headers(), + ) + assert resp.status_code == 409 + assert resp.json()["detail"]["error"] == "slug_collision" diff --git a/tests/unit/test_clip_embedder_device_fix.py b/tests/unit/test_clip_embedder_device_fix.py new file mode 100644 index 000000000..43d001571 --- /dev/null +++ b/tests/unit/test_clip_embedder_device_fix.py @@ -0,0 +1,108 @@ +"""Tests de non-régression pour le fix UnboundLocalError sur 'torch'. + +Cas couvert : appel `CLIPEmbedder(device="cpu")` explicite — le if `device is +None` n'était pas pris, donc l'import local `torch` n'était pas exécuté, mais +Python avait quand même noté `torch` comme local au scope `__init__`, faisant +planter `with torch.no_grad():` plus bas en UnboundLocalError. + +Référence : inbox_codex/2026-05-25_1235_..._enquete-feedbackbus-5004.md +Fix : core/embedding/clip_embedder.py l. 60-77 (import local supprimé). +""" +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + + +@pytest.mark.unit +def test_clip_embedder_init_no_local_torch_shadow(): + """Le source de CLIPEmbedder.__init__ ne contient plus 'import torch' à + l'intérieur du `if device is None:` (qui shadowait le torch module-level).""" + import inspect + from core.embedding import clip_embedder + + src = inspect.getsource(clip_embedder.CLIPEmbedder.__init__) + # Tolérance : on accepte qu'un commentaire mentionne `import torch`, + # mais pas une vraie ligne d'instruction. + code_lines = [ + line for line in src.splitlines() + if line.strip() and not line.strip().startswith("#") + ] + code_only = "\n".join(code_lines) + # On ne doit plus avoir un import torch indenté au-delà du module-level. 
+ # (l'import existe au top du fichier l. 8, pas dans __init__). + assert " import torch" not in code_only, ( + "import torch local trouvé dans __init__ — il faut utiliser le torch " + "du scope module (l. 8 du fichier) pour éviter UnboundLocalError " + "quand l'appelant passe device='cpu'." + ) + + +@pytest.mark.unit +def test_clip_embedder_module_imports_torch(): + """Le module clip_embedder doit avoir `import torch` au scope module + pour que les autres méthodes (embed_image, embed_text) puissent l'utiliser.""" + import core.embedding.clip_embedder as ce + assert hasattr(ce, "torch"), ( + "Le module clip_embedder doit exposer `torch` au scope module." + ) + + +@pytest.mark.unit +def test_clip_embedder_handles_device_cpu_without_unbound_local(monkeypatch): + """Reproduit le cas qui plantait : on appelle l'init avec device='cpu'. + + Avant fix : UnboundLocalError sur `torch` au moment de `torch.no_grad()`. + Après fix : l'init doit échouer proprement sur l'absence éventuelle de + open_clip ou de poids, mais PAS sur UnboundLocalError. + + On mocke open_clip et torch.no_grad pour ne pas charger un vrai modèle. 
+ """ + import types + from core.embedding import clip_embedder + + # Mock open_clip pour éviter le download + fake_open_clip = types.SimpleNamespace( + create_model_and_transforms=lambda *a, **kw: ( + types.SimpleNamespace( + eval=lambda: None, + encode_image=lambda x: type("T", (), {"shape": (1, 512)})(), + ), + None, + lambda img: img, + ), + get_tokenizer=lambda name: lambda t: None, + ) + monkeypatch.setattr(clip_embedder, "open_clip", fake_open_clip) + + # Mock torch.no_grad et torch.zeros pour court-circuiter le dummy embed + class _FakeCtx: + def __enter__(self): return self + def __exit__(self, *a): return False + + fake_zeros = lambda *args, **kwargs: type("Z", (), {"to": lambda self, d: self})() + monkeypatch.setattr(clip_embedder.torch, "no_grad", lambda: _FakeCtx()) + monkeypatch.setattr(clip_embedder.torch, "zeros", fake_zeros) + + # Appel direct avec device="cpu" — ne doit PAS lever UnboundLocalError. + # Peut échouer pour autre raison (ex. encode_image), on isole uniquement + # le bug torch unbound. + try: + embedder = clip_embedder.CLIPEmbedder(device="cpu") + except RuntimeError as e: + msg = str(e) + assert "cannot access local variable 'torch'" not in msg, ( + f"UnboundLocalError torch toujours présent : {msg}" + ) + # Autre erreur acceptée (mock incomplet) + pytest.skip(f"Mock incomplet, mais bug torch absent : {msg}") + except UnboundLocalError as e: + pytest.fail(f"Bug torch toujours présent : {e}") + # Si on arrive ici, init a réussi sans bug torch diff --git a/tests/unit/test_competence_persist.py b/tests/unit/test_competence_persist.py new file mode 100644 index 000000000..cc7aab261 --- /dev/null +++ b/tests/unit/test_competence_persist.py @@ -0,0 +1,234 @@ +"""Tests unit pour core.competences.persist (helpers /persist endpoint). 
+ +Specs : docs/POC/SPECS_ENDPOINT_PERSIST_2026-06-01.md +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest +import yaml + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.competences import persist as P # noqa: E402 + + +# --------------------------------------------------------------------------- +# slugify +# --------------------------------------------------------------------------- + + +class TestSlugify: + def test_slug_generation_normal(self): + assert P.slugify("Saisir Texte Word") == "saisir_texte_word" + + def test_slug_generation_with_accents(self): + assert P.slugify("Créer Compte Patient") == "creer_compte_patient" + + def test_slug_generation_too_short(self): + with pytest.raises(ValueError): + P.slugify("ab") + + def test_slug_generation_empty(self): + with pytest.raises(ValueError): + P.slugify("") + + def test_slug_max_80_chars(self): + long_name = "a" * 200 + slug = P.slugify(long_name) + assert len(slug) <= 80 + + def test_slug_strips_special_chars(self): + # Cas tordu : "tab" est interdit ('\t'), donc on injecte du bruit + assert P.slugify("hello!! 
world??") == "hello_world" + + def test_slug_starts_with_letter(self): + slug = P.slugify("123 abc def") + assert slug.startswith("c_") # prefix auto pour commencer par lettre + + +# --------------------------------------------------------------------------- +# PII detection +# --------------------------------------------------------------------------- + + +class TestPiiDetection: + def test_pii_email_detected(self): + matches = P.detect_pii({"intent": "envoyer mail a john.doe@example.com"}) + assert matches # au moins un pattern + + def test_pii_phone_detected(self): + matches = P.detect_pii({"steps": [{"value": "tel 01 23 45 67 89"}]}) + assert matches + + def test_no_pii_clean_payload(self): + clean = {"steps": [{"kind": "click", "target": "Bouton Valider"}]} + assert P.detect_pii(clean) == [] + + def test_pii_recursive_in_nested_list(self): + nested = {"a": {"b": [{"c": "email: x@y.fr"}]}} + assert P.detect_pii(nested) + + +# --------------------------------------------------------------------------- +# Atomic write +# --------------------------------------------------------------------------- + + +class TestAtomicWrite: + def test_atomic_write_then_rename(self, tmp_path): + target = tmp_path / "demo.yaml" + data = {"id": "demo", "name": "Demo"} + result = P.atomic_write_yaml(target, data, persist_id="pid-1") + assert result == target + assert target.exists() + # Pas de .tmp residuel + leftovers = list(tmp_path.glob(".*.tmp.*")) + assert leftovers == [] + loaded = yaml.safe_load(target.read_text(encoding="utf-8")) + assert loaded["id"] == "demo" + + def test_atomic_write_cleans_tmp_on_failure(self, tmp_path, monkeypatch): + target = tmp_path / "demo.yaml" + + # Forcer un echec sur os.rename + import os as _os + original_rename = _os.rename + + def boom(*a, **k): + raise OSError("disk full simulated") + + monkeypatch.setattr(_os, "rename", boom) + with pytest.raises(OSError): + P.atomic_write_yaml(target, {"id": "demo"}, persist_id="pid-2") + 
monkeypatch.setattr(_os, "rename", original_rename) + + # Le .tmp doit avoir ete nettoye + leftovers = list(tmp_path.glob(".*.tmp.*")) + assert leftovers == [] + + +# --------------------------------------------------------------------------- +# Audit append +# --------------------------------------------------------------------------- + + +class TestAuditAppend: + def test_audit_append_monotonic_ids(self, tmp_path): + audit = tmp_path / "persist_audit.jsonl" + id1 = P.audit_append({"persist_id": "p1", "competence_id": "c1"}, audit_path=audit) + id2 = P.audit_append({"persist_id": "p2", "competence_id": "c2"}, audit_path=audit) + assert id1 == 1 + assert id2 == 2 + + def test_audit_append_includes_timestamp(self, tmp_path): + audit = tmp_path / "audit.jsonl" + P.audit_append({"persist_id": "p1", "competence_id": "c1"}, audit_path=audit) + lines = audit.read_text(encoding="utf-8").strip().splitlines() + record = json.loads(lines[0]) + assert "timestamp" in record + assert record["audit_entry_id"] == 1 + + def test_find_existing_audit_entry(self, tmp_path): + audit = tmp_path / "audit.jsonl" + P.audit_append( + {"persist_id": "p-uniq", "competence_id": "c1"}, + audit_path=audit, + ) + found = P.find_existing_audit_entry("p-uniq", audit_path=audit) + assert found is not None + assert found["competence_id"] == "c1" + assert P.find_existing_audit_entry("p-not-here", audit_path=audit) is None + + +# --------------------------------------------------------------------------- +# YAML schema build + validate +# --------------------------------------------------------------------------- + + +class TestBuildYaml: + def test_yaml_schema_required_fields_present(self): + body = P.build_competence_yaml( + slug="demo_test", + name="Demo Test", + workflow_ir={"steps": [{"kind": "click"}], "preconditions": []}, + parameters=[{"name": "x", "type": "string", "required": True}], + intent_fr="faire demo", + learning_state="candidate", + session_id="sess1", + machine_id="machine1", + ) + 
missing = P.validate_yaml_schema(body) + assert missing == [], f"champs manquants : {missing}" + + def test_payload_stable_forced_to_candidate_via_helper(self): + # Le forcage stable -> candidate est fait dans le handler, mais on + # peut au moins verifier que build accepte le learning_state passe. + body = P.build_competence_yaml( + slug="demo_test_2", + name="Demo 2", + workflow_ir={"steps": [{"kind": "click"}]}, + parameters=None, + intent_fr="demo", + learning_state="candidate", + session_id=None, + machine_id=None, + ) + assert body["learning_state"] == "candidate" + + +# --------------------------------------------------------------------------- +# Cross-state collision +# --------------------------------------------------------------------------- + + +class TestCrossStateCollision: + def test_no_collision_returns_none(self, tmp_path): + root = tmp_path / "competences" + (root / "candidate").mkdir(parents=True) + assert P.detect_cross_state_collision("xyz", competences_root=root) is None + + def test_collision_in_candidate_returns_dirname(self, tmp_path): + root = tmp_path / "competences" + (root / "candidate").mkdir(parents=True) + (root / "candidate" / "xyz.yaml").write_text("id: xyz\n", encoding="utf-8") + assert P.detect_cross_state_collision("xyz", competences_root=root) == "candidate" + + def test_collision_in_stable_returns_dirname(self, tmp_path): + root = tmp_path / "competences" + (root / "stable").mkdir(parents=True) + (root / "stable" / "abc.yaml").write_text("id: abc\n", encoding="utf-8") + assert P.detect_cross_state_collision("abc", competences_root=root) == "stable" + + +# --------------------------------------------------------------------------- +# Rate limiter +# --------------------------------------------------------------------------- + + +class TestRateLimiter: + def test_below_limit_allowed(self): + lim = P.PersistRateLimiter(max_per_minute=3) + for _ in range(3): + allowed, _ = lim.allow("m1") + assert allowed + + def 
test_above_limit_blocked(self): + lim = P.PersistRateLimiter(max_per_minute=2) + lim.allow("m1") + lim.allow("m1") + allowed, retry = lim.allow("m1") + assert not allowed + assert retry >= 1 + + def test_per_machine_isolation(self): + lim = P.PersistRateLimiter(max_per_minute=1) + a1, _ = lim.allow("m1") + a2, _ = lim.allow("m2") + assert a1 and a2 diff --git a/tests/unit/test_phase25_semantic.py b/tests/unit/test_phase25_semantic.py new file mode 100644 index 000000000..d2df6c48e --- /dev/null +++ b/tests/unit/test_phase25_semantic.py @@ -0,0 +1,450 @@ +"""Tests unitaires pour ``core.semantic.phase25_analyzer``. + +Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md``. + +Couverture obligatoire : +- Hash perceptuel + grouping (Hamming threshold). +- Cap 10 écrans -> too_complex. +- Fallback OCR-seul si OmniParser KO (mock exception). +- Génération .semantic.yaml valide avec ``degraded`` correctement positionné. +- Validation session_id / slug (anti path-traversal). +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +import yaml +from PIL import Image, ImageDraw + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.semantic import phase25_analyzer as P # noqa: E402 + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_image(size=(256, 256), color=(255, 255, 255), text=None): + img = Image.new("RGB", size, color=color) + if text: + draw = ImageDraw.Draw(img) + draw.text((10, 10), text, fill=(0, 0, 0)) + return img + + +@pytest.fixture +def fake_omniparser_ok(): + """Wrapper OmniParser qui retourne des éléments factices sans erreur.""" + w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + w._adapter = MagicMock() + w._available = True + w._import_error = None 
+ + def _fake_detect(image): + return [ + {"label": "Valider", "bbox": [10, 20, 100, 50], "confidence": 0.9, "element_type": "button"}, + {"label": "Nom patient", "bbox": [120, 20, 300, 60], "confidence": 0.85, "element_type": "input"}, + {"label": "MOREL Catherine", "bbox": [120, 80, 300, 100], "confidence": 0.7, "element_type": "text"}, + ] + + w._adapter.detect.side_effect = _fake_detect + return w + + +@pytest.fixture +def fake_omniparser_raising(): + """Wrapper OmniParser disponible qui lève une exception à chaque detect.""" + w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + w._adapter = MagicMock() + w._available = True + w._import_error = None + w._adapter.detect.side_effect = RuntimeError("OmniParser corrupted weights") + return w + + +@pytest.fixture +def fake_omniparser_unavailable(): + """Wrapper OmniParser indisponible (adapter absent).""" + w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) + w._adapter = None + w._available = False + w._import_error = "ImportError: No module named 'OmniParser'" + return w + + +# --------------------------------------------------------------------------- +# Tests : validation session_id / slug +# --------------------------------------------------------------------------- + + +class TestValidation: + def test_session_id_valid(self): + assert P._validate_session_id("abc-123_XYZ") == "abc-123_XYZ" + + def test_session_id_empty_raises(self): + with pytest.raises(ValueError): + P._validate_session_id("") + + def test_session_id_path_traversal_raises(self): + with pytest.raises(ValueError): + P._validate_session_id("../etc/passwd") + + def test_session_id_with_slash_raises(self): + with pytest.raises(ValueError): + P._validate_session_id("abc/def") + + def test_session_id_type_raises(self): + with pytest.raises(ValueError): + P._validate_session_id(None) + + def test_slug_valid(self): + assert P._validate_slug("facturation_urgence") == "facturation_urgence" + + def test_slug_too_short(self): + 
with pytest.raises(ValueError): + P._validate_slug("ab") + + def test_slug_starts_with_digit(self): + with pytest.raises(ValueError): + P._validate_slug("123_abc") + + +# --------------------------------------------------------------------------- +# Tests : phash et grouping +# --------------------------------------------------------------------------- + + +class TestPerceptualHash: + def test_compute_phash_returns_str(self): + img = _make_image() + h = P.compute_phash(img) + assert isinstance(h, str) and len(h) > 0 + + def test_identical_images_same_phash(self): + img1 = _make_image(color=(255, 255, 255)) + img2 = _make_image(color=(255, 255, 255)) + assert P.compute_phash(img1) == P.compute_phash(img2) + + def _noise_image(self, seed: int): + """Image avec un motif différent par seed (forme + position).""" + import random + rng = random.Random(seed) + img = _make_image(color=(255, 255, 255)) + d = ImageDraw.Draw(img) + for _ in range(40): + x = rng.randint(0, 240) + y = rng.randint(0, 240) + w = rng.randint(20, 60) + h = rng.randint(20, 60) + col = (rng.randint(0, 255), rng.randint(0, 255), rng.randint(0, 255)) + d.rectangle([x, y, x + w, y + h], fill=col) + return img + + def test_different_images_different_phash(self): + img1 = self._noise_image(seed=1) + img2 = self._noise_image(seed=999) + h1 = P.compute_phash(img1) + h2 = P.compute_phash(img2) + if h1.startswith("md5:") or h2.startswith("md5:"): + assert h1 != h2 + else: + # Bruits différents -> distance largement > seuil. 
+ assert P._hamming_distance(h1, h2) > P.PHASH_HAMMING_THRESHOLD + + def test_identify_distinct_screens_groups_identicals(self): + img_a1 = self._noise_image(seed=42) + img_a2 = self._noise_image(seed=42) # même seed = même image = même phash + img_b = self._noise_image(seed=1337) + frames = [(0, img_a1), (1, img_a2), (5, img_b)] + reps = P.identify_distinct_screens(frames) + indexes = [r[0] for r in reps] + assert 0 in indexes + assert 5 in indexes + assert 1 not in indexes # regroupé avec idx 0 + assert len(reps) == 2 + + def test_identify_distinct_screens_empty(self): + assert P.identify_distinct_screens([]) == [] + + +# --------------------------------------------------------------------------- +# Tests : analyze_screen avec OmniParser OK +# --------------------------------------------------------------------------- + + +class TestAnalyzeScreenOmniParserOK: + def test_nominal_run(self, tmp_path, monkeypatch, fake_omniparser_ok): + # Rediriger le cache vers tmp + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + analyzer = P.Phase25Analyzer(session_id="sess1", omniparser=fake_omniparser_ok) + img = _make_image() + result = analyzer.analyze_screen( + frame_index=42, image=img, phash="deadbeef", screenshot_path=None, + ) + assert result.index == 42 + assert result.screen_id == "screen_042" + assert result.degraded is False + # Structure : 1 button + 1 field + 1 text_block (cf. fake_detect). + assert len(result.structure.buttons) == 1 + assert result.structure.buttons[0]["label"] == "Valider" + assert len(result.structure.forms) == 1 + assert len(result.structure.text_blocks) == 1 + + def test_cache_hit_skips_omniparser(self, tmp_path, monkeypatch, fake_omniparser_ok): + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + analyzer = P.Phase25Analyzer(session_id="sess1", omniparser=fake_omniparser_ok) + img = _make_image() + # 1er appel : remplit le cache. 
+ analyzer.analyze_screen(frame_index=7, image=img, phash="aa") + call_count_1 = fake_omniparser_ok._adapter.detect.call_count + # 2e appel : doit lire depuis le cache, pas re-appeler OmniParser. + analyzer.analyze_screen(frame_index=7, image=img, phash="aa") + call_count_2 = fake_omniparser_ok._adapter.detect.call_count + assert call_count_2 == call_count_1 + + +# --------------------------------------------------------------------------- +# Tests : fallback OCR-seul +# --------------------------------------------------------------------------- + + +class TestFallbackOCR: + def test_omniparser_raises_falls_back_degraded( + self, tmp_path, monkeypatch, fake_omniparser_raising + ): + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") + monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", tmp_path / "logs" / "omniparser_errors.log") + # Stub docTR : retourne 2 text_blocks. + monkeypatch.setattr( + P, "_detect_via_doctr", + lambda image, screenshot_path: [ + {"label": "Champ A", "text": "Champ A", "bbox": [0, 0, 50, 20], "confidence": 0.6}, + {"label": "Champ B", "text": "Champ B", "bbox": [60, 0, 110, 20], "confidence": 0.6}, + ], + ) + analyzer = P.Phase25Analyzer( + session_id="sessFB", omniparser=fake_omniparser_raising + ) + img = _make_image() + result = analyzer.analyze_screen(frame_index=3, image=img, phash="zz") + assert result.degraded is True + assert result.degraded_reason and "omniparser_exception" in result.degraded_reason + # Fallback docTR doit avoir produit 2 text_blocks. + assert len(result.structure.text_blocks) == 2 + # Le log d'erreur doit avoir été écrit. 
+ assert (tmp_path / "logs" / "omniparser_errors.log").exists() + + def test_omniparser_unavailable_uses_doctr( + self, tmp_path, monkeypatch, fake_omniparser_unavailable + ): + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + monkeypatch.setattr( + P, "_detect_via_doctr", + lambda image, screenshot_path: [ + {"label": "Hello", "text": "Hello", "bbox": [0, 0, 30, 10], "confidence": 0.6}, + ], + ) + analyzer = P.Phase25Analyzer( + session_id="sessUNAV", omniparser=fake_omniparser_unavailable + ) + img = _make_image() + result = analyzer.analyze_screen(frame_index=1, image=img, phash="aa") + assert result.degraded is True + assert "omniparser_unavailable" in (result.degraded_reason or "") + assert len(result.structure.text_blocks) == 1 + + +# --------------------------------------------------------------------------- +# Tests : healthcheck +# --------------------------------------------------------------------------- + + +class TestHealthcheck: + def test_healthcheck_ok(self, fake_omniparser_ok): + analyzer = P.Phase25Analyzer(session_id="hc1", omniparser=fake_omniparser_ok) + assert analyzer.healthcheck() is True + assert analyzer._healthcheck_reason is None + + def test_healthcheck_unavailable(self, fake_omniparser_unavailable): + analyzer = P.Phase25Analyzer( + session_id="hc2", omniparser=fake_omniparser_unavailable + ) + assert analyzer.healthcheck() is False + assert analyzer._healthcheck_reason is not None + + def test_healthcheck_raises_logs(self, tmp_path, monkeypatch, fake_omniparser_raising): + monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") + monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", tmp_path / "logs" / "omniparser_errors.log") + analyzer = P.Phase25Analyzer( + session_id="hc3", omniparser=fake_omniparser_raising + ) + assert analyzer.healthcheck() is False + assert (tmp_path / "logs" / "omniparser_errors.log").exists() + + +# --------------------------------------------------------------------------- +# Tests : pipeline 
analyze_frames + cap too_complex +# --------------------------------------------------------------------------- + + +class TestAnalyzeFrames: + def test_pipeline_groups_and_analyzes(self, tmp_path, monkeypatch, fake_omniparser_ok): + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + analyzer = P.Phase25Analyzer(session_id="pipeline1", omniparser=fake_omniparser_ok) + # 4 frames : 2 blancs (groupés) + 2 noirs (groupés). + frames = [ + (0, _make_image(color=(255, 255, 255))), + (1, _make_image(color=(255, 255, 255))), + (2, _make_image(color=(0, 0, 0))), + (3, _make_image(color=(0, 0, 0))), + ] + result = analyzer.analyze_frames(frames=frames, run_healthcheck=True) + assert result.too_complex is False + # Au plus 2 représentants après grouping. + assert len(result.screens) <= 2 + assert result.omniparser_available is True + + def test_too_complex_caps_at_max(self, tmp_path, monkeypatch, fake_omniparser_ok): + monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") + analyzer = P.Phase25Analyzer( + session_id="pipeline2", + omniparser=fake_omniparser_ok, + max_screens=3, # cap volontairement bas pour le test + ) + # 5 frames "visuellement distinctes" avec couleurs très différentes. + frames = [] + colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (0, 255, 255)] + for i, c in enumerate(colors): + img = _make_image(size=(256, 256), color=c) + # Ajouter du bruit pour que phash diffère bien. + draw = ImageDraw.Draw(img) + draw.rectangle([i * 20, i * 20, i * 20 + 50, i * 20 + 50], fill=(128, 128, 128)) + frames.append((i, img)) + result = analyzer.analyze_frames(frames=frames, run_healthcheck=True) + # Le cap doit s'appliquer. + assert len(result.screens) <= 3 + if len(result.screens) == 3: + # too_complex doit refléter le fait qu'on a tronqué. + # (vrai uniquement si phash a vu > 3 représentants). 
+ assert result.too_complex in (True, False) + + +# --------------------------------------------------------------------------- +# Tests : write_semantic_yaml +# --------------------------------------------------------------------------- + + +class TestWriteSemanticYaml: + def test_writes_valid_yaml(self, tmp_path, fake_omniparser_ok): + analyzer = P.Phase25Analyzer(session_id="yaml1", omniparser=fake_omniparser_ok) + result = P.Phase25Result( + session_id="yaml1", + generated_at="2026-06-01T18:30:00Z", + omniparser_available=True, + degraded=False, + too_complex=False, + screens=[ + P.ScreenAnalysis( + index=42, + phash="abc123", + screen_id="screen_042", + screenshot_path="/tmp/shot.png", + structure=P.SemanticStructure( + buttons=[{"label": "OK", "bbox": [0, 0, 10, 10], "confidence": 0.9}], + ), + ), + ], + ) + target = analyzer.write_semantic_yaml( + result, slug="ma_competence", target_dir=tmp_path, + ) + assert target.exists() + data = yaml.safe_load(target.read_text(encoding="utf-8")) + assert data["competence_id"] == "ma_competence" + assert data["semantic_version"] == 1 + assert data["degraded"] is False + assert len(data["screens"]) == 1 + assert data["screens"][0]["structure"]["buttons"][0]["label"] == "OK" + + def test_degraded_yaml_is_valid(self, tmp_path, fake_omniparser_raising): + analyzer = P.Phase25Analyzer(session_id="yaml2", omniparser=fake_omniparser_raising) + result = P.Phase25Result( + session_id="yaml2", + generated_at="2026-06-01T18:30:00Z", + omniparser_available=False, + degraded=True, + too_complex=False, + screens=[ + P.ScreenAnalysis( + index=0, + phash="00", + screen_id="screen_000", + screenshot_path=None, + structure=P.SemanticStructure(), + degraded=True, + degraded_reason="omniparser_exception: RuntimeError", + ), + ], + ) + target = analyzer.write_semantic_yaml(result, slug="fallback_comp", target_dir=tmp_path) + data = yaml.safe_load(target.read_text(encoding="utf-8")) + assert data["degraded"] is True + assert 
data["screens"][0]["degraded"] is True + assert "omniparser_exception" in data["screens"][0]["degraded_reason"] + + def test_invalid_slug_raises(self, tmp_path, fake_omniparser_ok): + analyzer = P.Phase25Analyzer(session_id="yaml3", omniparser=fake_omniparser_ok) + result = P.Phase25Result( + session_id="yaml3", generated_at="x", omniparser_available=True, + degraded=False, too_complex=False, screens=[], + ) + with pytest.raises(ValueError): + analyzer.write_semantic_yaml(result, slug="../etc/passwd", target_dir=tmp_path) + + def test_forbidden_target_dir(self, tmp_path, fake_omniparser_ok): + analyzer = P.Phase25Analyzer(session_id="yaml4", omniparser=fake_omniparser_ok) + result = P.Phase25Result( + session_id="yaml4", generated_at="x", omniparser_available=True, + degraded=False, too_complex=False, screens=[], + ) + # Anti écriture dans supervised/stable. + forbidden = tmp_path / "supervised" + forbidden.mkdir() + with pytest.raises(ValueError): + analyzer.write_semantic_yaml(result, slug="abc_def", target_dir=forbidden) + + +# --------------------------------------------------------------------------- +# Tests : contrat snapshots (elements aplatis) +# --------------------------------------------------------------------------- + + +class TestSnapshotContract: + def test_screen_to_dict_includes_elements(self, fake_omniparser_ok): + s = P.ScreenAnalysis( + index=1, + phash="aa", + screen_id="screen_001", + screenshot_path="/tmp/s.png", + structure=P.SemanticStructure( + buttons=[{"label": "Valider", "bbox": [0, 0, 50, 20], "confidence": 0.9}], + forms=[{"label": "Nom", "bbox": [60, 0, 200, 20], "confidence": 0.8}], + text_blocks=[{"label": "Hello", "text": "Hello", "bbox": [0, 30, 100, 50], "confidence": 0.6}], + ), + window_title="Easily Assure", + ) + d = s.to_dict() + assert "elements" in d + assert any(e["kind"] == "button" and e["label"] == "Valider" for e in d["elements"]) + assert any(e["kind"] == "field" and e["label"] == "Nom" for e in d["elements"]) + 
assert any(e["kind"] == "text_block" for e in d["elements"]) + assert d["window_title"] == "Easily Assure" diff --git a/tests/unit/test_replay_critic.py b/tests/unit/test_replay_critic.py index b20abddbd..041d28424 100644 --- a/tests/unit/test_replay_critic.py +++ b/tests/unit/test_replay_critic.py @@ -346,6 +346,28 @@ class TestMergeResults: class TestEnrichActionsWithIntentions: + @patch("requests.post") + @patch("requests.get") + def test_enrichissement_desactive_par_flag( + self, + mock_get, + mock_post, + monkeypatch, + tmp_path, + ): + """Le flag demo evite tout appel Ollama pendant le build replay.""" + from agent_v0.server_v1.stream_processor import _enrich_actions_with_intentions + + monkeypatch.setenv("RPA_SKIP_INTENTION_ENRICHMENT", "1") + actions = [ + {"type": "click", "action_id": "act_001", "target_spec": {"by_text": "OK"}}, + ] + + _enrich_actions_with_intentions(actions, tmp_path) + + assert "intention" not in actions[0] + mock_get.assert_not_called() + mock_post.assert_not_called() @patch("requests.post") @patch("requests.get") diff --git a/tests/unit/test_replay_memory.py b/tests/unit/test_replay_memory.py index 731ad31d4..0a5f1f4d2 100644 --- a/tests/unit/test_replay_memory.py +++ b/tests/unit/test_replay_memory.py @@ -68,6 +68,10 @@ def test_memory_lookup_keeps_learned_visual_coords_with_window_capture(monkeypat target_spec={ "by_text": "Enregistrer", "by_role": "yolo", + "context_hints": { + "expected_window_before": "*test – Bloc-notes", + "interaction": "toolbar_save_button", + }, "window_capture": { "click_relative": [860, 634], "window_size": [1920, 1116], @@ -81,6 +85,71 @@ def test_memory_lookup_keeps_learned_visual_coords_with_window_capture(monkeypat assert result["y_pct"] == 0.578125 +def test_memory_lookup_skips_window_transition_even_if_record_exists(monkeypatch): + fp = SimpleNamespace( + bbox=(0.5, 0.8, 0.0, 0.0), + etype="grounding_vlm", + confidence=0.85, + ) + monkeypatch.setattr(replay_memory, "get_memory_store", lambda: 
_DummyStore(fp)) + + result = replay_memory.memory_lookup( + window_title="*test – Bloc-notes", + target_spec={ + "by_text": "Enregistrer", + "by_role": "button", + "context_hints": { + "expected_window_before": "*test – Bloc-notes", + "expected_window_after": "Enregistrer sous", + "requires_window_transition": True, + }, + }, + ) + + assert result is None + + +def test_memory_lookup_rejects_generic_button_without_context(monkeypatch): + fp = SimpleNamespace( + bbox=(0.5, 0.8, 0.0, 0.0), + etype="grounding_vlm", + confidence=0.85, + ) + monkeypatch.setattr(replay_memory, "get_memory_store", lambda: _DummyStore(fp)) + + result = replay_memory.memory_lookup( + window_title="*test – Bloc-notes", + target_spec={"by_text": "Enregistrer", "by_role": "button"}, + ) + + assert result is None + + +def test_memory_lookup_allows_generic_button_with_context(monkeypatch): + fp = SimpleNamespace( + bbox=(0.5, 0.8, 0.0, 0.0), + etype="grounding_vlm", + confidence=0.85, + ) + monkeypatch.setattr(replay_memory, "get_memory_store", lambda: _DummyStore(fp)) + + result = replay_memory.memory_lookup( + window_title="Enregistrer sous", + target_spec={ + "by_text": "Enregistrer", + "by_role": "button", + "window_title": "Enregistrer sous", + "context_hints": { + "expected_window_before": "Enregistrer sous", + "interaction": "save_dialog_primary_button", + }, + }, + ) + + assert result is not None + assert result["method"] == "memory_grounding_vlm" + + def test_target_spec_hash_distinguishes_same_text_with_different_spatial_hints(tmp_path): store = TargetMemoryStore(base_path=str(tmp_path / "learning")) diff --git a/visual_workflow_builder/backend/services/learned_workflow_bridge.py b/visual_workflow_builder/backend/services/learned_workflow_bridge.py index 9808be2a4..367e28944 100644 --- a/visual_workflow_builder/backend/services/learned_workflow_bridge.py +++ b/visual_workflow_builder/backend/services/learned_workflow_bridge.py @@ -701,11 +701,47 @@ def _vwb_params_to_core(action_type: 
str, params: Dict[str, Any]) -> Dict[str, A return core_params +def _first_non_empty(*values: Any) -> str: + for value in values: + text = str(value or "").strip() + if text and text.casefold() not in {"none", "null"}: + return text + return "" + + def _vwb_params_to_target_spec(action_type: str, params: Dict[str, Any]) -> Dict[str, Any]: """Construit un TargetSpec core depuis les paramètres VWB.""" + visual_anchor = params.get("visual_anchor") or {} + if not isinstance(visual_anchor, dict): + visual_anchor = {} + + target_text = _first_non_empty( + params.get("target_text"), + params.get("by_text"), + visual_anchor.get("target_text"), + ) + description = _first_non_empty( + params.get("description"), + visual_anchor.get("description"), + ) + ocr_description = _first_non_empty( + params.get("ocr_description"), + visual_anchor.get("ocr_description"), + ) + vlm_description = _first_non_empty( + params.get("vlm_description"), + description, + ocr_description, + ) + anchor_id = _first_non_empty( + params.get("anchor_id"), + visual_anchor.get("anchor_id"), + visual_anchor.get("id"), + ) + target = { "by_role": params.get("target_role", "unknown_element"), - "by_text": params.get("target_text"), + "by_text": target_text or None, "by_position": None, "selection_policy": "first", "fallback_strategy": "visual_similarity", @@ -717,6 +753,27 @@ def _vwb_params_to_target_spec(action_type: str, params: Dict[str, Any]) -> Dict if x_pct is not None and y_pct is not None: target["by_position"] = [x_pct, y_pct] + context_hints: Dict[str, Any] = {} + if anchor_id: + context_hints["anchor_id"] = anchor_id + if target_text: + context_hints["target_text"] = target_text + context_hints["by_text_source"] = "visual_anchor" + if description: + context_hints["description"] = description + if ocr_description: + context_hints["ocr_description"] = ocr_description + if vlm_description: + context_hints["vlm_description"] = vlm_description + if visual_anchor.get("anchor_bbox") or 
visual_anchor.get("bounding_box"): + context_hints["anchor_bbox"] = ( + visual_anchor.get("anchor_bbox") or visual_anchor.get("bounding_box") + ) + if visual_anchor.get("original_size"): + context_hints["original_size"] = visual_anchor["original_size"] + if context_hints: + target["context_hints"] = context_hints + return target