Files
rpa_vision_v3/core/system/artifact_retention.py
Dom 4b96524964 feat(system): Ajouter gestionnaires backup et version pour Dashboard
BackupExporter (backup_exporter.py):
- Export complet (workflows, correction packs, coaching sessions, configs)
- Export sélectif (workflows only, configs only, etc.)
- Export modèles entraînés opt-in (embeddings, FAISS anonymisés)
- Sanitisation des configs (masquage des secrets)
- Statistiques de backup disponibles

VersionManager (version_manager.py):
- Suivi de version avec composants
- Vérification des mises à jour (manifest local)
- Vérification intégrité packages (SHA-256)
- Création/restauration de backups pour rollback
- Information système complète

Ces modules supportent les fonctionnalités Dashboard:
- Téléchargement sauvegardes par le client
- Mise à jour du système
- Rollback en cas de problème

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 15:34:51 +01:00

293 lines
10 KiB
Python

"""
core/system/artifact_retention.py
Artifact Retention & Rotation (Fiche #21)
But : appliquer une politique de rétention simple et sûre sur les artefacts générés
en prod (failure cases, dumps watchdog, guard reports, etc.).
Principe :
- on travaille dans data/ (aucun fichier système)
- on se base sur la date du dossier (YYYY-MM-DD) quand possible, sinon mtime
- on évite les suppressions agressives : par défaut on archive les failure cases
Usage:
python -m core.system.artifact_retention
python -m core.system.artifact_retention --dry-run
Variables d'env (optionnel):
RPA_RETENTION_FAILURE_CASES_DAYS=14
RPA_RETENTION_WATCHDOG_DAYS=7
RPA_RETENTION_GUARD_REPORTS_DAYS=30
RPA_RETENTION_ARCHIVE_FAILURE_CASES=true|false
RPA_DATA_DIR=data
"""
from __future__ import annotations
import argparse
import logging
import os
import tarfile
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
logger = logging.getLogger(__name__)
def _env_int(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except Exception:
return default
def _env_bool(name: str, default: bool) -> bool:
val = os.getenv(name)
if val is None:
return default
return val.strip().lower() in {"1", "true", "yes", "y", "on"}
def _parse_date_folder(name: str) -> Optional[datetime]:
"""Parse YYYY-MM-DD."""
try:
return datetime.strptime(name, "%Y-%m-%d")
except Exception:
return None
def _is_older_than(path: Path, cutoff: datetime) -> bool:
    """Return True when *path* should be considered older than *cutoff*.

    A ``YYYY-MM-DD`` component in the path wins (deepest component checked
    first); otherwise fall back to the filesystem mtime.  Returns False when
    the age cannot be determined (e.g. stat() fails).
    """
    for component in reversed(path.parts):
        stamped = _parse_date_folder(component)
        if stamped is not None:
            return stamped < cutoff
    try:
        mtime = path.stat().st_mtime
    except Exception:
        return False
    return datetime.fromtimestamp(mtime) < cutoff
def _ensure_dir(p: Path) -> None:
    """Create directory *p* (including parents); no-op when it already exists."""
    p.mkdir(parents=True, exist_ok=True)
@dataclass(frozen=True)
class RetentionRule:
    """One retention policy entry applied by ArtifactRetention.apply()."""
    name: str  # rule identifier reported in the result dict
    root: Path  # directory whose direct children are subject to retention
    days: int  # age threshold in days; older entries are archived/deleted
    mode: str  # "delete" | "archive_then_delete"
class ArtifactRetention:
    """Applies the retention rules described in the module docstring.

    Works strictly under *data_dir*.  Failure cases can be archived as
    ``tar.gz`` before deletion (opt-out via ``archive_failure_cases``);
    watchdog dumps and guard reports are deleted outright once older than
    their configured cutoff.
    """

    def __init__(self, data_dir: Path, archive_failure_cases: bool = True):
        self.data_dir = data_dir
        self.archive_failure_cases = archive_failure_cases
        self.failure_cases_dir = self.data_dir / "failure_cases"
        self.runtime_dir = self.data_dir / "runtime"
        self.archives_dir = self.data_dir / "archives"

    def _iter_children(self, root: Path) -> Iterable[Path]:
        """List the direct children of *root*; empty when *root* is absent."""
        if not root.exists():
            return []
        # Date folders first, else files/dirs directly.
        return list(root.iterdir())

    def _archive_folder(self, folder: Path, archive_root: Path, dry_run: bool) -> Tuple[bool, str]:
        """Archive *folder* to <archive_root>/<YYYY-MM-DD|undated>/<folder>.tar.gz.

        Returns ``(ok, message)``.  A pre-existing archive counts as success
        so that an interrupted previous run can resume with its delete step.
        """
        # Use the deepest YYYY-MM-DD component of the path, if any.
        date_part = "undated"
        for part in reversed(folder.parts):
            if _parse_date_folder(part):
                date_part = part
                break
        out_dir = archive_root / date_part
        out_file = out_dir / f"{folder.name}.tar.gz"
        if out_file.exists():
            return True, f"archive exists: {out_file}"
        if dry_run:
            return True, f"would archive -> {out_file}"
        _ensure_dir(out_dir)
        try:
            with tarfile.open(out_file, "w:gz") as tar:
                tar.add(folder, arcname=folder.name)
            return True, f"archived -> {out_file}"
        except Exception as e:
            # BUGFIX: remove the partial archive.  Leaving it on disk made the
            # next run report "archive exists" and then DELETE the source data
            # even though the archive was corrupt -> silent data loss.
            try:
                out_file.unlink(missing_ok=True)
            except Exception:
                pass
            return False, f"archive failed for {folder}: {e}"

    def _delete_path(self, p: Path, dry_run: bool) -> Tuple[bool, str]:
        """Delete a file or directory tree; returns ``(ok, message)``."""
        if dry_run:
            return True, f"would delete: {p}"
        try:
            if p.is_dir():
                # Depth-first removal: reverse-sorted paths list children
                # before their parents, so rmdir() sees empty directories.
                for child in sorted(p.rglob("*"), reverse=True):
                    try:
                        if child.is_file() or child.is_symlink():
                            child.unlink(missing_ok=True)
                        elif child.is_dir():
                            child.rmdir()
                    except Exception:
                        # Best effort; a leftover child makes the final
                        # rmdir() below raise and report the failure.
                        pass
                p.rmdir()
            else:
                p.unlink(missing_ok=True)
            return True, f"deleted: {p}"
        except Exception as e:
            return False, f"delete failed for {p}: {e}"

    def _archive_and_delete(self, folder: Path, rule: RetentionRule, rule_res: dict, dry_run: bool) -> None:
        """Archive *folder* (when the rule asks for it) then delete it.

        Outcomes are appended to ``rule_res["touched"]`` / ``rule_res["errors"]``.
        Deletion is skipped when the archive step failed, so no data is lost.
        (Also fixes the old code appending an empty string to "touched" when
        archiving was disabled.)
        """
        if rule.mode == "archive_then_delete":
            ok, msg = self._archive_folder(
                folder,
                self.archives_dir / "failure_cases",
                dry_run,
            )
            if not ok:
                rule_res["errors"].append(msg)
                return
            rule_res["touched"].append(msg)
        ok, msg = self._delete_path(folder, dry_run)
        if ok:
            rule_res["touched"].append(msg)
        else:
            rule_res["errors"].append(msg)

    def apply(self, dry_run: bool = False) -> dict:
        """Apply every retention rule; return a JSON-serializable report.

        Nothing on disk is modified when *dry_run* is True.  Cutoffs are
        computed once from ``datetime.now()`` minus each rule's day count.
        """
        now = datetime.now()
        rules: List[RetentionRule] = [
            RetentionRule(
                name="failure_cases",
                root=self.failure_cases_dir,
                days=_env_int("RPA_RETENTION_FAILURE_CASES_DAYS", 14),
                mode="archive_then_delete" if self.archive_failure_cases else "delete",
            ),
            RetentionRule(
                name="runtime_watchdog",
                root=self.runtime_dir / "watchdog",
                days=_env_int("RPA_RETENTION_WATCHDOG_DAYS", 7),
                mode="delete",
            ),
            RetentionRule(
                name="runtime_guard_reports",
                root=self.runtime_dir / "guard_reports",
                days=_env_int("RPA_RETENTION_GUARD_REPORTS_DAYS", 30),
                mode="delete",
            ),
        ]
        results = {
            "dry_run": dry_run,
            "data_dir": str(self.data_dir),
            "now": now.isoformat(),
            "rules": [],
        }
        for rule in rules:
            cutoff = now - timedelta(days=rule.days)
            rule_res = {
                "name": rule.name,
                "root": str(rule.root),
                "days": rule.days,
                "mode": rule.mode,
                "cutoff": cutoff.isoformat(),
                "touched": [],
                "errors": [],
            }
            for child in self._iter_children(rule.root):
                if not _is_older_than(child, cutoff):
                    continue
                if rule.name == "failure_cases":
                    # Failure cases live as <root>/YYYY-MM-DD/case_xxx:
                    # archive/delete each case folder under the date folder.
                    if child.is_dir() and _parse_date_folder(child.name):
                        for case_dir in child.iterdir():
                            if not case_dir.is_dir():
                                continue
                            self._archive_and_delete(case_dir, rule, rule_res, dry_run)
                        # Remove the date folder itself once emptied.
                        try:
                            if not dry_run and child.exists() and not any(child.iterdir()):
                                child.rmdir()
                        except Exception:
                            pass
                        continue
                    # A case folder sitting directly under the root.
                    if child.is_dir():
                        self._archive_and_delete(child, rule, rule_res, dry_run)
                        continue
                # Other rules (and stray files): delete the child directly.
                ok, msg = self._delete_path(child, dry_run)
                if ok:
                    rule_res["touched"].append(msg)
                else:
                    rule_res["errors"].append(msg)
            results["rules"].append(rule_res)
        return results
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: run retention and report.

    Returns 0 on success (or JSON output), 2 when any rule recorded errors.
    """
    parser = argparse.ArgumentParser(description="RPA Vision V3 - Artifact retention")
    parser.add_argument("--dry-run", action="store_true", help="ne supprime rien")
    parser.add_argument("--json", action="store_true", help="sortie JSON only")
    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")

    data_dir = Path(os.getenv("RPA_DATA_DIR", "data"))
    archive = _env_bool("RPA_RETENTION_ARCHIVE_FAILURE_CASES", True)
    retention = ArtifactRetention(data_dir=data_dir, archive_failure_cases=archive)
    report = retention.apply(dry_run=args.dry_run)

    # Machine-readable output short-circuits the human summary.
    if args.json:
        import json
        print(json.dumps(report, indent=2, ensure_ascii=False))
        return 0

    # Human-friendly summary on the logger.
    touched = sum(len(entry["touched"]) for entry in report["rules"])
    errors = sum(len(entry["errors"]) for entry in report["rules"])
    logger.info(f"Retention completed. touched={touched} errors={errors} dry_run={args.dry_run}")
    for entry in report["rules"]:
        if entry["errors"]:
            logger.warning(f"Rule {entry['name']} errors: {len(entry['errors'])}")
    return 2 if errors else 0
if __name__ == "__main__":
    # Script entry point; propagate main()'s exit code (0 ok, 2 on errors).
    raise SystemExit(main())