667 lines
22 KiB
Python
667 lines
22 KiB
Python
"""Promote Lea competences from supervised verdict evidence."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import difflib
|
|
import hashlib
|
|
import json
|
|
import shutil
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, Optional
|
|
|
|
import yaml
|
|
|
|
from .catalog import (
|
|
DEFAULT_COMPETENCE_ROOT,
|
|
KNOWN_STATES,
|
|
REPO_ROOT,
|
|
load_competence_file,
|
|
)
|
|
from .replay import find_competence
|
|
from .verdicts import DEFAULT_VERDICT_LOG, iter_competence_verdicts
|
|
|
|
|
|
DEFAULT_PROMOTION_LOG = REPO_ROOT / "data" / "competences" / "promotions.jsonl"
|
|
PROMOTION_SCHEMA_VERSION = "lea_competence_promotion.v1"
|
|
PROMOTABLE_STATES = {"candidate", "stable"}
|
|
|
|
|
|
class CompetencePromotionError(ValueError):
|
|
"""Raised when a competence promotion request is invalid."""
|
|
|
|
|
|
def promote_competence_from_verdicts(
|
|
competence_id: str,
|
|
payload: Dict[str, Any],
|
|
*,
|
|
competence_root: Path | str = DEFAULT_COMPETENCE_ROOT,
|
|
verdict_log_path: Path | str = DEFAULT_VERDICT_LOG,
|
|
promotion_log_path: Path | str = DEFAULT_PROMOTION_LOG,
|
|
states: Optional[Iterable[str]] = None,
|
|
now: Optional[datetime] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Dry-run or apply a dashboard-controlled competence promotion.
|
|
|
|
``dry_run=True`` never writes. A real write requires the exact
|
|
``dry_run_token`` returned by a prior dry-run for the same evidence.
|
|
"""
|
|
|
|
if not isinstance(payload, dict):
|
|
raise CompetencePromotionError("Payload promotion invalide")
|
|
|
|
dry_run = bool(payload.get("dry_run", True))
|
|
promotion_id = _promotion_id(payload, dry_run=dry_run)
|
|
target_state = _target_state(payload)
|
|
confirmed_by = _text(payload.get("confirmed_by") or "human:dom", "confirmed_by")
|
|
verdict_ids = _verdict_ids(payload.get("verdict_ids"))
|
|
timestamp = _timestamp(now)
|
|
|
|
root = Path(competence_root)
|
|
promotion_log = Path(promotion_log_path)
|
|
|
|
existing = _find_existing_promotion(promotion_id, log_path=promotion_log)
|
|
if existing:
|
|
duplicate = dict(existing)
|
|
duplicate["duplicate"] = True
|
|
duplicate["dry_run"] = dry_run
|
|
return duplicate
|
|
|
|
plan = _build_promotion_plan(
|
|
competence_id=competence_id,
|
|
target_state=target_state,
|
|
verdict_ids=verdict_ids,
|
|
promotion_id=promotion_id,
|
|
confirmed_by=confirmed_by,
|
|
timestamp=timestamp,
|
|
competence_root=root,
|
|
verdict_log_path=verdict_log_path,
|
|
states=states,
|
|
)
|
|
|
|
if dry_run:
|
|
return {
|
|
**plan,
|
|
"dry_run": True,
|
|
"write_applied": False,
|
|
"duplicate": False,
|
|
}
|
|
|
|
provided_token = _text(payload.get("dry_run_token"), "dry_run_token")
|
|
if provided_token != plan["dry_run_token"]:
|
|
raise CompetencePromotionError("dry_run_token invalide ou absent")
|
|
if not plan["eligible"]:
|
|
raise CompetencePromotionError(
|
|
"Promotion refusee: " + "; ".join(plan["blocking_reasons"])
|
|
)
|
|
|
|
record = {
|
|
"schema_version": PROMOTION_SCHEMA_VERSION,
|
|
"promotion_id": promotion_id,
|
|
"competence_id": competence_id,
|
|
"from_state": plan["from_state"],
|
|
"to_state": target_state,
|
|
"triggered_by": confirmed_by,
|
|
"promoted_at": timestamp,
|
|
"evidence_verdict_ids": verdict_ids,
|
|
"evidence_summary": plan["evidence_summary"],
|
|
"yaml_path_before": plan["yaml_path_before"],
|
|
"yaml_path_after": plan["yaml_path_after"],
|
|
"backup_path": "",
|
|
"dry_run_token": plan["dry_run_token"],
|
|
"write_back_enabled": True,
|
|
"yaml_write": True,
|
|
"duplicate": False,
|
|
}
|
|
|
|
backup_path = _apply_yaml_plan(plan, root=root, timestamp=timestamp)
|
|
record["backup_path"] = _relative_path(backup_path)
|
|
_append_jsonl(promotion_log, record)
|
|
|
|
return {
|
|
**plan,
|
|
"dry_run": False,
|
|
"write_applied": True,
|
|
"promotion": record,
|
|
"backup_path": record["backup_path"],
|
|
"promotions_log_path": _relative_path(promotion_log),
|
|
"duplicate": False,
|
|
}
|
|
|
|
|
|
def summarize_competence_promotions(
|
|
*,
|
|
competence_root: Path | str = DEFAULT_COMPETENCE_ROOT,
|
|
verdict_log_path: Path | str = DEFAULT_VERDICT_LOG,
|
|
states: Optional[Iterable[str]] = None,
|
|
) -> list[Dict[str, Any]]:
|
|
"""Return dashboard-safe promotion state for all known competences."""
|
|
|
|
root = Path(competence_root)
|
|
summaries: list[Dict[str, Any]] = []
|
|
for state in KNOWN_STATES:
|
|
if states and state not in set(states):
|
|
continue
|
|
state_dir = root / state
|
|
if not state_dir.exists():
|
|
continue
|
|
for path in sorted(state_dir.glob("*.yaml")):
|
|
competence = load_competence_file(path, repo_root=REPO_ROOT)
|
|
verdicts = iter_competence_verdicts(
|
|
log_path=verdict_log_path,
|
|
competence_id=competence.id,
|
|
)
|
|
counts = _verdict_counts(verdicts)
|
|
valid_ids = [
|
|
str(verdict.get("verdict_id"))
|
|
for verdict in verdicts
|
|
if verdict.get("verdict_kind") == "valid" and verdict.get("verdict_id")
|
|
]
|
|
targets = {}
|
|
for target in _available_targets(competence.learning_state):
|
|
try:
|
|
plan = _build_promotion_plan(
|
|
competence_id=competence.id,
|
|
target_state=target,
|
|
verdict_ids=valid_ids,
|
|
promotion_id=str(uuid.uuid4()),
|
|
confirmed_by="dashboard:summary",
|
|
timestamp=_timestamp(None),
|
|
competence_root=root,
|
|
verdict_log_path=verdict_log_path,
|
|
states=states,
|
|
)
|
|
targets[target] = {
|
|
"eligible": plan["eligible"],
|
|
"blocking_reasons": plan["blocking_reasons"],
|
|
"recommended_verdict_ids": valid_ids,
|
|
}
|
|
except (CompetencePromotionError, KeyError) as exc:
|
|
targets[target] = {
|
|
"eligible": False,
|
|
"blocking_reasons": [str(exc)],
|
|
"recommended_verdict_ids": valid_ids,
|
|
}
|
|
|
|
summaries.append({
|
|
"id": competence.id,
|
|
"name": competence.name,
|
|
"intent_fr": competence.intent_fr,
|
|
"learning_state": competence.learning_state,
|
|
"source_path": competence.source_path,
|
|
"verdict_counts": counts,
|
|
"distinct_contexts": len(_distinct_contexts([
|
|
verdict for verdict in verdicts
|
|
if verdict.get("verdict_kind") == "valid"
|
|
])),
|
|
"latest_verdict_at": _latest_verdict_at(verdicts),
|
|
"eligible_targets": targets,
|
|
"regression_suspected": _regression_suspected(verdicts),
|
|
})
|
|
|
|
return sorted(summaries, key=lambda item: (item["learning_state"], item["id"]))
|
|
|
|
|
|
def iter_competence_promotions(
|
|
*,
|
|
log_path: Path | str = DEFAULT_PROMOTION_LOG,
|
|
competence_id: Optional[str] = None,
|
|
) -> list[Dict[str, Any]]:
|
|
log = Path(log_path)
|
|
if not log.exists():
|
|
return []
|
|
|
|
records: list[Dict[str, Any]] = []
|
|
with log.open("r", encoding="utf-8") as handle:
|
|
for line in handle:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
record = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if not isinstance(record, dict):
|
|
continue
|
|
if competence_id and record.get("competence_id") != competence_id:
|
|
continue
|
|
records.append(record)
|
|
return records
|
|
|
|
|
|
def _build_promotion_plan(
|
|
*,
|
|
competence_id: str,
|
|
target_state: str,
|
|
verdict_ids: list[str],
|
|
promotion_id: str,
|
|
confirmed_by: str,
|
|
timestamp: str,
|
|
competence_root: Path,
|
|
verdict_log_path: Path | str,
|
|
states: Optional[Iterable[str]],
|
|
) -> Dict[str, Any]:
|
|
competence = find_competence(competence_id, root=competence_root, states=states)
|
|
if target_state == competence.learning_state:
|
|
raise CompetencePromotionError("target_state identique a l'etat courant")
|
|
if target_state not in _available_targets(competence.learning_state):
|
|
raise CompetencePromotionError(
|
|
f"Promotion {competence.learning_state} -> {target_state} interdite"
|
|
)
|
|
|
|
source_path = _absolute_source_path(competence.source_path)
|
|
data = _load_yaml_mapping(source_path)
|
|
verdicts = _selected_verdicts(
|
|
competence_id=competence_id,
|
|
verdict_ids=verdict_ids,
|
|
verdict_log_path=verdict_log_path,
|
|
)
|
|
|
|
evidence_summary = _evidence_summary(verdicts)
|
|
blocking_reasons = _blocking_reasons(
|
|
current_state=competence.learning_state,
|
|
target_state=target_state,
|
|
verdicts=verdicts,
|
|
all_verdicts=iter_competence_verdicts(
|
|
log_path=verdict_log_path,
|
|
competence_id=competence_id,
|
|
),
|
|
)
|
|
eligible = not blocking_reasons
|
|
|
|
updated = _updated_yaml_data(
|
|
data=data,
|
|
competence_id=competence_id,
|
|
current_state=competence.learning_state,
|
|
target_state=target_state,
|
|
verdicts=verdicts,
|
|
promotion_id=promotion_id,
|
|
confirmed_by=confirmed_by,
|
|
timestamp=timestamp,
|
|
)
|
|
current_text = source_path.read_text(encoding="utf-8")
|
|
updated_text = yaml.safe_dump(
|
|
updated,
|
|
allow_unicode=True,
|
|
sort_keys=False,
|
|
default_flow_style=False,
|
|
)
|
|
target_path = competence_root / target_state / f"{competence_id}.yaml"
|
|
yaml_diff = "\n".join(difflib.unified_diff(
|
|
current_text.splitlines(),
|
|
updated_text.splitlines(),
|
|
fromfile=_relative_path(source_path),
|
|
tofile=_relative_path(target_path),
|
|
lineterm="",
|
|
))
|
|
dry_run_token = _dry_run_token(
|
|
promotion_id=promotion_id,
|
|
competence_id=competence_id,
|
|
target_state=target_state,
|
|
verdict_ids=verdict_ids,
|
|
source_text=current_text,
|
|
updated_text=updated_text,
|
|
)
|
|
|
|
return {
|
|
"schema_version": PROMOTION_SCHEMA_VERSION,
|
|
"promotion_id": promotion_id,
|
|
"competence_id": competence_id,
|
|
"from_state": competence.learning_state,
|
|
"to_state": target_state,
|
|
"target_state": target_state,
|
|
"confirmed_by": confirmed_by,
|
|
"eligible": eligible,
|
|
"blocking_reasons": blocking_reasons,
|
|
"evidence_summary": evidence_summary,
|
|
"verdict_ids": verdict_ids,
|
|
"yaml_path_before": _relative_path(source_path),
|
|
"yaml_path_after": _relative_path(target_path),
|
|
"yaml_diff": yaml_diff,
|
|
"dry_run_token": dry_run_token,
|
|
"_source_path": source_path,
|
|
"_target_path": target_path,
|
|
"_updated_text": updated_text,
|
|
}
|
|
|
|
|
|
def _blocking_reasons(
|
|
*,
|
|
current_state: str,
|
|
target_state: str,
|
|
verdicts: list[Dict[str, Any]],
|
|
all_verdicts: list[Dict[str, Any]],
|
|
) -> list[str]:
|
|
valid = [verdict for verdict in verdicts if verdict.get("verdict_kind") == "valid"]
|
|
reasons: list[str] = []
|
|
if len(valid) != len(verdicts):
|
|
reasons.append("Tous les verdict_ids selectionnes doivent etre valid")
|
|
if not valid:
|
|
reasons.append("Au moins un verdict valid est requis")
|
|
missing_evidence = [
|
|
str(verdict.get("verdict_id"))
|
|
for verdict in valid
|
|
if not verdict.get("workflow_id") or not verdict.get("step_results")
|
|
]
|
|
if missing_evidence:
|
|
reasons.append(
|
|
"Evidence workflow_id/step_results manquante: "
|
|
+ ", ".join(missing_evidence)
|
|
)
|
|
|
|
if current_state == "candidate" and target_state == "stable":
|
|
contexts = _distinct_contexts(valid)
|
|
if len(valid) < 3:
|
|
reasons.append(f"3 verdicts valid requis pour stable ({len(valid)}/3)")
|
|
if len(contexts) < 3:
|
|
reasons.append(f"3 contextes distincts requis pour stable ({len(contexts)}/3)")
|
|
invalid_unexplained = [
|
|
verdict for verdict in all_verdicts
|
|
if verdict.get("verdict_kind") == "invalid" and not _is_explained(verdict)
|
|
]
|
|
if invalid_unexplained:
|
|
reasons.append(
|
|
"Invalid non explique present: "
|
|
+ ", ".join(str(v.get("verdict_id")) for v in invalid_unexplained)
|
|
)
|
|
return reasons
|
|
|
|
|
|
def _updated_yaml_data(
|
|
*,
|
|
data: Dict[str, Any],
|
|
competence_id: str,
|
|
current_state: str,
|
|
target_state: str,
|
|
verdicts: list[Dict[str, Any]],
|
|
promotion_id: str,
|
|
confirmed_by: str,
|
|
timestamp: str,
|
|
) -> Dict[str, Any]:
|
|
updated = json.loads(json.dumps(data, ensure_ascii=False))
|
|
updated["learning_state"] = target_state
|
|
updated["last_updated_at"] = timestamp
|
|
|
|
promotion = updated.setdefault("promotion", {})
|
|
history = promotion.setdefault("history", [])
|
|
if isinstance(history, list):
|
|
history.append({
|
|
"at": timestamp,
|
|
"from": current_state,
|
|
"to": target_state,
|
|
"by": confirmed_by,
|
|
"reason": "Promotion dashboard supervisee par verdicts humains",
|
|
"promotion_id": promotion_id,
|
|
"evidence_verdict_ids": [
|
|
verdict.get("verdict_id") for verdict in verdicts
|
|
],
|
|
})
|
|
|
|
generalisation = updated.setdefault("generalisation", {})
|
|
seen_contexts = generalisation.setdefault("seen_contexts", [])
|
|
if isinstance(seen_contexts, list):
|
|
existing_ids = {
|
|
context.get("verdict_id")
|
|
for context in seen_contexts
|
|
if isinstance(context, dict)
|
|
}
|
|
for verdict in verdicts:
|
|
verdict_id = verdict.get("verdict_id")
|
|
if verdict_id in existing_ids:
|
|
continue
|
|
context = verdict.get("context_signature") or {}
|
|
seen_contexts.append({
|
|
"at": timestamp,
|
|
"verdict_id": verdict_id,
|
|
"promotion_id": promotion_id,
|
|
"machine_id": context.get("machine_id", ""),
|
|
"workflow_id": verdict.get("workflow_id", ""),
|
|
"screen_state_initial": context.get("screen_state_initial", ""),
|
|
"screen_state_after_action": context.get("screen_state_after_action", ""),
|
|
"verdict_at": verdict.get("verdict_at", ""),
|
|
})
|
|
|
|
return updated
|
|
|
|
|
|
def _apply_yaml_plan(plan: Dict[str, Any], *, root: Path, timestamp: str) -> Path:
|
|
source_path = Path(plan["_source_path"])
|
|
target_path = Path(plan["_target_path"])
|
|
updated_text = str(plan["_updated_text"])
|
|
|
|
backup_path = source_path.with_name(
|
|
f"{source_path.name}.{timestamp.replace(':', '').replace('+', '_')}.bak"
|
|
)
|
|
shutil.copy2(source_path, backup_path)
|
|
|
|
target_path.parent.mkdir(parents=True, exist_ok=True)
|
|
tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
|
|
tmp_path.write_text(updated_text, encoding="utf-8")
|
|
|
|
try:
|
|
load_competence_file(tmp_path, repo_root=REPO_ROOT)
|
|
tmp_path.replace(target_path)
|
|
load_competence_file(target_path, repo_root=REPO_ROOT)
|
|
if source_path != target_path and source_path.exists():
|
|
source_path.unlink()
|
|
except Exception:
|
|
if tmp_path.exists():
|
|
tmp_path.unlink()
|
|
if source_path.exists():
|
|
shutil.copy2(backup_path, source_path)
|
|
raise
|
|
|
|
return backup_path
|
|
|
|
|
|
def _selected_verdicts(
|
|
*,
|
|
competence_id: str,
|
|
verdict_ids: list[str],
|
|
verdict_log_path: Path | str,
|
|
) -> list[Dict[str, Any]]:
|
|
all_records = iter_competence_verdicts(
|
|
log_path=verdict_log_path,
|
|
competence_id=competence_id,
|
|
)
|
|
by_id = {str(record.get("verdict_id")): record for record in all_records}
|
|
missing = [verdict_id for verdict_id in verdict_ids if verdict_id not in by_id]
|
|
if missing:
|
|
raise CompetencePromotionError(
|
|
"Verdicts introuvables: " + ", ".join(missing)
|
|
)
|
|
return [by_id[verdict_id] for verdict_id in verdict_ids]
|
|
|
|
|
|
def _evidence_summary(verdicts: list[Dict[str, Any]]) -> Dict[str, Any]:
|
|
return {
|
|
"counts": _verdict_counts(verdicts),
|
|
"distinct_contexts": len(_distinct_contexts([
|
|
verdict for verdict in verdicts
|
|
if verdict.get("verdict_kind") == "valid"
|
|
])),
|
|
"verdicts": [
|
|
{
|
|
"verdict_id": verdict.get("verdict_id"),
|
|
"verdict_kind": verdict.get("verdict_kind"),
|
|
"verdict_at": verdict.get("verdict_at"),
|
|
"workflow_id": verdict.get("workflow_id", ""),
|
|
"machine_id": (verdict.get("context_signature") or {}).get("machine_id", ""),
|
|
"step_results_count": len(verdict.get("step_results") or []),
|
|
}
|
|
for verdict in verdicts
|
|
],
|
|
}
|
|
|
|
|
|
def _verdict_counts(verdicts: list[Dict[str, Any]]) -> Dict[str, int]:
|
|
return {
|
|
"valid": sum(1 for item in verdicts if item.get("verdict_kind") == "valid"),
|
|
"invalid": sum(1 for item in verdicts if item.get("verdict_kind") == "invalid"),
|
|
"inconclusive": sum(
|
|
1 for item in verdicts if item.get("verdict_kind") == "inconclusive"
|
|
),
|
|
}
|
|
|
|
|
|
def _distinct_contexts(verdicts: list[Dict[str, Any]]) -> set[str]:
|
|
contexts: set[str] = set()
|
|
for verdict in verdicts:
|
|
context = verdict.get("context_signature") or {}
|
|
parts = [
|
|
str(context.get("machine_id") or ""),
|
|
str(context.get("os_name") or ""),
|
|
str(context.get("os_version") or ""),
|
|
str(context.get("keyboard_layout") or ""),
|
|
str(context.get("screen_resolution") or ""),
|
|
str(context.get("scaling") or ""),
|
|
str(context.get("app_name") or ""),
|
|
str(context.get("app_version") or ""),
|
|
str(context.get("screen_state_initial") or ""),
|
|
str(context.get("screen_state_after_action") or ""),
|
|
]
|
|
contexts.add("|".join(parts))
|
|
return contexts
|
|
|
|
|
|
def _regression_suspected(verdicts: list[Dict[str, Any]]) -> bool:
|
|
latest = sorted(
|
|
verdicts,
|
|
key=lambda item: str(item.get("verdict_at") or ""),
|
|
reverse=True,
|
|
)[:3]
|
|
return len(latest) == 3 and all(
|
|
item.get("verdict_kind") == "invalid" for item in latest
|
|
)
|
|
|
|
|
|
def _is_explained(verdict: Dict[str, Any]) -> bool:
|
|
evidence = verdict.get("evidence") if isinstance(verdict.get("evidence"), dict) else {}
|
|
if evidence.get("explained") is True:
|
|
return True
|
|
return bool(str(verdict.get("comments") or "").strip())
|
|
|
|
|
|
def _available_targets(current_state: str) -> list[str]:
|
|
if current_state == "observed":
|
|
return ["candidate"]
|
|
if current_state == "candidate":
|
|
return ["stable"]
|
|
return []
|
|
|
|
|
|
def _target_state(payload: Dict[str, Any]) -> str:
|
|
target = _text(payload.get("target_state"), "target_state")
|
|
if target not in PROMOTABLE_STATES:
|
|
raise CompetencePromotionError("target_state doit etre candidate ou stable")
|
|
return target
|
|
|
|
|
|
def _promotion_id(payload: Dict[str, Any], *, dry_run: bool) -> str:
|
|
value = payload.get("promotion_id")
|
|
if value is None and dry_run:
|
|
return str(uuid.uuid4())
|
|
text = _text(value, "promotion_id")
|
|
_validate_uuid(text, field="promotion_id")
|
|
return text
|
|
|
|
|
|
def _verdict_ids(value: Any) -> list[str]:
|
|
if not isinstance(value, list) or not value:
|
|
raise CompetencePromotionError("verdict_ids doit etre une liste non vide")
|
|
verdict_ids: list[str] = []
|
|
for item in value:
|
|
text = _text(item, "verdict_id")
|
|
_validate_uuid(text, field="verdict_id")
|
|
verdict_ids.append(text)
|
|
return verdict_ids
|
|
|
|
|
|
def _text(value: Any, field: str) -> str:
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise CompetencePromotionError(f"{field} requis")
|
|
return value.strip()
|
|
|
|
|
|
def _validate_uuid(value: str, *, field: str) -> None:
|
|
try:
|
|
parsed = uuid.UUID(value, version=4)
|
|
except ValueError as exc:
|
|
raise CompetencePromotionError(f"{field} doit etre un UUID v4") from exc
|
|
if str(parsed) != value.lower():
|
|
raise CompetencePromotionError(f"{field} UUID v4 invalide")
|
|
|
|
|
|
def _timestamp(now: Optional[datetime]) -> str:
|
|
timestamp = now or datetime.now(timezone.utc)
|
|
if timestamp.tzinfo is None:
|
|
timestamp = timestamp.replace(tzinfo=timezone.utc)
|
|
return timestamp.astimezone(timezone.utc).isoformat()
|
|
|
|
|
|
def _dry_run_token(
|
|
*,
|
|
promotion_id: str,
|
|
competence_id: str,
|
|
target_state: str,
|
|
verdict_ids: list[str],
|
|
source_text: str,
|
|
updated_text: str,
|
|
) -> str:
|
|
payload = {
|
|
"promotion_id": promotion_id,
|
|
"competence_id": competence_id,
|
|
"target_state": target_state,
|
|
"verdict_ids": verdict_ids,
|
|
"source_hash": hashlib.sha256(source_text.encode("utf-8")).hexdigest(),
|
|
"updated_hash": hashlib.sha256(updated_text.encode("utf-8")).hexdigest(),
|
|
}
|
|
raw = json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
|
|
return hashlib.sha256(raw).hexdigest()
|
|
|
|
|
|
def _find_existing_promotion(
|
|
promotion_id: str,
|
|
*,
|
|
log_path: Path,
|
|
) -> Optional[Dict[str, Any]]:
|
|
for record in iter_competence_promotions(log_path=log_path):
|
|
if record.get("promotion_id") == promotion_id:
|
|
return record
|
|
return None
|
|
|
|
|
|
def _load_yaml_mapping(path: Path) -> Dict[str, Any]:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
data = yaml.safe_load(handle) or {}
|
|
if not isinstance(data, dict):
|
|
raise CompetencePromotionError(f"{path} doit contenir un objet YAML")
|
|
return data
|
|
|
|
|
|
def _absolute_source_path(source_path: str) -> Path:
|
|
path = Path(source_path)
|
|
if path.is_absolute():
|
|
return path
|
|
return REPO_ROOT / path
|
|
|
|
|
|
def _relative_path(path: Path) -> str:
|
|
try:
|
|
return str(path.resolve().relative_to(REPO_ROOT.resolve()))
|
|
except ValueError:
|
|
return str(path)
|
|
|
|
|
|
def _latest_verdict_at(verdicts: list[Dict[str, Any]]) -> str:
|
|
values = [str(item.get("verdict_at") or "") for item in verdicts]
|
|
return max(values) if values else ""
|
|
|
|
|
|
def _append_jsonl(log_path: Path, record: Dict[str, Any]) -> None:
|
|
log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with log_path.open("a", encoding="utf-8") as handle:
|
|
handle.write(json.dumps(record, ensure_ascii=False, sort_keys=True))
|
|
handle.write("\n")
|