Files
anonymisation/admin_rules.py

407 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Helpers partagés pour les règles d'administration.
"""
from __future__ import annotations
from copy import deepcopy
from pathlib import Path
from typing import Any
import re
try:
import yaml
except Exception:
yaml = None
from config_defaults import CONFIG_DIR, deep_merge_dict
# Locations, under CONFIG_DIR, of the default rules file and of the optional
# runtime override file that is merged on top of it.
DEFAULT_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.default.yml"
RUNTIME_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.yml"
# Template written by ensure_runtime_admin_rules_config() when the runtime
# file does not exist yet: explanatory comments followed by an empty rule list.
_RUNTIME_ADMIN_RULES_OVERLAY_TEXT = """# Surcharge locale des règles d'administration.
# Ce fichier est optionnel. Les règles actives de config/admin_rules.default.yml
# restent valides tant qu'aucune surcharge locale n'est définie ici.
#
# Exemple :
# version: 1
# rules:
# - id: rule_identifier_1234567
# status: active
# governance:
# approved_by: responsable_qualite
version: 1
rules: []
"""
# Minimal configuration returned by load_default_admin_rules_dict() when
# PyYAML is unavailable or the default file does not parse to a mapping.
# Callers receive a deep copy, never this object itself.
_FALLBACK_DEFAULT_ADMIN_RULES_DICT: dict[str, Any] = {
"version": 1,
"rules": [],
}
def _is_non_empty_string(value: Any) -> bool:
return isinstance(value, str) and bool(value.strip())
def read_default_admin_rules_text() -> str:
    """Return the raw UTF-8 text of the default rules file.

    Any failure while reading the file is absorbed and a minimal document
    (version 1, empty rule list) is returned instead.
    """
    minimal_document = "version: 1\nrules: []\n"
    try:
        content = DEFAULT_ADMIN_RULES_CONFIG_PATH.read_text(encoding="utf-8")
    except Exception:
        return minimal_document
    return content
def read_runtime_admin_rules_overlay_text() -> str:
    """Return the built-in template text for the runtime override file."""
    overlay_template: str = _RUNTIME_ADMIN_RULES_OVERLAY_TEXT
    return overlay_template
def load_default_admin_rules_dict() -> dict[str, Any]:
    """Parse the default rules text into a dictionary.

    Returns:
        The parsed mapping (an empty document yields ``{}``). A deep copy of
        the built-in fallback configuration is returned when PyYAML is not
        available, when parsing raises, or when the document is not a mapping.
    """
    parsed: Any = None
    if yaml is not None:
        try:
            parsed = yaml.safe_load(read_default_admin_rules_text()) or {}
        except Exception:
            # Best effort: an unreadable default file falls back below.
            parsed = None
    if isinstance(parsed, dict):
        return parsed
    return deepcopy(_FALLBACK_DEFAULT_ADMIN_RULES_DICT)
def load_runtime_admin_rules_overlay_dict(path: Path | None = None) -> dict[str, Any]:
    """Load the runtime override file as a dictionary.

    Args:
        path: Override file to read; the module-level runtime path is used
            when omitted.

    Returns:
        The parsed mapping, or ``{}`` when the file is missing, PyYAML is not
        available, parsing raises, or the document is not a mapping.
    """
    overlay_path = RUNTIME_ADMIN_RULES_CONFIG_PATH if path is None else Path(path)
    if not (overlay_path.exists() and yaml is not None):
        return {}
    try:
        parsed = yaml.safe_load(overlay_path.read_text(encoding="utf-8")) or {}
    except Exception:
        # Best effort: a broken override must not prevent loading defaults.
        return {}
    return parsed if isinstance(parsed, dict) else {}
def _merge_rules_by_id(base_rules: list[dict[str, Any]], overlay_rules: list[dict[str, Any]]) -> list[dict[str, Any]]:
merged: list[dict[str, Any]] = [deepcopy(rule) for rule in base_rules]
index_by_id = {
rule.get("id"): idx
for idx, rule in enumerate(merged)
if isinstance(rule, dict) and _is_non_empty_string(rule.get("id"))
}
for overlay_rule in overlay_rules:
if not isinstance(overlay_rule, dict):
continue
rule_id = overlay_rule.get("id")
if _is_non_empty_string(rule_id) and rule_id in index_by_id:
idx = index_by_id[rule_id]
merged[idx] = deep_merge_dict(merged[idx], overlay_rule)
else:
merged.append(deepcopy(overlay_rule))
if _is_non_empty_string(rule_id):
index_by_id[rule_id] = len(merged) - 1
return merged
def merge_admin_rules_dict(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
    """Merge an overlay configuration onto a base configuration.

    Every key except ``rules`` goes through ``deep_merge_dict``; the ``rules``
    lists are combined rule by rule via ``_merge_rules_by_id``.
    """
    overlay_without_rules = {key: value for key, value in overlay.items() if key != "rules"}
    result = deep_merge_dict(base, overlay_without_rules)
    base_rules = base.get("rules", []) or []
    overlay_rules = overlay.get("rules", []) or []
    result["rules"] = _merge_rules_by_id(base_rules, overlay_rules)
    return result
def load_effective_admin_rules_dict(path: Path | None = None) -> dict[str, Any]:
    """Return the default rules merged with the runtime override.

    Args:
        path: Optional override file, forwarded to
            ``load_runtime_admin_rules_overlay_dict``.
    """
    defaults = load_default_admin_rules_dict()
    overrides = load_runtime_admin_rules_overlay_dict(path)
    return merge_admin_rules_dict(defaults, overrides)
def ensure_runtime_admin_rules_config(path: Path | None = None) -> Path:
    """Create the runtime override file from the template if it is missing.

    Args:
        path: File to check; the module-level runtime path is used when
            omitted.

    Returns:
        The path of the override file. An existing file is left untouched.
    """
    config_path = RUNTIME_ADMIN_RULES_CONFIG_PATH if path is None else Path(path)
    if config_path.exists():
        return config_path
    config_path.parent.mkdir(parents=True, exist_ok=True)
    config_path.write_text(read_runtime_admin_rules_overlay_text(), encoding="utf-8")
    return config_path
def _dedupe_keep_order(values: list[str]) -> list[str]:
seen: set[str] = set()
output: list[str] = []
for value in values:
if value in seen:
continue
seen.add(value)
output.append(value)
return output
def _match_value_as_text(raw: Any) -> str:
    """Return ``raw`` as stripped text, mapping ``None`` to an empty string."""
    # A YAML ``null`` must not become the literal text "None".
    return "" if raw is None else str(raw).strip()


def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]:
    """Build the literal text variants described by a rule.

    Args:
        rule: Rule mapping; ``type``, ``match`` and ``normalization`` are read.
        limit: Maximum number of variants returned for identifier rules.

    Returns:
        * ``exact_term`` / ``preserve_phrase``: a one-item list holding the
          stripped ``match.exact_value``, or ``[]`` when it is missing/empty.
        * ``normalized_identifier``: the bare canonical value (when
          ``normalization.allow_bare_value`` is set) followed by every
          accepted prefix + separator + value combination.
        * ``contextual_identifier``: every context prefix + separator + value
          combination.
        * Identifier rules add line-break forms when
          ``normalization.multiline`` is set; duplicates are removed and the
          list is cut to ``limit`` items.
        * ``[]`` for any other type, or when the canonical value is
          missing/empty (a prefix without a value is not a usable variant).
    """
    rule_type = rule.get("type")
    match = rule.get("match")
    if not isinstance(match, dict):
        match = {}
    normalization = rule.get("normalization")
    if not isinstance(normalization, dict):
        normalization = {}

    if rule_type in ("exact_term", "preserve_phrase"):
        exact_value = _match_value_as_text(match.get("exact_value"))
        return [exact_value] if exact_value else []

    if rule_type not in ("normalized_identifier", "contextual_identifier"):
        return []

    canonical = _match_value_as_text(match.get("canonical_value"))
    if not canonical:
        return []

    contextual = rule_type == "contextual_identifier"
    multiline = normalization.get("multiline", False)
    variants: list[str] = []
    if contextual:
        prefixes = match.get("context_prefixes") or []
        separators = match.get("context_separators") or [": ", ":"]
    else:
        if normalization.get("allow_bare_value", False):
            variants.append(canonical)
        prefixes = normalization.get("accepted_prefixes") or []
        separators = normalization.get("prefix_value_separators") or [" "]

    for prefix in prefixes:
        line_break_forms = [f"{prefix}\n{canonical}"]
        if contextual:
            line_break_forms.append(f"{prefix} :\n{canonical}")
        for separator in separators:
            variants.append(f"{prefix}{separator}{canonical}")
            if multiline:
                # Emitted after each separator form so that the order seen by
                # the `limit` cut stays the same; repeats are deduplicated.
                variants.extend(line_break_forms)
    return _dedupe_keep_order(variants)[:limit]
# Closed vocabularies enforced by validate_rules_config().
# Accepted values of a rule's `type`.
VALID_TYPES = {
"exact_term",
"normalized_identifier",
"contextual_identifier",
"preserve_phrase",
}
# Accepted values of a rule's `action`.
VALID_ACTIONS = {"mask", "preserve"}
# Accepted values of a rule's `status`; only "active" rules are picked up by
# compile_active_admin_rules().
VALID_STATUSES = {"draft", "candidate", "approved", "active", "disabled", "retired"}
# Accepted entries of `scope.environments`.
VALID_ENVIRONMENTS = {"test", "staging", "prod"}
# Accepted entries of `scope.sections`.
VALID_SECTIONS = {"narrative", "structured", "table", "header", "footer"}
def _is_allowed_choice(value: Any, allowed: set[str]) -> bool:
    """Tell whether ``value`` is a string that belongs to ``allowed``."""
    # The isinstance() guard keeps unhashable YAML values (lists, mappings)
    # from raising TypeError in the set membership test.
    return isinstance(value, str) and value in allowed


def _invalid_choices(values: list[Any], allowed: set[str]) -> list[str]:
    """Return the text form of each entry of ``values`` that is not allowed."""
    # str() keeps the later ", ".join(...) safe for non-string entries.
    return [str(value) for value in values if not _is_allowed_choice(value, allowed)]


def validate_rules_config(data: dict[str, Any]) -> list[str]:
    """Validate an administration rules document.

    Args:
        data: Parsed configuration mapping with ``version`` and ``rules``.

    Returns:
        The list of error messages (in French, as emitted to users); an empty
        list means the document is valid. Validation stops early only when
        ``rules`` is not a list; a rule that is not a mapping is reported and
        skipped. Malformed values are reported, never raised.
    """
    errors: list[str] = []

    version = data.get("version")
    # bool is a subclass of int, so `version: true` must be rejected explicitly.
    if isinstance(version, bool) or not isinstance(version, int) or version < 1:
        errors.append("`version` doit etre un entier >= 1.")

    rules = data.get("rules")
    if not isinstance(rules, list):
        errors.append("`rules` doit etre une liste.")
        return errors

    seen_ids: set[str] = set()
    for index, rule in enumerate(rules):
        prefix = f"rules[{index}]"
        if not isinstance(rule, dict):
            errors.append(f"{prefix}: chaque regle doit etre un mapping.")
            continue

        # --- Identity and top-level enumerations -------------------------
        rule_id = rule.get("id")
        if not _is_non_empty_string(rule_id):
            errors.append(f"{prefix}: `id` est obligatoire.")
        elif rule_id in seen_ids:
            errors.append(f"{prefix}: `id` duplique `{rule_id}`.")
        else:
            seen_ids.add(rule_id)

        if not _is_non_empty_string(rule.get("label")):
            errors.append(f"{prefix}: `label` est obligatoire.")

        rule_type = rule.get("type")
        if not _is_allowed_choice(rule_type, VALID_TYPES):
            errors.append(f"{prefix}: `type` invalide.")

        action = rule.get("action")
        if not _is_allowed_choice(action, VALID_ACTIONS):
            errors.append(f"{prefix}: `action` invalide.")

        status = rule.get("status")
        if not _is_allowed_choice(status, VALID_STATUSES):
            errors.append(f"{prefix}: `status` invalide.")

        if action == "mask" and not _is_non_empty_string(rule.get("placeholder")):
            errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.")

        # --- Sub-mappings; a malformed one is replaced by {} so that the
        # checks below still run and report what is missing. --------------
        match = rule.get("match")
        if not isinstance(match, dict):
            errors.append(f"{prefix}: `match` doit etre un mapping.")
            match = {}

        # `normalization` is optional: only a non-empty, non-mapping value is wrong.
        if not isinstance(rule.get("normalization") or {}, dict):
            errors.append(f"{prefix}: `normalization` doit etre un mapping.")

        scope = rule.get("scope")
        if not isinstance(scope, dict):
            errors.append(f"{prefix}: `scope` doit etre un mapping.")
            scope = {}

        governance = rule.get("governance")
        if not isinstance(governance, dict):
            errors.append(f"{prefix}: `governance` doit etre un mapping.")
            governance = {}

        # --- Scope -------------------------------------------------------
        document_families = scope.get("document_families")
        if not isinstance(document_families, list) or not document_families:
            errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.")

        environments = scope.get("environments")
        if not isinstance(environments, list) or not environments:
            errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.")
        else:
            invalid_envs = _invalid_choices(environments, VALID_ENVIRONMENTS)
            if invalid_envs:
                errors.append(f"{prefix}: environnements invalides: {', '.join(invalid_envs)}.")

        sections = scope.get("sections")
        if not isinstance(sections, list) or not sections:
            errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.")
        else:
            invalid_sections = _invalid_choices(sections, VALID_SECTIONS)
            if invalid_sections:
                errors.append(f"{prefix}: sections invalides: {', '.join(invalid_sections)}.")

        # --- Governance --------------------------------------------------
        if not _is_non_empty_string(governance.get("owner")):
            errors.append(f"{prefix}: `governance.owner` est obligatoire.")
        if not _is_non_empty_string(governance.get("justification")):
            errors.append(f"{prefix}: `governance.justification` est obligatoire.")
        if not _is_non_empty_string(governance.get("created_at")):
            errors.append(f"{prefix}: `governance.created_at` est obligatoire.")

        tests = governance.get("tests")
        if not isinstance(tests, dict):
            errors.append(f"{prefix}: `governance.tests` doit etre un mapping.")
            tests = {}
        required_case_ids = tests.get("required_case_ids")
        if not isinstance(required_case_ids, list) or not required_case_ids:
            errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.")

        # --- Type-specific requirements ----------------------------------
        if rule_type == "exact_term":
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.")
        if rule_type == "preserve_phrase":
            if action != "preserve":
                errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.")
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.")
        if rule_type == "normalized_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.")
        if rule_type == "contextual_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.")
            context_prefixes = match.get("context_prefixes")
            if not isinstance(context_prefixes, list) or not context_prefixes:
                errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.")

        # --- Activation gate ---------------------------------------------
        if status == "active" and governance.get("review_required_for_activation", False):
            if not _is_non_empty_string(governance.get("approved_by")):
                errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.")
    return errors
def _placeholder_to_kind(placeholder: str) -> str:
if isinstance(placeholder, str) and placeholder.startswith("[") and placeholder.endswith("]"):
return placeholder[1:-1]
return "MASK"
def _literal_to_pattern(text: str, multiline: bool) -> str:
parts: list[str] = []
for char in text:
if char == " ":
parts.append(r"\s*" if multiline else r"[ \t]*")
elif char == "\n":
parts.append(r"\s*" if multiline else r"\n")
else:
parts.append(re.escape(char))
return "".join(parts)
def _compile_identifier_rule(rule: dict[str, Any]) -> dict[str, Any]:
    """Compile an identifier rule into regular expressions.

    One pattern is built per prefix/separator pair, preceded by a bare-value
    pattern for a ``normalized_identifier`` rule that sets
    ``normalization.allow_bare_value``. Each pattern captures the canonical
    value in group 1 and refuses an ASCII letter or digit right before the
    match or right after the value.

    Returns:
        A mapping with the rule ``id`` and ``type``, the ``kind`` derived
        from the placeholder, the ``placeholder`` itself (``"[MASK]"`` when
        the key is absent) and the compiled ``patterns``.
    """
    rule_type = rule.get("type")
    match_spec = rule.get("match") or {}
    norm = rule.get("normalization") or {}
    multiline = bool(norm.get("multiline", False))

    case_flag = re.IGNORECASE if norm.get("case_insensitive", False) else 0
    regex_flags = case_flag | re.MULTILINE

    canonical_rx = re.escape(str(match_spec.get("canonical_value", "")).strip())
    not_preceded = r"(?<![A-Za-z0-9])"
    not_followed = r"(?![A-Za-z0-9])"
    captured_value = f"({canonical_rx}){not_followed}"

    compiled_patterns = []
    if rule_type == "normalized_identifier":
        if norm.get("allow_bare_value", False):
            compiled_patterns.append(re.compile(f"{not_preceded}{captured_value}", regex_flags))
        prefixes = norm.get("accepted_prefixes") or []
        separators = norm.get("prefix_value_separators") or [" "]
    else:
        prefixes = match_spec.get("context_prefixes") or []
        separators = match_spec.get("context_separators") or [": ", ":"]

    # Optional whitespace tolerated between the separator and the value.
    gap = r"\s*" if multiline else r"[ \t]*"
    for prefix in prefixes:
        prefix_rx = _literal_to_pattern(str(prefix), multiline)
        for separator in separators:
            separator_rx = _literal_to_pattern(str(separator), multiline)
            source = f"{not_preceded}{prefix_rx}{separator_rx}{gap}{captured_value}"
            compiled_patterns.append(re.compile(source, regex_flags))

    placeholder = rule.get("placeholder", "[MASK]")
    return {
        "id": rule.get("id"),
        "type": rule_type,
        "kind": _placeholder_to_kind(placeholder),
        "placeholder": placeholder,
        "patterns": compiled_patterns,
    }
def compile_active_admin_rules(data: dict[str, Any]) -> dict[str, Any]:
    """Turn the active rules of a configuration into runtime structures.

    Only mappings whose ``status`` is ``"active"`` are considered.

    Args:
        data: Configuration mapping holding a ``rules`` list.

    Returns:
        A mapping with:
        * ``force_mask_terms``: exact values of ``exact_term`` rules whose
          action is ``mask`` (deduplicated, first occurrence kept);
        * ``whitelist_phrases``: exact values of ``preserve_phrase`` rules
          whose action is ``preserve`` (deduplicated the same way);
        * ``detection_rules``: compiled identifier rules whose action is
          ``mask`` and whose canonical value is a non-empty string;
        * ``active_rule_ids``: the ``id`` of every active rule, including
          those that contribute nothing to the lists above.
    """
    compiled: dict[str, Any] = {
        "force_mask_terms": [],
        "whitelist_phrases": [],
        "detection_rules": [],
        "active_rule_ids": [],
    }
    for rule in data.get("rules", []) or []:
        if not isinstance(rule, dict) or rule.get("status") != "active":
            continue
        compiled["active_rule_ids"].append(rule.get("id"))

        match = rule.get("match")
        if not isinstance(match, dict):
            # A missing or malformed `match` cannot contribute a term or a
            # pattern; the rule stays listed as active.
            continue
        rule_type = rule.get("type")
        action = rule.get("action")

        exact_bucket = None
        if rule_type == "exact_term" and action == "mask":
            exact_bucket = compiled["force_mask_terms"]
        elif rule_type == "preserve_phrase" and action == "preserve":
            exact_bucket = compiled["whitelist_phrases"]

        if exact_bucket is not None:
            raw_value = match.get("exact_value")
            # A YAML ``null`` must not be registered as the literal "None".
            value = "" if raw_value is None else str(raw_value).strip()
            if value:
                exact_bucket.append(value)
        elif rule_type in ("normalized_identifier", "contextual_identifier") and action == "mask":
            if _is_non_empty_string(match.get("canonical_value")):
                compiled["detection_rules"].append(_compile_identifier_rule(rule))

    compiled["force_mask_terms"] = _dedupe_keep_order(compiled["force_mask_terms"])
    compiled["whitelist_phrases"] = _dedupe_keep_order(compiled["whitelist_phrases"])
    return compiled