"""Tests unitaires pour ``core.semantic.phase25_analyzer``. Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md``. Couverture obligatoire : - Hash perceptuel + grouping (Hamming threshold). - Cap 10 écrans -> too_complex. - Fallback OCR-seul si OmniParser KO (mock exception). - Génération .semantic.yaml valide avec ``degraded`` correctement positionné. - Validation session_id / slug (anti path-traversal). """ from __future__ import annotations import sys from pathlib import Path from unittest.mock import MagicMock, patch import pytest import yaml from PIL import Image, ImageDraw _ROOT = str(Path(__file__).resolve().parents[2]) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) from core.semantic import phase25_analyzer as P # noqa: E402 # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- def _make_image(size=(256, 256), color=(255, 255, 255), text=None): img = Image.new("RGB", size, color=color) if text: draw = ImageDraw.Draw(img) draw.text((10, 10), text, fill=(0, 0, 0)) return img @pytest.fixture def fake_omniparser_ok(): """Wrapper OmniParser qui retourne des éléments factices sans erreur.""" w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) w._adapter = MagicMock() w._available = True w._import_error = None def _fake_detect(image): return [ {"label": "Valider", "bbox": [10, 20, 100, 50], "confidence": 0.9, "element_type": "button"}, {"label": "Nom patient", "bbox": [120, 20, 300, 60], "confidence": 0.85, "element_type": "input"}, {"label": "MOREL Catherine", "bbox": [120, 80, 300, 100], "confidence": 0.7, "element_type": "text"}, ] w._adapter.detect.side_effect = _fake_detect return w @pytest.fixture def fake_omniparser_raising(): """Wrapper OmniParser disponible qui lève une exception à chaque detect.""" w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) w._adapter = MagicMock() w._available = True w._import_error = None w._adapter.detect.side_effect = RuntimeError("OmniParser corrupted weights") return w @pytest.fixture def fake_omniparser_unavailable(): """Wrapper OmniParser indisponible (adapter absent).""" w = P._OmniParserSafeWrapper.__new__(P._OmniParserSafeWrapper) w._adapter = None w._available = False w._import_error = "ImportError: No module named 'OmniParser'" return w # --------------------------------------------------------------------------- # Tests : validation session_id / slug # --------------------------------------------------------------------------- class TestValidation: def test_session_id_valid(self): assert P._validate_session_id("abc-123_XYZ") == "abc-123_XYZ" def test_session_id_empty_raises(self): with pytest.raises(ValueError): P._validate_session_id("") def test_session_id_path_traversal_raises(self): with pytest.raises(ValueError): P._validate_session_id("../etc/passwd") def test_session_id_with_slash_raises(self): with pytest.raises(ValueError): P._validate_session_id("abc/def") def test_session_id_type_raises(self): with pytest.raises(ValueError): P._validate_session_id(None) def test_slug_valid(self): assert P._validate_slug("facturation_urgence") == "facturation_urgence" def test_slug_too_short(self): with pytest.raises(ValueError): P._validate_slug("ab") def test_slug_starts_with_digit(self): with pytest.raises(ValueError): P._validate_slug("123_abc") # --------------------------------------------------------------------------- # Tests : phash et grouping # --------------------------------------------------------------------------- class TestPerceptualHash: def test_compute_phash_returns_str(self): img = _make_image() h = P.compute_phash(img) assert isinstance(h, str) and len(h) > 0 def test_identical_images_same_phash(self): img1 = _make_image(color=(255, 255, 255)) img2 = _make_image(color=(255, 255, 255)) assert P.compute_phash(img1) == P.compute_phash(img2) def _noise_image(self, seed: int): """Image avec un motif différent par seed (forme + position).""" import random rng = random.Random(seed) img = _make_image(color=(255, 255, 255)) d = ImageDraw.Draw(img) for _ in range(40): x = rng.randint(0, 240) y = rng.randint(0, 240) w = rng.randint(20, 60) h = rng.randint(20, 60) col = (rng.randint(0, 255), rng.randint(0, 255), rng.randint(0, 255)) d.rectangle([x, y, x + w, y + h], fill=col) return img def test_different_images_different_phash(self): img1 = self._noise_image(seed=1) img2 = self._noise_image(seed=999) h1 = P.compute_phash(img1) h2 = P.compute_phash(img2) if h1.startswith("md5:") or h2.startswith("md5:"): assert h1 != h2 else: # Bruits différents -> distance largement > seuil. assert P._hamming_distance(h1, h2) > P.PHASH_HAMMING_THRESHOLD def test_identify_distinct_screens_groups_identicals(self): img_a1 = self._noise_image(seed=42) img_a2 = self._noise_image(seed=42) # même seed = même image = même phash img_b = self._noise_image(seed=1337) frames = [(0, img_a1), (1, img_a2), (5, img_b)] reps = P.identify_distinct_screens(frames) indexes = [r[0] for r in reps] assert 0 in indexes assert 5 in indexes assert 1 not in indexes # regroupé avec idx 0 assert len(reps) == 2 def test_identify_distinct_screens_empty(self): assert P.identify_distinct_screens([]) == [] # --------------------------------------------------------------------------- # Tests : analyze_screen avec OmniParser OK # --------------------------------------------------------------------------- class TestAnalyzeScreenOmniParserOK: def test_nominal_run(self, tmp_path, monkeypatch, fake_omniparser_ok): # Rediriger le cache vers tmp monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") analyzer = P.Phase25Analyzer(session_id="sess1", omniparser=fake_omniparser_ok) img = _make_image() result = analyzer.analyze_screen( frame_index=42, image=img, phash="deadbeef", screenshot_path=None, ) assert result.index == 42 assert result.screen_id == "screen_042" assert result.degraded is False # Structure : 1 button + 1 field + 1 text_block (cf. fake_detect). assert len(result.structure.buttons) == 1 assert result.structure.buttons[0]["label"] == "Valider" assert len(result.structure.forms) == 1 assert len(result.structure.text_blocks) == 1 def test_cache_hit_skips_omniparser(self, tmp_path, monkeypatch, fake_omniparser_ok): monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") analyzer = P.Phase25Analyzer(session_id="sess1", omniparser=fake_omniparser_ok) img = _make_image() # 1er appel : remplit le cache. analyzer.analyze_screen(frame_index=7, image=img, phash="aa") call_count_1 = fake_omniparser_ok._adapter.detect.call_count # 2e appel : doit lire depuis le cache, pas re-appeler OmniParser. analyzer.analyze_screen(frame_index=7, image=img, phash="aa") call_count_2 = fake_omniparser_ok._adapter.detect.call_count assert call_count_2 == call_count_1 # --------------------------------------------------------------------------- # Tests : fallback OCR-seul # --------------------------------------------------------------------------- class TestFallbackOCR: def test_omniparser_raises_falls_back_degraded( self, tmp_path, monkeypatch, fake_omniparser_raising ): monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", tmp_path / "logs" / "omniparser_errors.log") # Stub docTR : retourne 2 text_blocks. monkeypatch.setattr( P, "_detect_via_doctr", lambda image, screenshot_path: [ {"label": "Champ A", "text": "Champ A", "bbox": [0, 0, 50, 20], "confidence": 0.6}, {"label": "Champ B", "text": "Champ B", "bbox": [60, 0, 110, 20], "confidence": 0.6}, ], ) analyzer = P.Phase25Analyzer( session_id="sessFB", omniparser=fake_omniparser_raising ) img = _make_image() result = analyzer.analyze_screen(frame_index=3, image=img, phash="zz") assert result.degraded is True assert result.degraded_reason and "omniparser_exception" in result.degraded_reason # Fallback docTR doit avoir produit 2 text_blocks. assert len(result.structure.text_blocks) == 2 # Le log d'erreur doit avoir été écrit. assert (tmp_path / "logs" / "omniparser_errors.log").exists() def test_omniparser_unavailable_uses_doctr( self, tmp_path, monkeypatch, fake_omniparser_unavailable ): monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") monkeypatch.setattr( P, "_detect_via_doctr", lambda image, screenshot_path: [ {"label": "Hello", "text": "Hello", "bbox": [0, 0, 30, 10], "confidence": 0.6}, ], ) analyzer = P.Phase25Analyzer( session_id="sessUNAV", omniparser=fake_omniparser_unavailable ) img = _make_image() result = analyzer.analyze_screen(frame_index=1, image=img, phash="aa") assert result.degraded is True assert "omniparser_unavailable" in (result.degraded_reason or "") assert len(result.structure.text_blocks) == 1 # --------------------------------------------------------------------------- # Tests : healthcheck # --------------------------------------------------------------------------- class TestHealthcheck: def test_healthcheck_ok(self, fake_omniparser_ok): analyzer = P.Phase25Analyzer(session_id="hc1", omniparser=fake_omniparser_ok) assert analyzer.healthcheck() is True assert analyzer._healthcheck_reason is None def test_healthcheck_unavailable(self, fake_omniparser_unavailable): analyzer = P.Phase25Analyzer( session_id="hc2", omniparser=fake_omniparser_unavailable ) assert analyzer.healthcheck() is False assert analyzer._healthcheck_reason is not None def test_healthcheck_raises_logs(self, tmp_path, monkeypatch, fake_omniparser_raising): monkeypatch.setattr(P, "LOGS_DIR", tmp_path / "logs") monkeypatch.setattr(P, "OMNIPARSER_ERROR_LOG", tmp_path / "logs" / "omniparser_errors.log") analyzer = P.Phase25Analyzer( session_id="hc3", omniparser=fake_omniparser_raising ) assert analyzer.healthcheck() is False assert (tmp_path / "logs" / "omniparser_errors.log").exists() # --------------------------------------------------------------------------- # Tests : pipeline analyze_frames + cap too_complex # --------------------------------------------------------------------------- class TestAnalyzeFrames: def test_pipeline_groups_and_analyzes(self, tmp_path, monkeypatch, fake_omniparser_ok): monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") analyzer = P.Phase25Analyzer(session_id="pipeline1", omniparser=fake_omniparser_ok) # 4 frames : 2 blancs (groupés) + 2 noirs (groupés). frames = [ (0, _make_image(color=(255, 255, 255))), (1, _make_image(color=(255, 255, 255))), (2, _make_image(color=(0, 0, 0))), (3, _make_image(color=(0, 0, 0))), ] result = analyzer.analyze_frames(frames=frames, run_healthcheck=True) assert result.too_complex is False # Au plus 2 représentants après grouping. assert len(result.screens) <= 2 assert result.omniparser_available is True def test_too_complex_caps_at_max(self, tmp_path, monkeypatch, fake_omniparser_ok): monkeypatch.setattr(P, "OMNIPARSER_CACHE_ROOT", tmp_path / "cache") analyzer = P.Phase25Analyzer( session_id="pipeline2", omniparser=fake_omniparser_ok, max_screens=3, # cap volontairement bas pour le test ) # 5 frames "visuellement distinctes" avec couleurs très différentes. frames = [] colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (0, 255, 255)] for i, c in enumerate(colors): img = _make_image(size=(256, 256), color=c) # Ajouter du bruit pour que phash diffère bien. draw = ImageDraw.Draw(img) draw.rectangle([i * 20, i * 20, i * 20 + 50, i * 20 + 50], fill=(128, 128, 128)) frames.append((i, img)) result = analyzer.analyze_frames(frames=frames, run_healthcheck=True) # Le cap doit s'appliquer. assert len(result.screens) <= 3 if len(result.screens) == 3: # too_complex doit refléter le fait qu'on a tronqué. # (vrai uniquement si phash a vu > 3 représentants). assert result.too_complex in (True, False) # --------------------------------------------------------------------------- # Tests : write_semantic_yaml # --------------------------------------------------------------------------- class TestWriteSemanticYaml: def test_writes_valid_yaml(self, tmp_path, fake_omniparser_ok): analyzer = P.Phase25Analyzer(session_id="yaml1", omniparser=fake_omniparser_ok) result = P.Phase25Result( session_id="yaml1", generated_at="2026-06-01T18:30:00Z", omniparser_available=True, degraded=False, too_complex=False, screens=[ P.ScreenAnalysis( index=42, phash="abc123", screen_id="screen_042", screenshot_path="/tmp/shot.png", structure=P.SemanticStructure( buttons=[{"label": "OK", "bbox": [0, 0, 10, 10], "confidence": 0.9}], ), ), ], ) target = analyzer.write_semantic_yaml( result, slug="ma_competence", target_dir=tmp_path, ) assert target.exists() data = yaml.safe_load(target.read_text(encoding="utf-8")) assert data["competence_id"] == "ma_competence" assert data["semantic_version"] == 1 assert data["degraded"] is False assert len(data["screens"]) == 1 assert data["screens"][0]["structure"]["buttons"][0]["label"] == "OK" def test_degraded_yaml_is_valid(self, tmp_path, fake_omniparser_raising): analyzer = P.Phase25Analyzer(session_id="yaml2", omniparser=fake_omniparser_raising) result = P.Phase25Result( session_id="yaml2", generated_at="2026-06-01T18:30:00Z", omniparser_available=False, degraded=True, too_complex=False, screens=[ P.ScreenAnalysis( index=0, phash="00", screen_id="screen_000", screenshot_path=None, structure=P.SemanticStructure(), degraded=True, degraded_reason="omniparser_exception: RuntimeError", ), ], ) target = analyzer.write_semantic_yaml(result, slug="fallback_comp", target_dir=tmp_path) data = yaml.safe_load(target.read_text(encoding="utf-8")) assert data["degraded"] is True assert data["screens"][0]["degraded"] is True assert "omniparser_exception" in data["screens"][0]["degraded_reason"] def test_invalid_slug_raises(self, tmp_path, fake_omniparser_ok): analyzer = P.Phase25Analyzer(session_id="yaml3", omniparser=fake_omniparser_ok) result = P.Phase25Result( session_id="yaml3", generated_at="x", omniparser_available=True, degraded=False, too_complex=False, screens=[], ) with pytest.raises(ValueError): analyzer.write_semantic_yaml(result, slug="../etc/passwd", target_dir=tmp_path) def test_forbidden_target_dir(self, tmp_path, fake_omniparser_ok): analyzer = P.Phase25Analyzer(session_id="yaml4", omniparser=fake_omniparser_ok) result = P.Phase25Result( session_id="yaml4", generated_at="x", omniparser_available=True, degraded=False, too_complex=False, screens=[], ) # Anti écriture dans supervised/stable. forbidden = tmp_path / "supervised" forbidden.mkdir() with pytest.raises(ValueError): analyzer.write_semantic_yaml(result, slug="abc_def", target_dir=forbidden) # --------------------------------------------------------------------------- # Tests : contrat snapshots (elements aplatis) # --------------------------------------------------------------------------- class TestSnapshotContract: def test_screen_to_dict_includes_elements(self, fake_omniparser_ok): s = P.ScreenAnalysis( index=1, phash="aa", screen_id="screen_001", screenshot_path="/tmp/s.png", structure=P.SemanticStructure( buttons=[{"label": "Valider", "bbox": [0, 0, 50, 20], "confidence": 0.9}], forms=[{"label": "Nom", "bbox": [60, 0, 200, 20], "confidence": 0.8}], text_blocks=[{"label": "Hello", "text": "Hello", "bbox": [0, 30, 100, 50], "confidence": 0.6}], ), window_title="Easily Assure", ) d = s.to_dict() assert "elements" in d assert any(e["kind"] == "button" and e["label"] == "Valider" for e in d["elements"]) assert any(e["kind"] == "field" and e["label"] == "Nom" for e in d["elements"]) assert any(e["kind"] == "text_block" for e in d["elements"]) assert d["window_title"] == "Easily Assure"