feat(p1): persist workflows and semantic learning artifacts
This commit is contained in:
234
tests/unit/test_competence_persist.py
Normal file
234
tests/unit/test_competence_persist.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""Tests unit pour core.competences.persist (helpers /persist endpoint).
|
||||
|
||||
Specs : docs/POC/SPECS_ENDPOINT_PERSIST_2026-06-01.md
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
_ROOT = str(Path(__file__).resolve().parents[2])
|
||||
if _ROOT not in sys.path:
|
||||
sys.path.insert(0, _ROOT)
|
||||
|
||||
from core.competences import persist as P # noqa: E402
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# slugify
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSlugify:
|
||||
def test_slug_generation_normal(self):
|
||||
assert P.slugify("Saisir Texte Word") == "saisir_texte_word"
|
||||
|
||||
def test_slug_generation_with_accents(self):
|
||||
assert P.slugify("Créer Compte Patient") == "creer_compte_patient"
|
||||
|
||||
def test_slug_generation_too_short(self):
|
||||
with pytest.raises(ValueError):
|
||||
P.slugify("ab")
|
||||
|
||||
def test_slug_generation_empty(self):
|
||||
with pytest.raises(ValueError):
|
||||
P.slugify("")
|
||||
|
||||
def test_slug_max_80_chars(self):
|
||||
long_name = "a" * 200
|
||||
slug = P.slugify(long_name)
|
||||
assert len(slug) <= 80
|
||||
|
||||
def test_slug_strips_special_chars(self):
|
||||
# Cas tordu : "tab" est interdit ('\t'), donc on injecte du bruit
|
||||
assert P.slugify("hello!! world??") == "hello_world"
|
||||
|
||||
def test_slug_starts_with_letter(self):
|
||||
slug = P.slugify("123 abc def")
|
||||
assert slug.startswith("c_") # prefix auto pour commencer par lettre
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PII detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPiiDetection:
|
||||
def test_pii_email_detected(self):
|
||||
matches = P.detect_pii({"intent": "envoyer mail a john.doe@example.com"})
|
||||
assert matches # au moins un pattern
|
||||
|
||||
def test_pii_phone_detected(self):
|
||||
matches = P.detect_pii({"steps": [{"value": "tel 01 23 45 67 89"}]})
|
||||
assert matches
|
||||
|
||||
def test_no_pii_clean_payload(self):
|
||||
clean = {"steps": [{"kind": "click", "target": "Bouton Valider"}]}
|
||||
assert P.detect_pii(clean) == []
|
||||
|
||||
def test_pii_recursive_in_nested_list(self):
|
||||
nested = {"a": {"b": [{"c": "email: x@y.fr"}]}}
|
||||
assert P.detect_pii(nested)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Atomic write
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAtomicWrite:
|
||||
def test_atomic_write_then_rename(self, tmp_path):
|
||||
target = tmp_path / "demo.yaml"
|
||||
data = {"id": "demo", "name": "Demo"}
|
||||
result = P.atomic_write_yaml(target, data, persist_id="pid-1")
|
||||
assert result == target
|
||||
assert target.exists()
|
||||
# Pas de .tmp residuel
|
||||
leftovers = list(tmp_path.glob(".*.tmp.*"))
|
||||
assert leftovers == []
|
||||
loaded = yaml.safe_load(target.read_text(encoding="utf-8"))
|
||||
assert loaded["id"] == "demo"
|
||||
|
||||
def test_atomic_write_cleans_tmp_on_failure(self, tmp_path, monkeypatch):
|
||||
target = tmp_path / "demo.yaml"
|
||||
|
||||
# Forcer un echec sur os.rename
|
||||
import os as _os
|
||||
original_rename = _os.rename
|
||||
|
||||
def boom(*a, **k):
|
||||
raise OSError("disk full simulated")
|
||||
|
||||
monkeypatch.setattr(_os, "rename", boom)
|
||||
with pytest.raises(OSError):
|
||||
P.atomic_write_yaml(target, {"id": "demo"}, persist_id="pid-2")
|
||||
monkeypatch.setattr(_os, "rename", original_rename)
|
||||
|
||||
# Le .tmp doit avoir ete nettoye
|
||||
leftovers = list(tmp_path.glob(".*.tmp.*"))
|
||||
assert leftovers == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Audit append
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAuditAppend:
|
||||
def test_audit_append_monotonic_ids(self, tmp_path):
|
||||
audit = tmp_path / "persist_audit.jsonl"
|
||||
id1 = P.audit_append({"persist_id": "p1", "competence_id": "c1"}, audit_path=audit)
|
||||
id2 = P.audit_append({"persist_id": "p2", "competence_id": "c2"}, audit_path=audit)
|
||||
assert id1 == 1
|
||||
assert id2 == 2
|
||||
|
||||
def test_audit_append_includes_timestamp(self, tmp_path):
|
||||
audit = tmp_path / "audit.jsonl"
|
||||
P.audit_append({"persist_id": "p1", "competence_id": "c1"}, audit_path=audit)
|
||||
lines = audit.read_text(encoding="utf-8").strip().splitlines()
|
||||
record = json.loads(lines[0])
|
||||
assert "timestamp" in record
|
||||
assert record["audit_entry_id"] == 1
|
||||
|
||||
def test_find_existing_audit_entry(self, tmp_path):
|
||||
audit = tmp_path / "audit.jsonl"
|
||||
P.audit_append(
|
||||
{"persist_id": "p-uniq", "competence_id": "c1"},
|
||||
audit_path=audit,
|
||||
)
|
||||
found = P.find_existing_audit_entry("p-uniq", audit_path=audit)
|
||||
assert found is not None
|
||||
assert found["competence_id"] == "c1"
|
||||
assert P.find_existing_audit_entry("p-not-here", audit_path=audit) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# YAML schema build + validate
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBuildYaml:
|
||||
def test_yaml_schema_required_fields_present(self):
|
||||
body = P.build_competence_yaml(
|
||||
slug="demo_test",
|
||||
name="Demo Test",
|
||||
workflow_ir={"steps": [{"kind": "click"}], "preconditions": []},
|
||||
parameters=[{"name": "x", "type": "string", "required": True}],
|
||||
intent_fr="faire demo",
|
||||
learning_state="candidate",
|
||||
session_id="sess1",
|
||||
machine_id="machine1",
|
||||
)
|
||||
missing = P.validate_yaml_schema(body)
|
||||
assert missing == [], f"champs manquants : {missing}"
|
||||
|
||||
def test_payload_stable_forced_to_candidate_via_helper(self):
|
||||
# Le forcage stable -> candidate est fait dans le handler, mais on
|
||||
# peut au moins verifier que build accepte le learning_state passe.
|
||||
body = P.build_competence_yaml(
|
||||
slug="demo_test_2",
|
||||
name="Demo 2",
|
||||
workflow_ir={"steps": [{"kind": "click"}]},
|
||||
parameters=None,
|
||||
intent_fr="demo",
|
||||
learning_state="candidate",
|
||||
session_id=None,
|
||||
machine_id=None,
|
||||
)
|
||||
assert body["learning_state"] == "candidate"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cross-state collision
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCrossStateCollision:
|
||||
def test_no_collision_returns_none(self, tmp_path):
|
||||
root = tmp_path / "competences"
|
||||
(root / "candidate").mkdir(parents=True)
|
||||
assert P.detect_cross_state_collision("xyz", competences_root=root) is None
|
||||
|
||||
def test_collision_in_candidate_returns_dirname(self, tmp_path):
|
||||
root = tmp_path / "competences"
|
||||
(root / "candidate").mkdir(parents=True)
|
||||
(root / "candidate" / "xyz.yaml").write_text("id: xyz\n", encoding="utf-8")
|
||||
assert P.detect_cross_state_collision("xyz", competences_root=root) == "candidate"
|
||||
|
||||
def test_collision_in_stable_returns_dirname(self, tmp_path):
|
||||
root = tmp_path / "competences"
|
||||
(root / "stable").mkdir(parents=True)
|
||||
(root / "stable" / "abc.yaml").write_text("id: abc\n", encoding="utf-8")
|
||||
assert P.detect_cross_state_collision("abc", competences_root=root) == "stable"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rate limiter
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRateLimiter:
|
||||
def test_below_limit_allowed(self):
|
||||
lim = P.PersistRateLimiter(max_per_minute=3)
|
||||
for _ in range(3):
|
||||
allowed, _ = lim.allow("m1")
|
||||
assert allowed
|
||||
|
||||
def test_above_limit_blocked(self):
|
||||
lim = P.PersistRateLimiter(max_per_minute=2)
|
||||
lim.allow("m1")
|
||||
lim.allow("m1")
|
||||
allowed, retry = lim.allow("m1")
|
||||
assert not allowed
|
||||
assert retry >= 1
|
||||
|
||||
def test_per_machine_isolation(self):
|
||||
lim = P.PersistRateLimiter(max_per_minute=1)
|
||||
a1, _ = lim.allow("m1")
|
||||
a2, _ = lim.allow("m2")
|
||||
assert a1 and a2
|
||||
Reference in New Issue
Block a user