# File: rpa_vision_v3/tests/unit/test_phase25_semantic.py
# (file-viewer header: 451 lines, 18 KiB, Python)

"""Tests unitaires pour ``core.semantic.phase25_analyzer``.
Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md``.
Couverture obligatoire :
- Hash perceptuel + grouping (Hamming threshold).
- Cap 10 écrans -> too_complex.
- Fallback OCR-seul si OmniParser KO (mock exception).
- Génération .semantic.yaml valide avec ``degraded`` correctement positionné.
- Validation session_id / slug (anti path-traversal).
"""
from __future__ import annotations
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import yaml
from PIL import Image, ImageDraw
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from core.semantic import phase25_analyzer as P # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_image(size=(256, 256), color=(255, 255, 255), text=None):
    """Build a solid-colour RGB image, optionally stamped with black text.

    Args:
        size: ``(width, height)`` passed to ``Image.new``.
        color: RGB fill colour of the canvas.
        text: When truthy, drawn in black at position (10, 10).

    Returns:
        The freshly created PIL image.
    """
    canvas = Image.new("RGB", size, color=color)
    if not text:
        return canvas
    ImageDraw.Draw(canvas).text((10, 10), text, fill=(0, 0, 0))
    return canvas
@pytest.fixture
def fake_omniparser_ok():
    """OmniParser wrapper whose ``detect`` returns three canned elements.

    The wrapper is created through ``__new__`` so its ``__init__`` is not
    executed; the private attributes are then set by hand.
    """
    canned = (
        ("Valider", (10, 20, 100, 50), 0.9, "button"),
        ("Nom patient", (120, 20, 300, 60), 0.85, "input"),
        ("MOREL Catherine", (120, 80, 300, 100), 0.7, "text"),
    )

    def _fake_detect(image):
        # Fresh dicts and lists on every call, one entry per canned row.
        return [
            {
                "label": label,
                "bbox": list(bbox),
                "confidence": confidence,
                "element_type": element_type,
            }
            for label, bbox, confidence, element_type in canned
        ]

    adapter = MagicMock()
    adapter.detect.side_effect = _fake_detect

    wrapper_cls = P._OmniParserSafeWrapper
    wrapper = wrapper_cls.__new__(wrapper_cls)
    wrapper._adapter = adapter
    wrapper._available = True
    wrapper._import_error = None
    return wrapper
@pytest.fixture
def fake_omniparser_raising():
    """OmniParser wrapper flagged available whose ``detect`` always raises.

    Every call to the mocked adapter's ``detect`` raises ``RuntimeError``.
    """
    adapter = MagicMock()
    adapter.detect.side_effect = RuntimeError("OmniParser corrupted weights")

    wrapper_cls = P._OmniParserSafeWrapper
    wrapper = wrapper_cls.__new__(wrapper_cls)  # __init__ is not executed
    wrapper._adapter = adapter
    wrapper._available = True
    wrapper._import_error = None
    return wrapper
@pytest.fixture
def fake_omniparser_unavailable():
    """OmniParser wrapper flagged unavailable, with no adapter attached."""
    wrapper_cls = P._OmniParserSafeWrapper
    wrapper = wrapper_cls.__new__(wrapper_cls)  # __init__ is not executed
    state = (
        ("_adapter", None),
        ("_available", False),
        ("_import_error", "ImportError: No module named 'OmniParser'"),
    )
    for attribute, value in state:
        setattr(wrapper, attribute, value)
    return wrapper
# ---------------------------------------------------------------------------
# Tests : validation session_id / slug
# ---------------------------------------------------------------------------
class TestValidation:
    """Checks on ``_validate_session_id`` and ``_validate_slug``."""

    def test_session_id_valid(self):
        """A well-formed session id is returned unchanged."""
        candidate = "abc-123_XYZ"
        assert P._validate_session_id(candidate) == candidate

    def test_session_id_empty_raises(self):
        """The empty string is rejected."""
        empty = ""
        with pytest.raises(ValueError):
            P._validate_session_id(empty)

    def test_session_id_path_traversal_raises(self):
        """A ``../`` style id is rejected."""
        traversal = "../etc/passwd"
        with pytest.raises(ValueError):
            P._validate_session_id(traversal)

    def test_session_id_with_slash_raises(self):
        """An id containing a slash is rejected."""
        with_slash = "abc/def"
        with pytest.raises(ValueError):
            P._validate_session_id(with_slash)

    def test_session_id_type_raises(self):
        """``None`` is rejected with ``ValueError``."""
        not_a_string = None
        with pytest.raises(ValueError):
            P._validate_session_id(not_a_string)

    def test_slug_valid(self):
        """A well-formed slug is returned unchanged."""
        candidate = "facturation_urgence"
        assert P._validate_slug(candidate) == candidate

    def test_slug_too_short(self):
        """A two-character slug is rejected."""
        too_short = "ab"
        with pytest.raises(ValueError):
            P._validate_slug(too_short)

    def test_slug_starts_with_digit(self):
        """A slug starting with a digit is rejected."""
        leading_digit = "123_abc"
        with pytest.raises(ValueError):
            P._validate_slug(leading_digit)
# ---------------------------------------------------------------------------
# Tests : phash et grouping
# ---------------------------------------------------------------------------
class TestPerceptualHash:
    """Checks on ``compute_phash`` and ``identify_distinct_screens``."""

    def test_compute_phash_returns_str(self):
        """The hash of a plain image is a non-empty string."""
        digest = P.compute_phash(_make_image())
        assert isinstance(digest, str) and len(digest) > 0

    def test_identical_images_same_phash(self):
        """Two separately built but identical images hash the same."""
        white = (255, 255, 255)
        first = _make_image(color=white)
        second = _make_image(color=white)
        assert P.compute_phash(first) == P.compute_phash(second)

    def _noise_image(self, seed: int):
        """Return a white image covered by 40 seeded random rectangles."""
        import random

        rng = random.Random(seed)
        canvas = _make_image(color=(255, 255, 255))
        pen = ImageDraw.Draw(canvas)
        for _ in range(40):
            # The draw order (x, y, width, height, then r/g/b) is what makes
            # the pattern reproducible for a given seed.
            left = rng.randint(0, 240)
            top = rng.randint(0, 240)
            width = rng.randint(20, 60)
            height = rng.randint(20, 60)
            fill = tuple(rng.randint(0, 255) for _ in range(3))
            pen.rectangle([left, top, left + width, top + height], fill=fill)
        return canvas

    def test_different_images_different_phash(self):
        """Two different noise patterns must not hash as the same screen."""
        first = self._noise_image(seed=1)
        second = self._noise_image(seed=999)
        hash_first = P.compute_phash(first)
        hash_second = P.compute_phash(second)
        uses_md5 = hash_first.startswith("md5:") or hash_second.startswith("md5:")
        if not uses_md5:
            # Different noise patterns: distance is expected to be well above
            # the grouping threshold.
            distance = P._hamming_distance(hash_first, hash_second)
            assert distance > P.PHASH_HAMMING_THRESHOLD
        else:
            # "md5:"-prefixed hashes are only compared for inequality.
            assert hash_first != hash_second

    def test_identify_distinct_screens_groups_identicals(self):
        """Identical frames collapse onto the first one; others stay apart."""
        frames = [
            (0, self._noise_image(seed=42)),
            (1, self._noise_image(seed=42)),  # same seed -> same image -> same phash
            (5, self._noise_image(seed=1337)),
        ]
        representatives = P.identify_distinct_screens(frames)
        kept = {rep[0] for rep in representatives}
        assert 0 in kept
        assert 5 in kept
        assert 1 not in kept  # grouped with index 0
        assert len(representatives) == 2

    def test_identify_distinct_screens_empty(self):
        """No frames in, no representatives out."""
        assert P.identify_distinct_screens([]) == []
# ---------------------------------------------------------------------------
# Tests : analyze_screen avec OmniParser OK
# ---------------------------------------------------------------------------
class TestAnalyzeScreenOmniParserOK:
    """``analyze_screen`` behaviour when the OmniParser wrapper works."""

    def test_nominal_run(self, tmp_path, monkeypatch, fake_omniparser_ok):
        """One screen is analysed and its elements are sorted by category."""
        # Redirect the OmniParser cache into the per-test temp directory.
        monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache")
        analyzer = P.Phase25Analyzer(session_id="sess1", omniparser=fake_omniparser_ok)
        img = _make_image()
        result = analyzer.analyze_screen(
            frame_index=42, image=img, phash="deadbeef", screenshot_path=None,
        )
        assert result.index == 42
        assert result.screen_id == "screen_042"
        assert result.degraded is False
        # Structure: 1 button + 1 field + 1 text block (see the fixture's
        # canned detections).
        assert len(result.structure.buttons) == 1
        assert result.structure.buttons[0]["label"] == "Valider"
        assert len(result.structure.forms) == 1
        assert len(result.structure.text_blocks) == 1

    def test_cache_hit_skips_omniparser(self, tmp_path, monkeypatch, fake_omniparser_ok):
        """A second analysis of the same frame must not call OmniParser again."""
        monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache")
        analyzer = P.Phase25Analyzer(session_id="sess1", omniparser=fake_omniparser_ok)
        img = _make_image()
        detect = fake_omniparser_ok._adapter.detect
        # First call: fills the cache.
        analyzer.analyze_screen(frame_index=7, image=img, phash="aa")
        call_count_1 = detect.call_count
        # Guard against a vacuous pass: without this, the equality below would
        # also hold (0 == 0) if the adapter were never invoked at all.
        assert call_count_1 >= 1
        # Second call: must be served from the cache, not from OmniParser.
        analyzer.analyze_screen(frame_index=7, image=img, phash="aa")
        call_count_2 = detect.call_count
        assert call_count_2 == call_count_1
# ---------------------------------------------------------------------------
# Tests : fallback OCR-seul
# ---------------------------------------------------------------------------
class TestFallbackOCR:
    """Degraded mode: docTR-only analysis when OmniParser cannot be used."""

    @staticmethod
    def _redirect_outputs(monkeypatch, tmp_path):
        """Point cache and error-log paths at ``tmp_path``; return the log path.

        Applied in every test of this class so that neither the cache nor the
        OmniParser error log can land outside the per-test temp directory.
        """
        logs_dir = tmp_path / "logs"
        error_log = logs_dir / "omniparser_errors.log"
        monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache")
        monkeypatch.setattr(P, "LOGS_DIR", logs_dir)
        monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", error_log)
        return error_log

    def test_omniparser_raises_falls_back_degraded(
        self, tmp_path, monkeypatch, fake_omniparser_raising
    ):
        """An exception in OmniParser yields a degraded docTR-based result."""
        error_log = self._redirect_outputs(monkeypatch, tmp_path)
        # docTR stub: returns 2 text blocks.
        monkeypatch.setattr(
            P, "_detect_via_doctr",
            lambda image, screenshot_path: [
                {"label": "Champ A", "text": "Champ A", "bbox": [0, 0, 50, 20], "confidence": 0.6},
                {"label": "Champ B", "text": "Champ B", "bbox": [60, 0, 110, 20], "confidence": 0.6},
            ],
        )
        analyzer = P.Phase25Analyzer(
            session_id="sessFB", omniparser=fake_omniparser_raising
        )
        img = _make_image()
        result = analyzer.analyze_screen(frame_index=3, image=img, phash="zz")
        assert result.degraded is True
        assert result.degraded_reason and "omniparser_exception" in result.degraded_reason
        # The docTR fallback must have produced the 2 stubbed text blocks.
        assert len(result.structure.text_blocks) == 2
        # The error log must have been written.
        assert error_log.exists()

    def test_omniparser_unavailable_uses_doctr(
        self, tmp_path, monkeypatch, fake_omniparser_unavailable
    ):
        """An unavailable OmniParser also yields a degraded docTR result."""
        # NOTE(review): whether this path writes to the error log is not
        # visible from this file; the log paths are redirected anyway so the
        # test cannot touch the real log directory.
        self._redirect_outputs(monkeypatch, tmp_path)
        monkeypatch.setattr(
            P, "_detect_via_doctr",
            lambda image, screenshot_path: [
                {"label": "Hello", "text": "Hello", "bbox": [0, 0, 30, 10], "confidence": 0.6},
            ],
        )
        analyzer = P.Phase25Analyzer(
            session_id="sessUNAV", omniparser=fake_omniparser_unavailable
        )
        img = _make_image()
        result = analyzer.analyze_screen(frame_index=1, image=img, phash="aa")
        assert result.degraded is True
        assert "omniparser_unavailable" in (result.degraded_reason or "")
        assert len(result.structure.text_blocks) == 1
# ---------------------------------------------------------------------------
# Tests : healthcheck
# ---------------------------------------------------------------------------
class TestHealthcheck:
    """``Phase25Analyzer.healthcheck`` for each OmniParser wrapper state."""

    def test_healthcheck_ok(self, fake_omniparser_ok):
        """A working wrapper reports healthy and leaves no reason behind."""
        subject = P.Phase25Analyzer(session_id="hc1", omniparser=fake_omniparser_ok)
        healthy = subject.healthcheck()
        assert healthy is True
        assert subject._healthcheck_reason is None

    def test_healthcheck_unavailable(self, fake_omniparser_unavailable):
        """An unavailable wrapper reports unhealthy and records a reason."""
        subject = P.Phase25Analyzer(
            session_id="hc2", omniparser=fake_omniparser_unavailable
        )
        healthy = subject.healthcheck()
        assert healthy is False
        assert subject._healthcheck_reason is not None

    def test_healthcheck_raises_logs(self, tmp_path, monkeypatch, fake_omniparser_raising):
        """A raising wrapper reports unhealthy and the error log file exists."""
        logs_dir = tmp_path / "logs"
        error_log = logs_dir / "omniparser_errors.log"
        # Keep log output inside the per-test temp directory.
        monkeypatch.setattr(P, "LOGS_DIR", logs_dir)
        monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", error_log)
        subject = P.Phase25Analyzer(
            session_id="hc3", omniparser=fake_omniparser_raising
        )
        healthy = subject.healthcheck()
        assert healthy is False
        assert error_log.exists()
# ---------------------------------------------------------------------------
# Tests : pipeline analyze_frames + cap too_complex
# ---------------------------------------------------------------------------
class TestAnalyzeFrames:
    """End-to-end ``analyze_frames`` pipeline and the ``max_screens`` cap."""

    def test_pipeline_groups_and_analyzes(self, tmp_path, monkeypatch, fake_omniparser_ok):
        """Duplicate frames are grouped before analysis."""
        monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache")
        analyzer = P.Phase25Analyzer(session_id="pipeline1", omniparser=fake_omniparser_ok)
        # 4 frames: 2 white (grouped together) + 2 black (grouped together).
        frames = [
            (0, _make_image(color=(255, 255, 255))),
            (1, _make_image(color=(255, 255, 255))),
            (2, _make_image(color=(0, 0, 0))),
            (3, _make_image(color=(0, 0, 0))),
        ]
        result = analyzer.analyze_frames(frames=frames, run_healthcheck=True)
        assert result.too_complex is False
        # At most 2 representatives after grouping.
        assert len(result.screens) <= 2
        assert result.omniparser_available is True

    def test_too_complex_caps_at_max(self, tmp_path, monkeypatch, fake_omniparser_ok):
        """The screen cap is enforced and ``too_complex`` reflects truncation."""
        monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache")
        max_screens = 3  # deliberately low cap for this test
        analyzer = P.Phase25Analyzer(
            session_id="pipeline2",
            omniparser=fake_omniparser_ok,
            max_screens=max_screens,
        )
        # 5 frames meant to be visually distinct: very different colours, plus
        # a grey square at a frame-specific position so the hashes differ.
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (0, 255, 255)]
        frames = []
        for i, c in enumerate(colors):
            img = _make_image(size=(256, 256), color=c)
            draw = ImageDraw.Draw(img)
            draw.rectangle([i * 20, i * 20, i * 20 + 50, i * 20 + 50], fill=(128, 128, 128))
            frames.append((i, img))
        # How many representatives the grouping step sees for these frames.
        # NOTE(review): assumes analyze_frames groups frames the same way as
        # identify_distinct_screens with default settings - confirm.
        distinct = P.identify_distinct_screens(frames)
        result = analyzer.analyze_frames(frames=frames, run_healthcheck=True)
        # The cap must apply.
        assert len(result.screens) <= max_screens
        # too_complex must be set exactly when grouping produced more
        # representatives than the cap. The previous check,
        # ``too_complex in (True, False)``, could never fail.
        assert result.too_complex is (len(distinct) > max_screens)
# ---------------------------------------------------------------------------
# Tests : write_semantic_yaml
# ---------------------------------------------------------------------------
class TestWriteSemanticYaml:
    """``write_semantic_yaml``: file content and rejection of bad targets."""

    @staticmethod
    def _empty_result(session_id):
        """Return a non-degraded ``Phase25Result`` with no screens."""
        return P.Phase25Result(
            session_id=session_id, generated_at="x", omniparser_available=True,
            degraded=False, too_complex=False, screens=[],
        )

    def test_writes_valid_yaml(self, tmp_path, fake_omniparser_ok):
        """A nominal result is written as YAML that loads with the same data."""
        analyzer = P.Phase25Analyzer(session_id="yaml1", omniparser=fake_omniparser_ok)
        ok_button = {"label": "OK", "bbox": [0, 0, 10, 10], "confidence": 0.9}
        screen = P.ScreenAnalysis(
            index=42,
            phash="abc123",
            screen_id="screen_042",
            screenshot_path="/tmp/shot.png",
            structure=P.SemanticStructure(buttons=[ok_button]),
        )
        result = P.Phase25Result(
            session_id="yaml1",
            generated_at="2026-06-01T18:30:00Z",
            omniparser_available=True,
            degraded=False,
            too_complex=False,
            screens=[screen],
        )
        written = analyzer.write_semantic_yaml(
            result, slug="ma_competence", target_dir=tmp_path,
        )
        assert written.exists()
        loaded = yaml.safe_load(written.read_text(encoding="utf-8"))
        assert loaded["competence_id"] == "ma_competence"
        assert loaded["semantic_version"] == 1
        assert loaded["degraded"] is False
        assert len(loaded["screens"]) == 1
        assert loaded["screens"][0]["structure"]["buttons"][0]["label"] == "OK"

    def test_degraded_yaml_is_valid(self, tmp_path, fake_omniparser_raising):
        """Degraded flags and reason survive the YAML round trip."""
        analyzer = P.Phase25Analyzer(session_id="yaml2", omniparser=fake_omniparser_raising)
        degraded_screen = P.ScreenAnalysis(
            index=0,
            phash="00",
            screen_id="screen_000",
            screenshot_path=None,
            structure=P.SemanticStructure(),
            degraded=True,
            degraded_reason="omniparser_exception: RuntimeError",
        )
        result = P.Phase25Result(
            session_id="yaml2",
            generated_at="2026-06-01T18:30:00Z",
            omniparser_available=False,
            degraded=True,
            too_complex=False,
            screens=[degraded_screen],
        )
        written = analyzer.write_semantic_yaml(result, slug="fallback_comp", target_dir=tmp_path)
        loaded = yaml.safe_load(written.read_text(encoding="utf-8"))
        first_screen = loaded["screens"][0]
        assert loaded["degraded"] is True
        assert first_screen["degraded"] is True
        assert "omniparser_exception" in first_screen["degraded_reason"]

    def test_invalid_slug_raises(self, tmp_path, fake_omniparser_ok):
        """A path-traversal slug is rejected with ``ValueError``."""
        analyzer = P.Phase25Analyzer(session_id="yaml3", omniparser=fake_omniparser_ok)
        result = self._empty_result("yaml3")
        with pytest.raises(ValueError):
            analyzer.write_semantic_yaml(result, slug="../etc/passwd", target_dir=tmp_path)

    def test_forbidden_target_dir(self, tmp_path, fake_omniparser_ok):
        """Writing into a ``supervised`` directory is rejected."""
        analyzer = P.Phase25Analyzer(session_id="yaml4", omniparser=fake_omniparser_ok)
        result = self._empty_result("yaml4")
        # Guard against writing into supervised/stable.
        forbidden = tmp_path / "supervised"
        forbidden.mkdir()
        with pytest.raises(ValueError):
            analyzer.write_semantic_yaml(result, slug="abc_def", target_dir=forbidden)
# ---------------------------------------------------------------------------
# Tests : contrat snapshots (elements aplatis)
# ---------------------------------------------------------------------------
class TestSnapshotContract:
    """Snapshot contract: ``to_dict`` exposes a flattened ``elements`` list."""

    def test_screen_to_dict_includes_elements(self, fake_omniparser_ok):
        """Buttons, forms and text blocks all show up as typed elements."""
        structure = P.SemanticStructure(
            buttons=[{"label": "Valider", "bbox": [0, 0, 50, 20], "confidence": 0.9}],
            forms=[{"label": "Nom", "bbox": [60, 0, 200, 20], "confidence": 0.8}],
            text_blocks=[{"label": "Hello", "text": "Hello", "bbox": [0, 30, 100, 50], "confidence": 0.6}],
        )
        screen = P.ScreenAnalysis(
            index=1,
            phash="aa",
            screen_id="screen_001",
            screenshot_path="/tmp/s.png",
            structure=structure,
            window_title="Easily Assure",
        )
        exported = screen.to_dict()
        assert "elements" in exported

        def _has(kind, label=None):
            # The label is only inspected on elements of the requested kind,
            # and only when a label is requested.
            for element in exported["elements"]:
                if element["kind"] != kind:
                    continue
                if label is None or element["label"] == label:
                    return True
            return False

        assert _has("button", "Valider")
        assert _has("field", "Nom")
        assert _has("text_block")
        assert exported["window_title"] == "Easily Assure"