feat(lea): add dashboard competence promotion dry run
This commit is contained in:
@@ -15,15 +15,25 @@ from .verdicts import (
|
||||
iter_competence_verdicts,
|
||||
store_competence_verdict,
|
||||
)
|
||||
from .promotions import (
|
||||
CompetencePromotionError,
|
||||
iter_competence_promotions,
|
||||
promote_competence_from_verdicts,
|
||||
summarize_competence_promotions,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"CompetenceSummary",
|
||||
"CompetencePromotionError",
|
||||
"CompetenceVerdictError",
|
||||
"build_competence_replay_actions",
|
||||
"build_competence_replay_payload",
|
||||
"find_competence",
|
||||
"iter_competence_promotions",
|
||||
"iter_competence_verdicts",
|
||||
"load_competence_catalog_actions",
|
||||
"load_competences",
|
||||
"promote_competence_from_verdicts",
|
||||
"summarize_competence_promotions",
|
||||
"store_competence_verdict",
|
||||
]
|
||||
|
||||
666
core/competences/promotions.py
Normal file
666
core/competences/promotions.py
Normal file
@@ -0,0 +1,666 @@
|
||||
"""Promote Lea competences from supervised verdict evidence."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import difflib
|
||||
import hashlib
|
||||
import json
|
||||
import shutil
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
from .catalog import (
|
||||
DEFAULT_COMPETENCE_ROOT,
|
||||
KNOWN_STATES,
|
||||
REPO_ROOT,
|
||||
load_competence_file,
|
||||
)
|
||||
from .replay import find_competence
|
||||
from .verdicts import DEFAULT_VERDICT_LOG, iter_competence_verdicts
|
||||
|
||||
|
||||
DEFAULT_PROMOTION_LOG = REPO_ROOT / "data" / "competences" / "promotions.jsonl"
|
||||
PROMOTION_SCHEMA_VERSION = "lea_competence_promotion.v1"
|
||||
PROMOTABLE_STATES = {"candidate", "stable"}
|
||||
|
||||
|
||||
class CompetencePromotionError(ValueError):
|
||||
"""Raised when a competence promotion request is invalid."""
|
||||
|
||||
|
||||
def promote_competence_from_verdicts(
|
||||
competence_id: str,
|
||||
payload: Dict[str, Any],
|
||||
*,
|
||||
competence_root: Path | str = DEFAULT_COMPETENCE_ROOT,
|
||||
verdict_log_path: Path | str = DEFAULT_VERDICT_LOG,
|
||||
promotion_log_path: Path | str = DEFAULT_PROMOTION_LOG,
|
||||
states: Optional[Iterable[str]] = None,
|
||||
now: Optional[datetime] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Dry-run or apply a dashboard-controlled competence promotion.
|
||||
|
||||
``dry_run=True`` never writes. A real write requires the exact
|
||||
``dry_run_token`` returned by a prior dry-run for the same evidence.
|
||||
"""
|
||||
|
||||
if not isinstance(payload, dict):
|
||||
raise CompetencePromotionError("Payload promotion invalide")
|
||||
|
||||
dry_run = bool(payload.get("dry_run", True))
|
||||
promotion_id = _promotion_id(payload, dry_run=dry_run)
|
||||
target_state = _target_state(payload)
|
||||
confirmed_by = _text(payload.get("confirmed_by") or "human:dom", "confirmed_by")
|
||||
verdict_ids = _verdict_ids(payload.get("verdict_ids"))
|
||||
timestamp = _timestamp(now)
|
||||
|
||||
root = Path(competence_root)
|
||||
promotion_log = Path(promotion_log_path)
|
||||
|
||||
existing = _find_existing_promotion(promotion_id, log_path=promotion_log)
|
||||
if existing:
|
||||
duplicate = dict(existing)
|
||||
duplicate["duplicate"] = True
|
||||
duplicate["dry_run"] = dry_run
|
||||
return duplicate
|
||||
|
||||
plan = _build_promotion_plan(
|
||||
competence_id=competence_id,
|
||||
target_state=target_state,
|
||||
verdict_ids=verdict_ids,
|
||||
promotion_id=promotion_id,
|
||||
confirmed_by=confirmed_by,
|
||||
timestamp=timestamp,
|
||||
competence_root=root,
|
||||
verdict_log_path=verdict_log_path,
|
||||
states=states,
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
return {
|
||||
**plan,
|
||||
"dry_run": True,
|
||||
"write_applied": False,
|
||||
"duplicate": False,
|
||||
}
|
||||
|
||||
provided_token = _text(payload.get("dry_run_token"), "dry_run_token")
|
||||
if provided_token != plan["dry_run_token"]:
|
||||
raise CompetencePromotionError("dry_run_token invalide ou absent")
|
||||
if not plan["eligible"]:
|
||||
raise CompetencePromotionError(
|
||||
"Promotion refusee: " + "; ".join(plan["blocking_reasons"])
|
||||
)
|
||||
|
||||
record = {
|
||||
"schema_version": PROMOTION_SCHEMA_VERSION,
|
||||
"promotion_id": promotion_id,
|
||||
"competence_id": competence_id,
|
||||
"from_state": plan["from_state"],
|
||||
"to_state": target_state,
|
||||
"triggered_by": confirmed_by,
|
||||
"promoted_at": timestamp,
|
||||
"evidence_verdict_ids": verdict_ids,
|
||||
"evidence_summary": plan["evidence_summary"],
|
||||
"yaml_path_before": plan["yaml_path_before"],
|
||||
"yaml_path_after": plan["yaml_path_after"],
|
||||
"backup_path": "",
|
||||
"dry_run_token": plan["dry_run_token"],
|
||||
"write_back_enabled": True,
|
||||
"yaml_write": True,
|
||||
"duplicate": False,
|
||||
}
|
||||
|
||||
backup_path = _apply_yaml_plan(plan, root=root, timestamp=timestamp)
|
||||
record["backup_path"] = _relative_path(backup_path)
|
||||
_append_jsonl(promotion_log, record)
|
||||
|
||||
return {
|
||||
**plan,
|
||||
"dry_run": False,
|
||||
"write_applied": True,
|
||||
"promotion": record,
|
||||
"backup_path": record["backup_path"],
|
||||
"promotions_log_path": _relative_path(promotion_log),
|
||||
"duplicate": False,
|
||||
}
|
||||
|
||||
|
||||
def summarize_competence_promotions(
|
||||
*,
|
||||
competence_root: Path | str = DEFAULT_COMPETENCE_ROOT,
|
||||
verdict_log_path: Path | str = DEFAULT_VERDICT_LOG,
|
||||
states: Optional[Iterable[str]] = None,
|
||||
) -> list[Dict[str, Any]]:
|
||||
"""Return dashboard-safe promotion state for all known competences."""
|
||||
|
||||
root = Path(competence_root)
|
||||
summaries: list[Dict[str, Any]] = []
|
||||
for state in KNOWN_STATES:
|
||||
if states and state not in set(states):
|
||||
continue
|
||||
state_dir = root / state
|
||||
if not state_dir.exists():
|
||||
continue
|
||||
for path in sorted(state_dir.glob("*.yaml")):
|
||||
competence = load_competence_file(path, repo_root=REPO_ROOT)
|
||||
verdicts = iter_competence_verdicts(
|
||||
log_path=verdict_log_path,
|
||||
competence_id=competence.id,
|
||||
)
|
||||
counts = _verdict_counts(verdicts)
|
||||
valid_ids = [
|
||||
str(verdict.get("verdict_id"))
|
||||
for verdict in verdicts
|
||||
if verdict.get("verdict_kind") == "valid" and verdict.get("verdict_id")
|
||||
]
|
||||
targets = {}
|
||||
for target in _available_targets(competence.learning_state):
|
||||
try:
|
||||
plan = _build_promotion_plan(
|
||||
competence_id=competence.id,
|
||||
target_state=target,
|
||||
verdict_ids=valid_ids,
|
||||
promotion_id=str(uuid.uuid4()),
|
||||
confirmed_by="dashboard:summary",
|
||||
timestamp=_timestamp(None),
|
||||
competence_root=root,
|
||||
verdict_log_path=verdict_log_path,
|
||||
states=states,
|
||||
)
|
||||
targets[target] = {
|
||||
"eligible": plan["eligible"],
|
||||
"blocking_reasons": plan["blocking_reasons"],
|
||||
"recommended_verdict_ids": valid_ids,
|
||||
}
|
||||
except (CompetencePromotionError, KeyError) as exc:
|
||||
targets[target] = {
|
||||
"eligible": False,
|
||||
"blocking_reasons": [str(exc)],
|
||||
"recommended_verdict_ids": valid_ids,
|
||||
}
|
||||
|
||||
summaries.append({
|
||||
"id": competence.id,
|
||||
"name": competence.name,
|
||||
"intent_fr": competence.intent_fr,
|
||||
"learning_state": competence.learning_state,
|
||||
"source_path": competence.source_path,
|
||||
"verdict_counts": counts,
|
||||
"distinct_contexts": len(_distinct_contexts([
|
||||
verdict for verdict in verdicts
|
||||
if verdict.get("verdict_kind") == "valid"
|
||||
])),
|
||||
"latest_verdict_at": _latest_verdict_at(verdicts),
|
||||
"eligible_targets": targets,
|
||||
"regression_suspected": _regression_suspected(verdicts),
|
||||
})
|
||||
|
||||
return sorted(summaries, key=lambda item: (item["learning_state"], item["id"]))
|
||||
|
||||
|
||||
def iter_competence_promotions(
|
||||
*,
|
||||
log_path: Path | str = DEFAULT_PROMOTION_LOG,
|
||||
competence_id: Optional[str] = None,
|
||||
) -> list[Dict[str, Any]]:
|
||||
log = Path(log_path)
|
||||
if not log.exists():
|
||||
return []
|
||||
|
||||
records: list[Dict[str, Any]] = []
|
||||
with log.open("r", encoding="utf-8") as handle:
|
||||
for line in handle:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
record = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if not isinstance(record, dict):
|
||||
continue
|
||||
if competence_id and record.get("competence_id") != competence_id:
|
||||
continue
|
||||
records.append(record)
|
||||
return records
|
||||
|
||||
|
||||
def _build_promotion_plan(
|
||||
*,
|
||||
competence_id: str,
|
||||
target_state: str,
|
||||
verdict_ids: list[str],
|
||||
promotion_id: str,
|
||||
confirmed_by: str,
|
||||
timestamp: str,
|
||||
competence_root: Path,
|
||||
verdict_log_path: Path | str,
|
||||
states: Optional[Iterable[str]],
|
||||
) -> Dict[str, Any]:
|
||||
competence = find_competence(competence_id, root=competence_root, states=states)
|
||||
if target_state == competence.learning_state:
|
||||
raise CompetencePromotionError("target_state identique a l'etat courant")
|
||||
if target_state not in _available_targets(competence.learning_state):
|
||||
raise CompetencePromotionError(
|
||||
f"Promotion {competence.learning_state} -> {target_state} interdite"
|
||||
)
|
||||
|
||||
source_path = _absolute_source_path(competence.source_path)
|
||||
data = _load_yaml_mapping(source_path)
|
||||
verdicts = _selected_verdicts(
|
||||
competence_id=competence_id,
|
||||
verdict_ids=verdict_ids,
|
||||
verdict_log_path=verdict_log_path,
|
||||
)
|
||||
|
||||
evidence_summary = _evidence_summary(verdicts)
|
||||
blocking_reasons = _blocking_reasons(
|
||||
current_state=competence.learning_state,
|
||||
target_state=target_state,
|
||||
verdicts=verdicts,
|
||||
all_verdicts=iter_competence_verdicts(
|
||||
log_path=verdict_log_path,
|
||||
competence_id=competence_id,
|
||||
),
|
||||
)
|
||||
eligible = not blocking_reasons
|
||||
|
||||
updated = _updated_yaml_data(
|
||||
data=data,
|
||||
competence_id=competence_id,
|
||||
current_state=competence.learning_state,
|
||||
target_state=target_state,
|
||||
verdicts=verdicts,
|
||||
promotion_id=promotion_id,
|
||||
confirmed_by=confirmed_by,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
current_text = source_path.read_text(encoding="utf-8")
|
||||
updated_text = yaml.safe_dump(
|
||||
updated,
|
||||
allow_unicode=True,
|
||||
sort_keys=False,
|
||||
default_flow_style=False,
|
||||
)
|
||||
target_path = competence_root / target_state / f"{competence_id}.yaml"
|
||||
yaml_diff = "\n".join(difflib.unified_diff(
|
||||
current_text.splitlines(),
|
||||
updated_text.splitlines(),
|
||||
fromfile=_relative_path(source_path),
|
||||
tofile=_relative_path(target_path),
|
||||
lineterm="",
|
||||
))
|
||||
dry_run_token = _dry_run_token(
|
||||
promotion_id=promotion_id,
|
||||
competence_id=competence_id,
|
||||
target_state=target_state,
|
||||
verdict_ids=verdict_ids,
|
||||
source_text=current_text,
|
||||
updated_text=updated_text,
|
||||
)
|
||||
|
||||
return {
|
||||
"schema_version": PROMOTION_SCHEMA_VERSION,
|
||||
"promotion_id": promotion_id,
|
||||
"competence_id": competence_id,
|
||||
"from_state": competence.learning_state,
|
||||
"to_state": target_state,
|
||||
"target_state": target_state,
|
||||
"confirmed_by": confirmed_by,
|
||||
"eligible": eligible,
|
||||
"blocking_reasons": blocking_reasons,
|
||||
"evidence_summary": evidence_summary,
|
||||
"verdict_ids": verdict_ids,
|
||||
"yaml_path_before": _relative_path(source_path),
|
||||
"yaml_path_after": _relative_path(target_path),
|
||||
"yaml_diff": yaml_diff,
|
||||
"dry_run_token": dry_run_token,
|
||||
"_source_path": source_path,
|
||||
"_target_path": target_path,
|
||||
"_updated_text": updated_text,
|
||||
}
|
||||
|
||||
|
||||
def _blocking_reasons(
|
||||
*,
|
||||
current_state: str,
|
||||
target_state: str,
|
||||
verdicts: list[Dict[str, Any]],
|
||||
all_verdicts: list[Dict[str, Any]],
|
||||
) -> list[str]:
|
||||
valid = [verdict for verdict in verdicts if verdict.get("verdict_kind") == "valid"]
|
||||
reasons: list[str] = []
|
||||
if len(valid) != len(verdicts):
|
||||
reasons.append("Tous les verdict_ids selectionnes doivent etre valid")
|
||||
if not valid:
|
||||
reasons.append("Au moins un verdict valid est requis")
|
||||
missing_evidence = [
|
||||
str(verdict.get("verdict_id"))
|
||||
for verdict in valid
|
||||
if not verdict.get("workflow_id") or not verdict.get("step_results")
|
||||
]
|
||||
if missing_evidence:
|
||||
reasons.append(
|
||||
"Evidence workflow_id/step_results manquante: "
|
||||
+ ", ".join(missing_evidence)
|
||||
)
|
||||
|
||||
if current_state == "candidate" and target_state == "stable":
|
||||
contexts = _distinct_contexts(valid)
|
||||
if len(valid) < 3:
|
||||
reasons.append(f"3 verdicts valid requis pour stable ({len(valid)}/3)")
|
||||
if len(contexts) < 3:
|
||||
reasons.append(f"3 contextes distincts requis pour stable ({len(contexts)}/3)")
|
||||
invalid_unexplained = [
|
||||
verdict for verdict in all_verdicts
|
||||
if verdict.get("verdict_kind") == "invalid" and not _is_explained(verdict)
|
||||
]
|
||||
if invalid_unexplained:
|
||||
reasons.append(
|
||||
"Invalid non explique present: "
|
||||
+ ", ".join(str(v.get("verdict_id")) for v in invalid_unexplained)
|
||||
)
|
||||
return reasons
|
||||
|
||||
|
||||
def _updated_yaml_data(
|
||||
*,
|
||||
data: Dict[str, Any],
|
||||
competence_id: str,
|
||||
current_state: str,
|
||||
target_state: str,
|
||||
verdicts: list[Dict[str, Any]],
|
||||
promotion_id: str,
|
||||
confirmed_by: str,
|
||||
timestamp: str,
|
||||
) -> Dict[str, Any]:
|
||||
updated = json.loads(json.dumps(data, ensure_ascii=False))
|
||||
updated["learning_state"] = target_state
|
||||
updated["last_updated_at"] = timestamp
|
||||
|
||||
promotion = updated.setdefault("promotion", {})
|
||||
history = promotion.setdefault("history", [])
|
||||
if isinstance(history, list):
|
||||
history.append({
|
||||
"at": timestamp,
|
||||
"from": current_state,
|
||||
"to": target_state,
|
||||
"by": confirmed_by,
|
||||
"reason": "Promotion dashboard supervisee par verdicts humains",
|
||||
"promotion_id": promotion_id,
|
||||
"evidence_verdict_ids": [
|
||||
verdict.get("verdict_id") for verdict in verdicts
|
||||
],
|
||||
})
|
||||
|
||||
generalisation = updated.setdefault("generalisation", {})
|
||||
seen_contexts = generalisation.setdefault("seen_contexts", [])
|
||||
if isinstance(seen_contexts, list):
|
||||
existing_ids = {
|
||||
context.get("verdict_id")
|
||||
for context in seen_contexts
|
||||
if isinstance(context, dict)
|
||||
}
|
||||
for verdict in verdicts:
|
||||
verdict_id = verdict.get("verdict_id")
|
||||
if verdict_id in existing_ids:
|
||||
continue
|
||||
context = verdict.get("context_signature") or {}
|
||||
seen_contexts.append({
|
||||
"at": timestamp,
|
||||
"verdict_id": verdict_id,
|
||||
"promotion_id": promotion_id,
|
||||
"machine_id": context.get("machine_id", ""),
|
||||
"workflow_id": verdict.get("workflow_id", ""),
|
||||
"screen_state_initial": context.get("screen_state_initial", ""),
|
||||
"screen_state_after_action": context.get("screen_state_after_action", ""),
|
||||
"verdict_at": verdict.get("verdict_at", ""),
|
||||
})
|
||||
|
||||
return updated
|
||||
|
||||
|
||||
def _apply_yaml_plan(plan: Dict[str, Any], *, root: Path, timestamp: str) -> Path:
|
||||
source_path = Path(plan["_source_path"])
|
||||
target_path = Path(plan["_target_path"])
|
||||
updated_text = str(plan["_updated_text"])
|
||||
|
||||
backup_path = source_path.with_name(
|
||||
f"{source_path.name}.{timestamp.replace(':', '').replace('+', '_')}.bak"
|
||||
)
|
||||
shutil.copy2(source_path, backup_path)
|
||||
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
|
||||
tmp_path.write_text(updated_text, encoding="utf-8")
|
||||
|
||||
try:
|
||||
load_competence_file(tmp_path, repo_root=REPO_ROOT)
|
||||
tmp_path.replace(target_path)
|
||||
load_competence_file(target_path, repo_root=REPO_ROOT)
|
||||
if source_path != target_path and source_path.exists():
|
||||
source_path.unlink()
|
||||
except Exception:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
if source_path.exists():
|
||||
shutil.copy2(backup_path, source_path)
|
||||
raise
|
||||
|
||||
return backup_path
|
||||
|
||||
|
||||
def _selected_verdicts(
|
||||
*,
|
||||
competence_id: str,
|
||||
verdict_ids: list[str],
|
||||
verdict_log_path: Path | str,
|
||||
) -> list[Dict[str, Any]]:
|
||||
all_records = iter_competence_verdicts(
|
||||
log_path=verdict_log_path,
|
||||
competence_id=competence_id,
|
||||
)
|
||||
by_id = {str(record.get("verdict_id")): record for record in all_records}
|
||||
missing = [verdict_id for verdict_id in verdict_ids if verdict_id not in by_id]
|
||||
if missing:
|
||||
raise CompetencePromotionError(
|
||||
"Verdicts introuvables: " + ", ".join(missing)
|
||||
)
|
||||
return [by_id[verdict_id] for verdict_id in verdict_ids]
|
||||
|
||||
|
||||
def _evidence_summary(verdicts: list[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
return {
|
||||
"counts": _verdict_counts(verdicts),
|
||||
"distinct_contexts": len(_distinct_contexts([
|
||||
verdict for verdict in verdicts
|
||||
if verdict.get("verdict_kind") == "valid"
|
||||
])),
|
||||
"verdicts": [
|
||||
{
|
||||
"verdict_id": verdict.get("verdict_id"),
|
||||
"verdict_kind": verdict.get("verdict_kind"),
|
||||
"verdict_at": verdict.get("verdict_at"),
|
||||
"workflow_id": verdict.get("workflow_id", ""),
|
||||
"machine_id": (verdict.get("context_signature") or {}).get("machine_id", ""),
|
||||
"step_results_count": len(verdict.get("step_results") or []),
|
||||
}
|
||||
for verdict in verdicts
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def _verdict_counts(verdicts: list[Dict[str, Any]]) -> Dict[str, int]:
|
||||
return {
|
||||
"valid": sum(1 for item in verdicts if item.get("verdict_kind") == "valid"),
|
||||
"invalid": sum(1 for item in verdicts if item.get("verdict_kind") == "invalid"),
|
||||
"inconclusive": sum(
|
||||
1 for item in verdicts if item.get("verdict_kind") == "inconclusive"
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def _distinct_contexts(verdicts: list[Dict[str, Any]]) -> set[str]:
|
||||
contexts: set[str] = set()
|
||||
for verdict in verdicts:
|
||||
context = verdict.get("context_signature") or {}
|
||||
parts = [
|
||||
str(context.get("machine_id") or ""),
|
||||
str(context.get("os_name") or ""),
|
||||
str(context.get("os_version") or ""),
|
||||
str(context.get("keyboard_layout") or ""),
|
||||
str(context.get("screen_resolution") or ""),
|
||||
str(context.get("scaling") or ""),
|
||||
str(context.get("app_name") or ""),
|
||||
str(context.get("app_version") or ""),
|
||||
str(context.get("screen_state_initial") or ""),
|
||||
str(context.get("screen_state_after_action") or ""),
|
||||
]
|
||||
contexts.add("|".join(parts))
|
||||
return contexts
|
||||
|
||||
|
||||
def _regression_suspected(verdicts: list[Dict[str, Any]]) -> bool:
|
||||
latest = sorted(
|
||||
verdicts,
|
||||
key=lambda item: str(item.get("verdict_at") or ""),
|
||||
reverse=True,
|
||||
)[:3]
|
||||
return len(latest) == 3 and all(
|
||||
item.get("verdict_kind") == "invalid" for item in latest
|
||||
)
|
||||
|
||||
|
||||
def _is_explained(verdict: Dict[str, Any]) -> bool:
|
||||
evidence = verdict.get("evidence") if isinstance(verdict.get("evidence"), dict) else {}
|
||||
if evidence.get("explained") is True:
|
||||
return True
|
||||
return bool(str(verdict.get("comments") or "").strip())
|
||||
|
||||
|
||||
def _available_targets(current_state: str) -> list[str]:
|
||||
if current_state == "observed":
|
||||
return ["candidate"]
|
||||
if current_state == "candidate":
|
||||
return ["stable"]
|
||||
return []
|
||||
|
||||
|
||||
def _target_state(payload: Dict[str, Any]) -> str:
|
||||
target = _text(payload.get("target_state"), "target_state")
|
||||
if target not in PROMOTABLE_STATES:
|
||||
raise CompetencePromotionError("target_state doit etre candidate ou stable")
|
||||
return target
|
||||
|
||||
|
||||
def _promotion_id(payload: Dict[str, Any], *, dry_run: bool) -> str:
|
||||
value = payload.get("promotion_id")
|
||||
if value is None and dry_run:
|
||||
return str(uuid.uuid4())
|
||||
text = _text(value, "promotion_id")
|
||||
_validate_uuid(text, field="promotion_id")
|
||||
return text
|
||||
|
||||
|
||||
def _verdict_ids(value: Any) -> list[str]:
|
||||
if not isinstance(value, list) or not value:
|
||||
raise CompetencePromotionError("verdict_ids doit etre une liste non vide")
|
||||
verdict_ids: list[str] = []
|
||||
for item in value:
|
||||
text = _text(item, "verdict_id")
|
||||
_validate_uuid(text, field="verdict_id")
|
||||
verdict_ids.append(text)
|
||||
return verdict_ids
|
||||
|
||||
|
||||
def _text(value: Any, field: str) -> str:
|
||||
if not isinstance(value, str) or not value.strip():
|
||||
raise CompetencePromotionError(f"{field} requis")
|
||||
return value.strip()
|
||||
|
||||
|
||||
def _validate_uuid(value: str, *, field: str) -> None:
|
||||
try:
|
||||
parsed = uuid.UUID(value, version=4)
|
||||
except ValueError as exc:
|
||||
raise CompetencePromotionError(f"{field} doit etre un UUID v4") from exc
|
||||
if str(parsed) != value.lower():
|
||||
raise CompetencePromotionError(f"{field} UUID v4 invalide")
|
||||
|
||||
|
||||
def _timestamp(now: Optional[datetime]) -> str:
|
||||
timestamp = now or datetime.now(timezone.utc)
|
||||
if timestamp.tzinfo is None:
|
||||
timestamp = timestamp.replace(tzinfo=timezone.utc)
|
||||
return timestamp.astimezone(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _dry_run_token(
|
||||
*,
|
||||
promotion_id: str,
|
||||
competence_id: str,
|
||||
target_state: str,
|
||||
verdict_ids: list[str],
|
||||
source_text: str,
|
||||
updated_text: str,
|
||||
) -> str:
|
||||
payload = {
|
||||
"promotion_id": promotion_id,
|
||||
"competence_id": competence_id,
|
||||
"target_state": target_state,
|
||||
"verdict_ids": verdict_ids,
|
||||
"source_hash": hashlib.sha256(source_text.encode("utf-8")).hexdigest(),
|
||||
"updated_hash": hashlib.sha256(updated_text.encode("utf-8")).hexdigest(),
|
||||
}
|
||||
raw = json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
|
||||
return hashlib.sha256(raw).hexdigest()
|
||||
|
||||
|
||||
def _find_existing_promotion(
|
||||
promotion_id: str,
|
||||
*,
|
||||
log_path: Path,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
for record in iter_competence_promotions(log_path=log_path):
|
||||
if record.get("promotion_id") == promotion_id:
|
||||
return record
|
||||
return None
|
||||
|
||||
|
||||
def _load_yaml_mapping(path: Path) -> Dict[str, Any]:
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
data = yaml.safe_load(handle) or {}
|
||||
if not isinstance(data, dict):
|
||||
raise CompetencePromotionError(f"{path} doit contenir un objet YAML")
|
||||
return data
|
||||
|
||||
|
||||
def _absolute_source_path(source_path: str) -> Path:
|
||||
path = Path(source_path)
|
||||
if path.is_absolute():
|
||||
return path
|
||||
return REPO_ROOT / path
|
||||
|
||||
|
||||
def _relative_path(path: Path) -> str:
|
||||
try:
|
||||
return str(path.resolve().relative_to(REPO_ROOT.resolve()))
|
||||
except ValueError:
|
||||
return str(path)
|
||||
|
||||
|
||||
def _latest_verdict_at(verdicts: list[Dict[str, Any]]) -> str:
|
||||
values = [str(item.get("verdict_at") or "") for item in verdicts]
|
||||
return max(values) if values else ""
|
||||
|
||||
|
||||
def _append_jsonl(log_path: Path, record: Dict[str, Any]) -> None:
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with log_path.open("a", encoding="utf-8") as handle:
|
||||
handle.write(json.dumps(record, ensure_ascii=False, sort_keys=True))
|
||||
handle.write("\n")
|
||||
Reference in New Issue
Block a user