""" core/system/artifact_retention.py Artifact Retention & Rotation (Fiche #21) But : appliquer une politique de rétention simple et sûre sur les artefacts générés en prod (failure cases, dumps watchdog, guard reports, etc.). Principe : - on travaille dans data/ (aucun fichier système) - on se base sur la date du dossier (YYYY-MM-DD) quand possible, sinon mtime - on évite les suppressions agressives : par défaut on archive les failure cases Usage: python -m core.system.artifact_retention python -m core.system.artifact_retention --dry-run Variables d'env (optionnel): RPA_RETENTION_FAILURE_CASES_DAYS=14 RPA_RETENTION_WATCHDOG_DAYS=7 RPA_RETENTION_GUARD_REPORTS_DAYS=30 RPA_RETENTION_ARCHIVE_FAILURE_CASES=true|false RPA_DATA_DIR=data """ from __future__ import annotations import argparse import logging import os import tarfile from dataclasses import dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Iterable, List, Optional, Tuple logger = logging.getLogger(__name__) def _env_int(name: str, default: int) -> int: try: return int(os.getenv(name, str(default))) except Exception: return default def _env_bool(name: str, default: bool) -> bool: val = os.getenv(name) if val is None: return default return val.strip().lower() in {"1", "true", "yes", "y", "on"} def _parse_date_folder(name: str) -> Optional[datetime]: """Parse YYYY-MM-DD.""" try: return datetime.strptime(name, "%Y-%m-%d") except Exception: return None def _is_older_than(path: Path, cutoff: datetime) -> bool: """Décide si un chemin est plus vieux que cutoff.""" # 1) date dans le chemin (parent YYYY-MM-DD) for part in reversed(path.parts): dt = _parse_date_folder(part) if dt: return dt < cutoff # 2) fallback mtime try: return datetime.fromtimestamp(path.stat().st_mtime) < cutoff except Exception: return False def _ensure_dir(p: Path) -> None: p.mkdir(parents=True, exist_ok=True) @dataclass(frozen=True) class RetentionRule: name: str root: Path days: int mode: str # "delete" | "archive_then_delete" class ArtifactRetention: def __init__(self, data_dir: Path, archive_failure_cases: bool = True): self.data_dir = data_dir self.archive_failure_cases = archive_failure_cases self.failure_cases_dir = self.data_dir / "failure_cases" self.runtime_dir = self.data_dir / "runtime" self.archives_dir = self.data_dir / "archives" def _iter_children(self, root: Path) -> Iterable[Path]: if not root.exists(): return [] # Date folders first, else files/dirs directly return list(root.iterdir()) def _archive_folder(self, folder: Path, archive_root: Path, dry_run: bool) -> Tuple[bool, str]: """Archive un dossier en tar.gz, retourne (ok, message).""" # Archive path: .../archives/failure_cases/YYYY-MM-DD/.tar.gz date_part = None for part in folder.parts: if _parse_date_folder(part): date_part = part date_part = date_part or "undated" out_dir = archive_root / date_part out_file = out_dir / f"{folder.name}.tar.gz" if out_file.exists(): return True, f"archive exists: {out_file}" if dry_run: return True, f"would archive -> {out_file}" _ensure_dir(out_dir) try: with tarfile.open(out_file, "w:gz") as tar: tar.add(folder, arcname=folder.name) return True, f"archived -> {out_file}" except Exception as e: return False, f"archive failed for {folder}: {e}" def _delete_path(self, p: Path, dry_run: bool) -> Tuple[bool, str]: if dry_run: return True, f"would delete: {p}" try: if p.is_dir(): # safe recursive delete for child in sorted(p.rglob("*"), reverse=True): try: if child.is_file() or child.is_symlink(): child.unlink(missing_ok=True) elif child.is_dir(): child.rmdir() except Exception: # best effort pass p.rmdir() else: p.unlink(missing_ok=True) return True, f"deleted: {p}" except Exception as e: return False, f"delete failed for {p}: {e}" def apply(self, dry_run: bool = False) -> dict: now = datetime.now() rules: List[RetentionRule] = [ RetentionRule( name="failure_cases", root=self.failure_cases_dir, days=_env_int("RPA_RETENTION_FAILURE_CASES_DAYS", 14), mode="archive_then_delete" if self.archive_failure_cases else "delete", ), RetentionRule( name="runtime_watchdog", root=self.runtime_dir / "watchdog", days=_env_int("RPA_RETENTION_WATCHDOG_DAYS", 7), mode="delete", ), RetentionRule( name="runtime_guard_reports", root=self.runtime_dir / "guard_reports", days=_env_int("RPA_RETENTION_GUARD_REPORTS_DAYS", 30), mode="delete", ), ] results = { "dry_run": dry_run, "data_dir": str(self.data_dir), "now": now.isoformat(), "rules": [], } for rule in rules: cutoff = now - timedelta(days=rule.days) rule_res = { "name": rule.name, "root": str(rule.root), "days": rule.days, "mode": rule.mode, "cutoff": cutoff.isoformat(), "touched": [], "errors": [], } for child in self._iter_children(rule.root): if not _is_older_than(child, cutoff): continue # failure cases: structure date/case_xxx if rule.name == "failure_cases": # archive each case folder under date # if we hit a date folder, archive its children if child.is_dir() and _parse_date_folder(child.name): for case_dir in child.iterdir(): if not case_dir.is_dir(): continue ok, msg = (True, "") if rule.mode == "archive_then_delete": ok, msg = self._archive_folder( case_dir, self.archives_dir / "failure_cases", dry_run, ) if not ok: rule_res["errors"].append(msg) continue rule_res["touched"].append(msg) ok2, msg2 = self._delete_path(case_dir, dry_run) if ok2: rule_res["touched"].append(msg2) else: rule_res["errors"].append(msg2) # remove date folder if empty try: if not dry_run and child.exists() and not any(child.iterdir()): child.rmdir() except Exception: pass continue # if child is already a case folder, handle it too if child.is_dir(): if rule.mode == "archive_then_delete": ok, msg = self._archive_folder( child, self.archives_dir / "failure_cases", dry_run, ) if ok: rule_res["touched"].append(msg) else: rule_res["errors"].append(msg) continue ok2, msg2 = self._delete_path(child, dry_run) if ok2: rule_res["touched"].append(msg2) else: rule_res["errors"].append(msg2) continue # other rules: delete the child directly (file or folder) ok, msg = self._delete_path(child, dry_run) if ok: rule_res["touched"].append(msg) else: rule_res["errors"].append(msg) results["rules"].append(rule_res) return results def main(argv: Optional[List[str]] = None) -> int: parser = argparse.ArgumentParser(description="RPA Vision V3 - Artifact retention") parser.add_argument("--dry-run", action="store_true", help="ne supprime rien") parser.add_argument("--json", action="store_true", help="sortie JSON only") args = parser.parse_args(argv) logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") data_dir = Path(os.getenv("RPA_DATA_DIR", "data")) archive_failure_cases = _env_bool("RPA_RETENTION_ARCHIVE_FAILURE_CASES", True) retention = ArtifactRetention(data_dir=data_dir, archive_failure_cases=archive_failure_cases) res = retention.apply(dry_run=args.dry_run) # stdout friendly if args.json: import json print(json.dumps(res, indent=2, ensure_ascii=False)) return 0 # pretty summary total_touched = sum(len(r["touched"]) for r in res["rules"]) total_errors = sum(len(r["errors"]) for r in res["rules"]) logger.info(f"Retention completed. touched={total_touched} errors={total_errors} dry_run={args.dry_run}") for r in res["rules"]: if r["errors"]: logger.warning(f"Rule {r['name']} errors: {len(r['errors'])}") return 0 if total_errors == 0 else 2 if __name__ == "__main__": raise SystemExit(main())