"""Tests integration HTTP — cycle complet Lea-first jusqu'au /persist. Specs : docs/POC/SPECS_ENDPOINT_PERSIST_2026-06-01.md Le focus est sur l'endpoint /persist : on mocke les phases amont (Shadow build) en construisant directement le workflow_ir. Pas de lancement reel de session Shadow ici. """ from __future__ import annotations import json import sys from pathlib import Path import pytest _ROOT = str(Path(__file__).resolve().parents[2]) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) pytestmark = pytest.mark.integration _TEST_API_TOKEN = "test_persist_endpoint_token_xyz" @pytest.fixture def persist_client(monkeypatch, tmp_path): """TestClient FastAPI + redirection des chemins persist vers tmp_path.""" monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN) monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False) # DB agents isolee pour eviter le guard fleet monkeypatch.setenv("RPA_AGENTS_DB_PATH", str(tmp_path / "agents.db")) from fastapi.testclient import TestClient from agent_v0.server_v1 import api_stream from agent_v0.server_v1.agent_registry import AgentRegistry from core.competences import persist as P monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN) # api_stream peut deja etre importe par un autre test avant que # RPA_AGENTS_DB_PATH soit pose. Isoler explicitement le registre global. test_registry = AgentRegistry(db_path=str(tmp_path / "agents.db")) monkeypatch.setattr(api_stream, "agent_registry", test_registry) # Rediriger toutes les ecritures vers tmp_path candidate_dir = tmp_path / "competences" / "candidate" candidate_dir.mkdir(parents=True, exist_ok=True) audit_path = tmp_path / "competences" / "persist_audit.jsonl" incomplete_path = tmp_path / "competences" / "incomplete_learnings.jsonl" monkeypatch.setattr(P, "COMPETENCES_ROOT", tmp_path / "competences") monkeypatch.setattr(P, "CANDIDATE_DIR", candidate_dir) monkeypatch.setattr(P, "AUDIT_PATH", audit_path) monkeypatch.setattr(P, "INCOMPLETE_PATH", incomplete_path) # Reset du rate limiter pour eviter les fuites entre tests P.persist_rate_limiter.reset() client = TestClient(api_stream.app, raise_server_exceptions=False) return client, tmp_path def _auth_headers(): return {"Authorization": f"Bearer {_TEST_API_TOKEN}"} def _minimal_workflow_ir(): return { "steps": [ { "kind": "click", "primitive_ref": "click", "parameters": {"target": "Bouton OK"}, "description": "Clic sur Bouton OK", } ], "preconditions": [], "success_marker": { "mode": "all_of", "timeout_ms": 5000, "markers": [], }, } class TestPersistEndpointSuccess: """Cas nominal : payload valide -> 201 + YAML cree + audit ecrit.""" def test_persist_returns_201_and_writes_yaml(self, persist_client): client, tmp_path = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ "name": "Test Cycle Complet", "machine_id": "machine_test_x", "session_id": "sess_xyz", "workflow_ir": _minimal_workflow_ir(), "parameters": [], "annotations_semantiques": {"intent_fr": "tester cycle"}, "learning_metadata": {"persist_id": "uuid-cycle-1"}, }, headers=_auth_headers(), ) assert resp.status_code == 201, resp.text body = resp.json() assert body["competence_id"] == "test_cycle_complet" assert body["learning_state"] == "candidate" assert body["persist_id"] == "uuid-cycle-1" assert body["audit_entry_id"] >= 1 # Le YAML doit etre present sur disque (chemin redirige par fixture) yaml_file = tmp_path / "competences" / "candidate" / "test_cycle_complet.yaml" assert yaml_file.exists() # Audit enrichi audit = tmp_path / "competences" / "persist_audit.jsonl" assert audit.exists() lines = audit.read_text(encoding="utf-8").strip().splitlines() assert any(json.loads(li).get("persist_id") == "uuid-cycle-1" for li in lines) def test_persist_idempotence_returns_previous(self, persist_client): client, _ = persist_client payload = { "name": "Idempotent Test", "machine_id": "machine_test_x", "workflow_ir": _minimal_workflow_ir(), "learning_metadata": {"persist_id": "uuid-idem-1"}, } r1 = client.post( "/api/v1/lea/competences/candidate/persist", json=payload, headers=_auth_headers(), ) assert r1.status_code == 201 r2 = client.post( "/api/v1/lea/competences/candidate/persist", json=payload, headers=_auth_headers(), ) # Idempotent : 200 ou 201 avec idempotent_replay=True assert r2.status_code in (200, 201) body2 = r2.json() assert body2.get("idempotent_replay") is True assert body2["competence_id"] == r1.json()["competence_id"] class TestPersistEndpointEdgeCases: """Cas particuliers documentes dans specs §5.""" def test_partial_true_force_incomplete_state(self, persist_client): client, tmp_path = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ "name": "Partial Demo", "machine_id": "machine_test_x", "workflow_ir": {"steps": []}, # vide accepte si partial "learning_metadata": { "persist_id": "uuid-partial-1", "partial": True, "dropout_reason": "utilisateur a abandonne", }, }, headers=_auth_headers(), ) assert resp.status_code == 201, resp.text assert resp.json()["learning_state"] == "incomplete" # Double entree : audit principal + incomplete incomplete = tmp_path / "competences" / "incomplete_learnings.jsonl" assert incomplete.exists() def test_empty_workflow_without_partial_returns_400(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ "name": "Empty Demo", "machine_id": "machine_test_x", "workflow_ir": {"steps": []}, "learning_metadata": {"persist_id": "uuid-empty-1"}, }, headers=_auth_headers(), ) assert resp.status_code == 400 assert resp.json()["detail"]["error"] == "empty_workflow_ir" def test_partial_without_dropout_reason_returns_400(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ "name": "Partial Sans Raison", "machine_id": "machine_test_x", "workflow_ir": {"steps": []}, "learning_metadata": { "persist_id": "uuid-partial-noreason", "partial": True, }, }, headers=_auth_headers(), ) assert resp.status_code == 400 assert resp.json()["detail"]["error"] == "dropout_reason_required" def test_stable_request_forced_to_candidate(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ "name": "Force Candidate Demo", "machine_id": "machine_test_x", "workflow_ir": _minimal_workflow_ir(), "learning_metadata": { "persist_id": "uuid-stable-attempt", "learning_state": "stable", }, }, headers=_auth_headers(), ) assert resp.status_code == 201 # Regle d'or HDS : jamais stable par persist direct assert resp.json()["learning_state"] == "candidate" def test_slug_collision_returns_409(self, persist_client): client, tmp_path = persist_client # Pre-creer un YAML candidate pour declencher la collision candidate = tmp_path / "competences" / "candidate" / "collision_demo.yaml" candidate.write_text("id: collision_demo\nname: x\n", encoding="utf-8") resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ "name": "Collision Demo", "machine_id": "machine_test_x", "workflow_ir": _minimal_workflow_ir(), "learning_metadata": {"persist_id": "uuid-coll-1"}, }, headers=_auth_headers(), ) assert resp.status_code == 409 assert resp.json()["detail"]["error"] == "slug_collision"