#!/usr/bin/env python3 """ Helpers partagés pour les règles d'administration. """ from __future__ import annotations from copy import deepcopy from pathlib import Path from typing import Any import re try: import yaml except Exception: yaml = None from config_defaults import CONFIG_DIR, deep_merge_dict DEFAULT_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.default.yml" RUNTIME_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.yml" _RUNTIME_ADMIN_RULES_OVERLAY_TEXT = """# Surcharge locale des règles d'administration. # Ce fichier est optionnel. Les règles actives de config/admin_rules.default.yml # restent valides tant qu'aucune surcharge locale n'est définie ici. # # Exemple : # version: 1 # rules: # - id: rule_identifier_1234567 # status: active # governance: # approved_by: responsable_qualite version: 1 rules: [] """ _FALLBACK_DEFAULT_ADMIN_RULES_DICT: dict[str, Any] = { "version": 1, "rules": [], } def _is_non_empty_string(value: Any) -> bool: return isinstance(value, str) and bool(value.strip()) def read_default_admin_rules_text() -> str: try: return DEFAULT_ADMIN_RULES_CONFIG_PATH.read_text(encoding="utf-8") except Exception: return "version: 1\nrules: []\n" def read_runtime_admin_rules_overlay_text() -> str: return _RUNTIME_ADMIN_RULES_OVERLAY_TEXT def load_default_admin_rules_dict() -> dict[str, Any]: if yaml is None: return deepcopy(_FALLBACK_DEFAULT_ADMIN_RULES_DICT) try: loaded = yaml.safe_load(read_default_admin_rules_text()) or {} if isinstance(loaded, dict): return loaded except Exception: pass return deepcopy(_FALLBACK_DEFAULT_ADMIN_RULES_DICT) def load_runtime_admin_rules_overlay_dict(path: Path | None = None) -> dict[str, Any]: target = Path(path) if path is not None else RUNTIME_ADMIN_RULES_CONFIG_PATH if not target.exists() or yaml is None: return {} try: loaded = yaml.safe_load(target.read_text(encoding="utf-8")) or {} if isinstance(loaded, dict): return loaded except Exception: pass return {} def _merge_rules_by_id(base_rules: list[dict[str, Any]], overlay_rules: list[dict[str, Any]]) -> list[dict[str, Any]]: merged: list[dict[str, Any]] = [deepcopy(rule) for rule in base_rules] index_by_id = { rule.get("id"): idx for idx, rule in enumerate(merged) if isinstance(rule, dict) and _is_non_empty_string(rule.get("id")) } for overlay_rule in overlay_rules: if not isinstance(overlay_rule, dict): continue rule_id = overlay_rule.get("id") if _is_non_empty_string(rule_id) and rule_id in index_by_id: idx = index_by_id[rule_id] merged[idx] = deep_merge_dict(merged[idx], overlay_rule) else: merged.append(deepcopy(overlay_rule)) if _is_non_empty_string(rule_id): index_by_id[rule_id] = len(merged) - 1 return merged def merge_admin_rules_dict(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]: merged = deep_merge_dict(base, {k: v for k, v in overlay.items() if k != "rules"}) merged["rules"] = _merge_rules_by_id(base.get("rules", []) or [], overlay.get("rules", []) or []) return merged def load_effective_admin_rules_dict(path: Path | None = None) -> dict[str, Any]: return merge_admin_rules_dict( load_default_admin_rules_dict(), load_runtime_admin_rules_overlay_dict(path), ) def ensure_runtime_admin_rules_config(path: Path | None = None) -> Path: target = Path(path) if path is not None else RUNTIME_ADMIN_RULES_CONFIG_PATH if not target.exists(): target.parent.mkdir(parents=True, exist_ok=True) target.write_text(read_runtime_admin_rules_overlay_text(), encoding="utf-8") return target def _dedupe_keep_order(values: list[str]) -> list[str]: seen: set[str] = set() output: list[str] = [] for value in values: if value in seen: continue seen.add(value) output.append(value) return output def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]: rule_type = rule.get("type") match = rule.get("match") or {} normalization = rule.get("normalization") or {} variants: list[str] = [] if rule_type in {"exact_term", "preserve_phrase"}: exact_value = str(match.get("exact_value", "")).strip() return [exact_value] if exact_value else [] if rule_type == "normalized_identifier": canonical = str(match.get("canonical_value", "")).strip() prefixes = normalization.get("accepted_prefixes") or [] separators = normalization.get("prefix_value_separators") or [" "] if normalization.get("allow_bare_value", False) and canonical: variants.append(canonical) for prefix in prefixes: for separator in separators: variants.append(f"{prefix}{separator}{canonical}") if normalization.get("multiline", False): variants.append(f"{prefix}\n{canonical}") return _dedupe_keep_order(variants)[:limit] if rule_type == "contextual_identifier": canonical = str(match.get("canonical_value", "")).strip() prefixes = match.get("context_prefixes") or [] separators = match.get("context_separators") or [": ", ":"] for prefix in prefixes: for separator in separators: variants.append(f"{prefix}{separator}{canonical}") if (rule.get("normalization") or {}).get("multiline", False): variants.append(f"{prefix}\n{canonical}") variants.append(f"{prefix} :\n{canonical}") return _dedupe_keep_order(variants)[:limit] return [] VALID_TYPES = { "exact_term", "normalized_identifier", "contextual_identifier", "preserve_phrase", } VALID_ACTIONS = {"mask", "preserve"} VALID_STATUSES = {"draft", "candidate", "approved", "active", "disabled", "retired"} VALID_ENVIRONMENTS = {"test", "staging", "prod"} VALID_SECTIONS = {"narrative", "structured", "table", "header", "footer"} def validate_rules_config(data: dict[str, Any]) -> list[str]: errors: list[str] = [] version = data.get("version") if not isinstance(version, int) or version < 1: errors.append("`version` doit etre un entier >= 1.") rules = data.get("rules") if not isinstance(rules, list): errors.append("`rules` doit etre une liste.") return errors seen_ids: set[str] = set() for index, rule in enumerate(rules): prefix = f"rules[{index}]" if not isinstance(rule, dict): errors.append(f"{prefix}: chaque regle doit etre un mapping.") continue rule_id = rule.get("id") if not _is_non_empty_string(rule_id): errors.append(f"{prefix}: `id` est obligatoire.") elif rule_id in seen_ids: errors.append(f"{prefix}: `id` duplique `{rule_id}`.") else: seen_ids.add(rule_id) if not _is_non_empty_string(rule.get("label")): errors.append(f"{prefix}: `label` est obligatoire.") rule_type = rule.get("type") if rule_type not in VALID_TYPES: errors.append(f"{prefix}: `type` invalide.") action = rule.get("action") if action not in VALID_ACTIONS: errors.append(f"{prefix}: `action` invalide.") status = rule.get("status") if status not in VALID_STATUSES: errors.append(f"{prefix}: `status` invalide.") if action == "mask" and not _is_non_empty_string(rule.get("placeholder")): errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.") match = rule.get("match") if not isinstance(match, dict): errors.append(f"{prefix}: `match` doit etre un mapping.") match = {} normalization = rule.get("normalization") or {} if normalization and not isinstance(normalization, dict): errors.append(f"{prefix}: `normalization` doit etre un mapping.") normalization = {} scope = rule.get("scope") if not isinstance(scope, dict): errors.append(f"{prefix}: `scope` doit etre un mapping.") scope = {} governance = rule.get("governance") if not isinstance(governance, dict): errors.append(f"{prefix}: `governance` doit etre un mapping.") governance = {} document_families = scope.get("document_families") if not isinstance(document_families, list) or not document_families: errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.") environments = scope.get("environments") if not isinstance(environments, list) or not environments: errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.") else: invalid_envs = [value for value in environments if value not in VALID_ENVIRONMENTS] if invalid_envs: errors.append(f"{prefix}: environnements invalides: {', '.join(invalid_envs)}.") sections = scope.get("sections") if not isinstance(sections, list) or not sections: errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.") else: invalid_sections = [value for value in sections if value not in VALID_SECTIONS] if invalid_sections: errors.append(f"{prefix}: sections invalides: {', '.join(invalid_sections)}.") if not _is_non_empty_string(governance.get("owner")): errors.append(f"{prefix}: `governance.owner` est obligatoire.") if not _is_non_empty_string(governance.get("justification")): errors.append(f"{prefix}: `governance.justification` est obligatoire.") if not _is_non_empty_string(governance.get("created_at")): errors.append(f"{prefix}: `governance.created_at` est obligatoire.") tests = governance.get("tests") if not isinstance(tests, dict): errors.append(f"{prefix}: `governance.tests` doit etre un mapping.") tests = {} required_case_ids = tests.get("required_case_ids") if not isinstance(required_case_ids, list) or not required_case_ids: errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.") if rule_type == "exact_term": if not _is_non_empty_string(match.get("exact_value")): errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.") if rule_type == "preserve_phrase": if action != "preserve": errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.") if not _is_non_empty_string(match.get("exact_value")): errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.") if rule_type == "normalized_identifier": if not _is_non_empty_string(match.get("canonical_value")): errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.") if rule_type == "contextual_identifier": if not _is_non_empty_string(match.get("canonical_value")): errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.") context_prefixes = match.get("context_prefixes") if not isinstance(context_prefixes, list) or not context_prefixes: errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.") if status == "active" and governance.get("review_required_for_activation", False): if not _is_non_empty_string(governance.get("approved_by")): errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.") return errors def _placeholder_to_kind(placeholder: str) -> str: if isinstance(placeholder, str) and placeholder.startswith("[") and placeholder.endswith("]"): return placeholder[1:-1] return "MASK" def _literal_to_pattern(text: str, multiline: bool) -> str: parts: list[str] = [] for char in text: if char == " ": parts.append(r"\s*" if multiline else r"[ \t]*") elif char == "\n": parts.append(r"\s*" if multiline else r"\n") else: parts.append(re.escape(char)) return "".join(parts) def _compile_identifier_rule(rule: dict[str, Any]) -> dict[str, Any]: rule_type = rule.get("type") normalization = rule.get("normalization") or {} multiline = bool(normalization.get("multiline", False)) flags = re.IGNORECASE if normalization.get("case_insensitive", False) else 0 value = str((rule.get("match") or {}).get("canonical_value", "")).strip() value_rx = re.escape(value) boundary_before = r"(? dict[str, Any]: compiled = { "force_mask_terms": [], "whitelist_phrases": [], "detection_rules": [], "active_rule_ids": [], } for rule in data.get("rules", []) or []: if not isinstance(rule, dict): continue if rule.get("status") != "active": continue compiled["active_rule_ids"].append(rule.get("id")) rule_type = rule.get("type") action = rule.get("action") match = rule.get("match") or {} if rule_type == "exact_term" and action == "mask": value = str(match.get("exact_value", "")).strip() if value: compiled["force_mask_terms"].append(value) elif rule_type == "preserve_phrase" and action == "preserve": value = str(match.get("exact_value", "")).strip() if value: compiled["whitelist_phrases"].append(value) elif rule_type in {"normalized_identifier", "contextual_identifier"} and action == "mask": if _is_non_empty_string(match.get("canonical_value")): compiled["detection_rules"].append(_compile_identifier_rule(rule)) compiled["force_mask_terms"] = _dedupe_keep_order(compiled["force_mask_terms"]) compiled["whitelist_phrases"] = _dedupe_keep_order(compiled["whitelist_phrases"]) return compiled