"""Persistence helpers for candidate competences (Lea-first POC).

Covers:
- strict slugification (ASCII, regex ``^[a-z][a-z0-9_]{2,79}$``);
- PII detection (MVP regexes, configurable through the environment);
- atomic write followed by a POSIX rename;
- append-only JSONL audit log guarded by an ``fcntl`` lock;
- cross-state collision detection (candidate / supervised / stable).

The module is intentionally minimal: it imports neither FastAPI nor the VWB
pipeline, and it performs no network work. It is consumed by the
``/persist`` endpoint in ``agent_v0/server_v1/api_stream.py``.
"""
from __future__ import annotations

import json
import os
import re
import time
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Optional

try:  # pragma: no cover - external dependency already required by the project
    import yaml
except ImportError as err:  # pragma: no cover
    raise RuntimeError("PyYAML est requis pour core.competences.persist") from err

# fcntl only exists on POSIX systems; when it is missing, file locking is skipped.
try:
    import fcntl
except ImportError:  # pragma: no cover - Windows
    fcntl = None  # type: ignore[assignment]
_HAS_FCNTL = fcntl is not None

# REPO_ROOT is the grandparent of the directory containing this file.
REPO_ROOT = Path(__file__).resolve().parents[2]
COMPETENCES_ROOT = REPO_ROOT.joinpath("data", "competences")
CANDIDATE_DIR = COMPETENCES_ROOT.joinpath("candidate")
SUPERVISED_DIR = COMPETENCES_ROOT.joinpath("supervised")
STABLE_DIR = COMPETENCES_ROOT.joinpath("stable")
AUDIT_PATH = COMPETENCES_ROOT.joinpath("persist_audit.jsonl")
INCOMPLETE_PATH = COMPETENCES_ROOT.joinpath("incomplete_learnings.jsonl")

# Final pattern a competence slug must match: one lowercase letter followed by
# 2 to 79 characters from [a-z0-9_] (3 to 80 characters in total).
SLUG_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,79}$")

# PII detection (MVP): the default patterns can be overridden through the
# RPA_PII_PATTERNS environment variable ("|"-separated). The defaults cover
# simple cases (IPP, NIR, email, French phone numbers).
# Default PII detectors (MVP). RPA_PII_PATTERNS replaces the whole list with a
# "|"-separated string of regexes, so a custom pattern cannot itself rely on
# "|" alternation.
_DEFAULT_PII_PATTERNS = [
    r"\b\d{13}\b",  # French NIR (13 digits)
    r"\b\d{15}\b",  # French NIR + control key
    r"\bIPP[\s:_-]*\d{6,}\b",  # hospital patient identifier (IPP)
    r"[\w\.-]+@[\w\.-]+\.\w{2,}",  # email
    r"\b0[1-9](?:[ .-]?\d{2}){4}\b",  # French phone number
]


def _compile_pii_patterns() -> list[re.Pattern[str]]:
    """Compile the active PII regexes (case-insensitive).

    ``RPA_PII_PATTERNS`` is read on every call, so a change to the environment
    is picked up without reloading the module. Blank entries and entries that
    fail to compile are skipped instead of raising.
    """
    raw = os.environ.get("RPA_PII_PATTERNS")
    sources = raw.split("|") if raw else _DEFAULT_PII_PATTERNS
    compiled: list[re.Pattern[str]] = []
    for source in sources:
        source = source.strip()
        if not source:
            continue
        try:
            compiled.append(re.compile(source, re.IGNORECASE))
        except re.error:
            # An invalid operator-supplied pattern must not break /persist.
            continue
    return compiled


# ----------------------------------------------------------------------------
# Slugification
# ----------------------------------------------------------------------------


def slugify(name: str) -> str:
    """Convert a free-form name into a strict ASCII slug.

    Steps:
    - NFKD decomposition, then every non-ASCII code point is dropped
      (this removes accents);
    - lowercase; runs of whitespace, dashes, dots, slashes or backslashes
      become a single underscore;
    - any character outside ``[a-z0-9_]`` is removed;
    - repeated underscores collapse to one; leading/trailing ones are stripped;
    - a leading digit gets a ``c_`` prefix so the slug starts with a letter;
    - the result is truncated to 80 characters.

    Raises:
        ValueError: if ``name`` is not a string, is blank, or the resulting
            slug does not match ``SLUG_PATTERN`` (e.g. fewer than 3 chars).
    """
    if not isinstance(name, str):
        raise ValueError("name doit etre une chaine non vide")
    raw = name.strip()
    if not raw:
        raise ValueError("name est vide")

    # NFKD splits accented letters into base letter + combining mark; the
    # ASCII encode with "ignore" then drops the marks.
    normalized = unicodedata.normalize("NFKD", raw)
    ascii_only = normalized.encode("ascii", "ignore").decode("ascii")
    cleaned = re.sub(r"[\s\-./\\]+", "_", ascii_only.lower())
    cleaned = re.sub(r"[^a-z0-9_]+", "", cleaned)
    cleaned = re.sub(r"_+", "_", cleaned).strip("_")
    if cleaned and cleaned[0].isdigit():
        cleaned = f"c_{cleaned}"
    if len(cleaned) > 80:
        cleaned = cleaned[:80].rstrip("_")
    if not SLUG_PATTERN.match(cleaned):
        raise ValueError(
            f"slug invalide '{cleaned}' (regle : {SLUG_PATTERN.pattern})"
        )
    return cleaned


# ----------------------------------------------------------------------------
# Cross-state collisions
# ----------------------------------------------------------------------------


def detect_cross_state_collision(
    slug: str,
    *,
    competences_root: Path = COMPETENCES_ROOT,
) -> Optional[str]:
    """Return the state folder that already holds ``<slug>.yaml``, else None.

    The folders are checked in order: candidate/, supervised/, stable/.
    ``competences_root`` may be a ``str`` or a ``Path``.
    """
    root = Path(competences_root)
    for sub in ("candidate", "supervised", "stable"):
        if (root / sub / f"{slug}.yaml").exists():
            return sub
    return None


# ----------------------------------------------------------------------------
# PII detection
# ----------------------------------------------------------------------------


def detect_pii(payload: Any) -> list[str]:
    """Return the source strings of the PII patterns matched inside ``payload``.

    String leaves are searched recursively through dict values and list/tuple
    items; dict keys and non-string scalars are not inspected. Each pattern
    appears at most once, in order of first match. An empty list means no PII
    was detected. The caller decides what to do with the result (e.g. HTTP 400
    with a non-sensitive log line).
    """
    patterns = _compile_pii_patterns()
    # dict used as an insertion-ordered set of matched pattern sources.
    found: dict[str, None] = {}

    def _walk(node: Any) -> None:
        if isinstance(node, str):
            for pattern in patterns:
                if pattern.pattern not in found and pattern.search(node):
                    found[pattern.pattern] = None
        elif isinstance(node, dict):
            for value in node.values():
                _walk(value)
        elif isinstance(node, (list, tuple)):
            for item in node:
                _walk(item)

    if patterns:
        _walk(payload)
    return list(found)


# ----------------------------------------------------------------------------
# Atomic write
# ----------------------------------------------------------------------------


def _fsync_directory(directory: Path) -> None:
    """Best-effort fsync of ``directory`` so a completed rename is durable.

    Does nothing when the directory cannot be opened or fsynced (for example
    on platforms where directories cannot be opened as files).
    """
    try:
        fd = os.open(directory, os.O_RDONLY)
    except OSError:
        return
    try:
        os.fsync(fd)
    except OSError:
        pass
    finally:
        os.close(fd)


def atomic_write_yaml(
    target_path: Path,
    data: dict[str, Any],
    *,
    persist_id: str,
) -> Path:
    """Write ``data`` as YAML to ``target_path`` atomically and return the path.

    1. Dump into ``<dir>/.<name>.tmp.<persist_id>`` and fsync it.
    2. ``os.rename`` it onto ``target_path``. This is atomic on POSIX and
       silently replaces an existing file there. On Windows it fails if the
       target exists. Callers are expected to check collisions beforehand.
    3. Fsync the parent directory (best effort) so the rename survives a crash.

    On any failure the temporary file is removed when possible and the
    exception is re-raised.
    """
    target_path = Path(target_path)
    target_dir = target_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    # persist_id ends up in a file name: keep filename-safe characters only, so
    # "/" or ".." cannot move the temp file out of target_dir and an overlong
    # value cannot exceed name limits. Fall back to a random token if empty.
    safe_id = re.sub(r"[^A-Za-z0-9_-]+", "_", str(persist_id))[:64]
    if not safe_id:
        safe_id = uuid.uuid4().hex
    tmp_path = target_dir / f".{target_path.name}.tmp.{safe_id}"

    try:
        with tmp_path.open("w", encoding="utf-8") as handle:
            yaml.safe_dump(
                data,
                handle,
                allow_unicode=True,
                sort_keys=False,
                default_flow_style=False,
            )
            handle.flush()
            try:
                os.fsync(handle.fileno())
            except OSError:
                pass
        os.rename(tmp_path, target_path)
    except BaseException:
        # Also covers KeyboardInterrupt so no stray temp file is left behind.
        if tmp_path.exists():
            try:
                tmp_path.unlink()
            except OSError:
                pass
        raise
    _fsync_directory(target_dir)
    return target_path


# ----------------------------------------------------------------------------
# Audit append (JSONL + lock)
# ----------------------------------------------------------------------------


def audit_append(
    entry: dict[str, Any],
    *,
    audit_path: Path = AUDIT_PATH,
) -> int:
    """Append ``entry`` as one JSON line to the audit log; return its id.

    The returned ``audit_entry_id`` is the 1-based line number of the new
    record (lines already in the file + 1). Concurrent appends are serialised
    with ``fcntl.flock`` on POSIX; without ``fcntl`` (Windows) the write is
    best effort.

    Side effects on ``entry`` (kept for callers that read it back): a
    ``timestamp`` key is added when missing, and ``audit_entry_id`` is set.
    """
    audit_path = Path(audit_path)
    audit_path.parent.mkdir(parents=True, exist_ok=True)
    if "timestamp" not in entry:
        entry["timestamp"] = (
            datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
        )

    # "a+" lets us count existing lines while every write still goes to EOF.
    with open(audit_path, "a+", encoding="utf-8") as handle:
        if _HAS_FCNTL:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_EX)  # type: ignore[union-attr]
            except OSError:
                # Best effort: proceed unlocked rather than lose the audit line.
                pass
        try:
            handle.seek(0)
            line_count = 0
            last_line = ""
            for last_line in handle:
                line_count += 1
            if last_line and not last_line.endswith("\n"):
                # The file ends mid-line (e.g. an interrupted earlier write):
                # terminate that fragment so the new record gets its own line,
                # which is still line number line_count + 1.
                handle.write("\n")
            audit_entry_id = line_count + 1
            entry["audit_entry_id"] = audit_entry_id
            handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
            handle.flush()
            try:
                os.fsync(handle.fileno())
            except OSError:
                pass
        finally:
            if _HAS_FCNTL:
                try:
                    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)  # type: ignore[union-attr]
                except OSError:
                    pass
    return audit_entry_id


def find_existing_audit_entry(
    persist_id: str,
    *,
    audit_path: Path = AUDIT_PATH,
) -> Optional[dict[str, Any]]:
    """Return the first audit record whose ``persist_id`` matches (idempotence).

    Returns None when ``persist_id`` is empty, the file is missing or
    unreadable, or nothing matches. Blank lines, lines that are not valid JSON,
    and JSON values that are not objects are skipped; undecodable bytes are
    replaced instead of aborting the scan.
    """
    if not persist_id:
        return None
    audit_path = Path(audit_path)
    if not audit_path.exists():
        return None
    try:
        with audit_path.open("r", encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(record, dict) and record.get("persist_id") == persist_id:
                    return record
    except OSError:
        return None
    return None


# ----------------------------------------------------------------------------
# YAML body construction
# ----------------------------------------------------------------------------

REQUIRED_YAML_FIELDS = (
    "schema_version",
    "id",
    "name",
    "version",
    "learning_state",
    "intent",
    "parameters",
    "preconditions",
    "methods",
    "success_marker",
    "failure_message_template",
    "promotion",
    "generalisation",
    "failure_log",
    "created_at",
    "last_updated_at",
    "methods_execution",
)


def build_competence_yaml(
    *,
    slug: str,
    name: str,
    workflow_ir: dict[str, Any],
    parameters: Optional[list[dict[str, Any]]],
    intent_fr: str,
    learning_state: str,
    session_id: Optional[str],
    machine_id: Optional[str],
    external_agent_id: Optional[str] = None,
) -> dict[str, Any]:
    """Build the competence document following the reference schema.

    Mirrors ``data/competences/candidate/key_win_r_wait_explorer_exe.yaml``.

    - ``workflow_ir["steps"]`` become ``methods``. Non-dict steps are skipped.
      Each kept step is shallow-copied and given defaults: an ``id`` of
      ``step_<n>_<kind or "action">``, ``primitive_ref`` set to its ``kind``,
      and ``observed`` set to False.
    - ``parameters`` items that are dicts with a truthy ``name`` become a
      name -> {type, required, description} mapping.
    - ``chain_refs`` is added only when session_id, machine_id or
      external_agent_id is truthy.
    Key order is preserved so the YAML dump (sort_keys=False) stays stable.
    """
    # Timezone-aware timestamp expressed in the host's local timezone.
    now_iso = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
    steps = list(workflow_ir.get("steps") or [])
    preconditions = list(workflow_ir.get("preconditions") or [])
    success_marker = workflow_ir.get("success_marker") or {
        "mode": "all_of",
        "timeout_ms": 5000,
        "markers": [],
    }

    methods: list[dict[str, Any]] = []
    for idx, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            continue
        method = dict(step)
        method.setdefault("id", f"step_{idx}_{step.get('kind') or 'action'}")
        if "primitive_ref" not in method and method.get("kind"):
            method["primitive_ref"] = method["kind"]
        method.setdefault("observed", False)
        methods.append(method)

    params_dict: dict[str, Any] = {}
    for param in parameters or []:
        if isinstance(param, dict) and param.get("name"):
            params_dict[str(param["name"])] = {
                "type": param.get("type", "string"),
                "required": bool(param.get("required", False)),
                "description": param.get("description", ""),
            }

    yaml_body: dict[str, Any] = {
        "schema_version": 1,
        "id": slug,
        "name": name,
        "version": 1,
        "learning_state": learning_state,
        "intent": {"fr": intent_fr or name},
        "parameters": params_dict,
        "preconditions": preconditions,
        "methods": methods,
        "success_marker": success_marker,
        "failure_message_template": workflow_ir.get("failure_message_template") or {
            "intention": intent_fr or name,
            "attendu": "",
            "vu": "{observed_human_state}",
            "demande": "indiquer la correction attendue",
        },
        "promotion": {
            "history": [
                {
                    "at": now_iso,
                    "from": "observed",
                    "to": learning_state,
                    "by": "lea_persist_endpoint",
                    "reason": "persisted via /api/v1/lea/competences/candidate/persist",
                }
            ],
            "candidate_requires": [
                "method_trace_present",
                "success_marker_defined",
                "failure_message_template_valid",
            ],
            "supervised_requires": ["replay_verified_once", "human_validation"],
            "stable_requires": {
                "min_successes": 3,
                "distinct_contexts": 3,
                "max_unexplained_failures": 0,
            },
            "t2_known_gaps": [],
        },
        "generalisation": {
            "seen_contexts": [],
            "method_success_rate": {},
            "variance_log": [],
        },
        "failure_log": [],
        "created_at": now_iso,
        "last_updated_at": now_iso,
        "methods_execution": "sequence",
    }
    if session_id or machine_id or external_agent_id:
        yaml_body["chain_refs"] = {
            "source_session": session_id,
            "machine_id": machine_id,
            "external_agent_id": external_agent_id,
        }
    return yaml_body


def validate_yaml_schema(data: dict[str, Any]) -> list[str]:
    """Return the names from REQUIRED_YAML_FIELDS missing in ``data`` (in order)."""
    return [field for field in REQUIRED_YAML_FIELDS if field not in data]


# ----------------------------------------------------------------------------
# Simple in-memory rate limit (per machine_id)
# ----------------------------------------------------------------------------


class PersistRateLimiter:
    """Sliding-window request limiter for ``/persist``, keyed by machine_id.

    Default: at most 10 requests per 60-second window per machine_id (see
    specs §6). A single shared instance is expected; a per-instance lock makes
    ``allow`` and ``reset`` safe to call from several threads.
    """

    def __init__(self, *, max_per_minute: int = 10, window_seconds: int = 60) -> None:
        import threading  # local import: not pulled in by the module header

        self.max_per_minute = max_per_minute
        self.window_seconds = window_seconds
        # machine_id -> timestamps (time.time()) of accepted requests, oldest first.
        self._timestamps: dict[str, list[float]] = {}
        # Makes the purge / check / append sequence in allow() atomic.
        self._lock = threading.Lock()

    def allow(self, machine_id: str) -> tuple[bool, int]:
        """Check and record a request; return ``(allowed, retry_after_seconds)``.

        ``retry_after_seconds`` is 0 when allowed. Otherwise it is the number of
        whole seconds, rounded up and at least 1, until a slot frees up. A falsy
        ``machine_id`` is never limited and not recorded.
        """
        if not machine_id:
            return True, 0
        import math  # local import: not pulled in by the module header

        with self._lock:
            now = time.time()
            bucket = self._timestamps.setdefault(machine_id, [])
            # Drop requests that fell out of the window.
            bucket[:] = [ts for ts in bucket if now - ts < self.window_seconds]
            if len(bucket) >= self.max_per_minute:
                if bucket:
                    remaining = self.window_seconds - (now - bucket[0])
                else:
                    # max_per_minute <= 0: nothing in the bucket will expire.
                    remaining = self.window_seconds
                # Round up so a client honouring Retry-After is not early.
                return False, max(1, math.ceil(remaining))
            bucket.append(now)
            return True, 0

    def reset(self, machine_id: Optional[str] = None) -> None:
        """Forget the history of ``machine_id``, or of every id when None."""
        with self._lock:
            if machine_id is None:
                self._timestamps.clear()
            else:
                self._timestamps.pop(machine_id, None)


# Shared instance imported by api_stream.
persist_rate_limiter = PersistRateLimiter()


__all__ = [
    "SLUG_PATTERN",
    "COMPETENCES_ROOT",
    "CANDIDATE_DIR",
    "AUDIT_PATH",
    "INCOMPLETE_PATH",
    "REQUIRED_YAML_FIELDS",
    "slugify",
    "detect_cross_state_collision",
    "detect_pii",
    "atomic_write_yaml",
    "audit_append",
    "find_existing_audit_entry",
    "build_competence_yaml",
    "validate_yaml_schema",
    "PersistRateLimiter",
    "persist_rate_limiter",
]