Wire admin rules into ONNX anonymizer
This commit is contained in:
@@ -1,221 +1,16 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Validation semantique des regles d'administration.
|
||||
"""
|
||||
"""Validation semantique des regles d'administration."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
import sys
|
||||
|
||||
import yaml
|
||||
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Put ROOT at the front of sys.path (once) so modules located there can be
# imported when this file is run as a script.
# NOTE(review): presumably required for the project-local `admin_rules`
# import further down -- confirm.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
# Accepted values for a rule's `type` field.
VALID_TYPES = {"exact_term", "normalized_identifier", "contextual_identifier", "preserve_phrase"}

# Accepted values for a rule's `action` field.
VALID_ACTIONS = {
    "mask",
    "preserve",
}

# Accepted values for a rule's `status` field.
VALID_STATUSES = {
    "draft",
    "candidate",
    "approved",
    "active",
    "disabled",
    "retired",
}

# Accepted entries of `scope.environments`.
VALID_ENVIRONMENTS = {
    "test",
    "staging",
    "prod",
}

# Accepted entries of `scope.sections`.
VALID_SECTIONS = {
    "narrative",
    "structured",
    "table",
    "header",
    "footer",
}
|
||||
|
||||
|
||||
def load_rules_config(path: Path) -> dict[str, Any]:
    """Load the admin-rules YAML file and return its root mapping.

    An empty document (parsed as ``None``) is treated as an empty mapping.

    Args:
        path: Location of the YAML rules file, read as UTF-8.

    Returns:
        The parsed root mapping.

    Raises:
        ValueError: If the document root is anything other than a mapping.
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    with path.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle)

    # Only a truly empty document falls back to {}. The previous
    # `safe_load(...) or {}` also swallowed falsy non-mapping roots such as
    # `[]`, `false`, `0` or `""`, hiding a malformed file from the caller.
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise ValueError("Le fichier doit contenir un mapping YAML en racine.")
    return data
|
||||
|
||||
|
||||
def _is_non_empty_string(value: Any) -> bool:
|
||||
return isinstance(value, str) and bool(value.strip())
|
||||
|
||||
|
||||
def _variant_text(value: Any) -> str:
    """Return *value* as stripped text, mapping ``None`` to the empty string."""
    return "" if value is None else str(value).strip()


def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]:
    """Build the literal text variants that a rule is expected to match.

    * ``exact_term`` / ``preserve_phrase``: the stripped ``match.exact_value``.
    * ``normalized_identifier``: the bare canonical value (when
      ``normalization.allow_bare_value`` is set) followed by every
      ``accepted_prefixes`` x ``prefix_value_separators`` combination, plus a
      ``prefix\\ncanonical`` form per prefix when ``normalization.multiline``
      is set.
    * ``contextual_identifier``: every ``match.context_prefixes`` x
      ``match.context_separators`` combination, plus two line-broken forms per
      prefix when ``normalization.multiline`` is set.

    Args:
        rule: One rule mapping as found in the rules configuration.
        limit: Maximum number of variants returned for identifier rules.

    Returns:
        De-duplicated variants in generation order; an empty list when the
        rule type is unknown or the value to match is missing or empty.
    """
    rule_type = rule.get("type")
    # Malformed sections (null, scalar, list) are treated as empty mappings so
    # that an invalid rule yields no variants instead of raising.
    match = rule.get("match")
    if not isinstance(match, dict):
        match = {}
    normalization = rule.get("normalization")
    if not isinstance(normalization, dict):
        normalization = {}

    # A non-string type (possibly unhashable) can never name a known rule type.
    if not isinstance(rule_type, str):
        return []

    if rule_type in {"exact_term", "preserve_phrase"}:
        exact_value = _variant_text(match.get("exact_value"))
        return [exact_value] if exact_value else []

    if rule_type not in {"normalized_identifier", "contextual_identifier"}:
        return []

    # A YAML null must not become the literal text "None", and an empty
    # canonical value must not produce prefix-only variants such as "ID ".
    canonical = _variant_text(match.get("canonical_value"))
    if not canonical:
        return []

    multiline = bool(normalization.get("multiline", False))
    variants: list[str] = []

    if rule_type == "normalized_identifier":
        prefixes = normalization.get("accepted_prefixes") or []
        separators = normalization.get("prefix_value_separators") or [" "]
        if normalization.get("allow_bare_value", False):
            variants.append(canonical)
        for prefix in prefixes:
            variants.extend(f"{prefix}{separator}{canonical}" for separator in separators)
            if multiline:
                variants.append(f"{prefix}\n{canonical}")
    else:  # contextual_identifier
        prefixes = match.get("context_prefixes") or []
        separators = match.get("context_separators") or [": ", ":"]
        for prefix in prefixes:
            variants.extend(f"{prefix}{separator}{canonical}" for separator in separators)
            if multiline:
                variants.append(f"{prefix}\n{canonical}")
                variants.append(f"{prefix} :\n{canonical}")

    # dict.fromkeys keeps first-seen order while dropping duplicates.
    return list(dict.fromkeys(variants))[:limit]
|
||||
|
||||
|
||||
def _dedupe_keep_order(values: list[str]) -> list[str]:
|
||||
seen: set[str] = set()
|
||||
output: list[str] = []
|
||||
for value in values:
|
||||
if value in seen:
|
||||
continue
|
||||
seen.add(value)
|
||||
output.append(value)
|
||||
return output
|
||||
|
||||
|
||||
def _is_allowed_value(value: Any, allowed: set[str]) -> bool:
    """Return True when *value* is a string that belongs to *allowed*.

    The type check comes first so that unhashable YAML values (lists,
    mappings) are reported as invalid instead of raising ``TypeError``.
    """
    return isinstance(value, str) and value in allowed


def validate_rules_config(data: dict[str, Any]) -> list[str]:
    """Check an admin-rules configuration and collect every problem found.

    The function never raises on malformed rule content: each problem is
    reported as a message, prefixed with ``rules[<index>]`` for rule-level
    issues.

    Args:
        data: Root mapping of the rules configuration.

    Returns:
        The list of error messages; empty when the configuration is valid.
        If ``rules`` is not a list, validation stops right after reporting it.
    """
    errors: list[str] = []

    version = data.get("version")
    # bool is a subclass of int: `version: true` must not pass as version 1.
    if isinstance(version, bool) or not isinstance(version, int) or version < 1:
        errors.append("`version` doit etre un entier >= 1.")

    rules = data.get("rules")
    if not isinstance(rules, list):
        errors.append("`rules` doit etre une liste.")
        return errors

    seen_ids: set[str] = set()
    for index, rule in enumerate(rules):
        prefix = f"rules[{index}]"
        if not isinstance(rule, dict):
            errors.append(f"{prefix}: chaque regle doit etre un mapping.")
            continue

        # --- identity and core fields -----------------------------------
        rule_id = rule.get("id")
        if not _is_non_empty_string(rule_id):
            errors.append(f"{prefix}: `id` est obligatoire.")
        elif rule_id in seen_ids:
            errors.append(f"{prefix}: `id` duplique `{rule_id}`.")
        else:
            seen_ids.add(rule_id)

        if not _is_non_empty_string(rule.get("label")):
            errors.append(f"{prefix}: `label` est obligatoire.")

        rule_type = rule.get("type")
        if not _is_allowed_value(rule_type, VALID_TYPES):
            errors.append(f"{prefix}: `type` invalide.")

        action = rule.get("action")
        if not _is_allowed_value(action, VALID_ACTIONS):
            errors.append(f"{prefix}: `action` invalide.")

        status = rule.get("status")
        if not _is_allowed_value(status, VALID_STATUSES):
            errors.append(f"{prefix}: `status` invalide.")

        if action == "mask" and not _is_non_empty_string(rule.get("placeholder")):
            errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.")

        # --- sub-mappings -------------------------------------------------
        # A malformed section is reported once, then replaced by {} so the
        # field-level checks below can still run.
        match = rule.get("match")
        if not isinstance(match, dict):
            errors.append(f"{prefix}: `match` doit etre un mapping.")
            match = {}

        # `normalization` is optional: a missing or empty value is fine.
        normalization = rule.get("normalization") or {}
        if normalization and not isinstance(normalization, dict):
            errors.append(f"{prefix}: `normalization` doit etre un mapping.")
            normalization = {}

        scope = rule.get("scope")
        if not isinstance(scope, dict):
            errors.append(f"{prefix}: `scope` doit etre un mapping.")
            scope = {}

        governance = rule.get("governance")
        if not isinstance(governance, dict):
            errors.append(f"{prefix}: `governance` doit etre un mapping.")
            governance = {}

        # --- scope --------------------------------------------------------
        document_families = scope.get("document_families")
        if not isinstance(document_families, list) or not document_families:
            errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.")

        environments = scope.get("environments")
        if not isinstance(environments, list) or not environments:
            errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.")
        else:
            invalid_envs = [
                value for value in environments if not _is_allowed_value(value, VALID_ENVIRONMENTS)
            ]
            if invalid_envs:
                # str() keeps the report working when an entry is not a string.
                listed = ", ".join(str(value) for value in invalid_envs)
                errors.append(f"{prefix}: environnements invalides: {listed}.")

        sections = scope.get("sections")
        if not isinstance(sections, list) or not sections:
            errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.")
        else:
            invalid_sections = [
                value for value in sections if not _is_allowed_value(value, VALID_SECTIONS)
            ]
            if invalid_sections:
                listed = ", ".join(str(value) for value in invalid_sections)
                errors.append(f"{prefix}: sections invalides: {listed}.")

        # --- governance ---------------------------------------------------
        if not _is_non_empty_string(governance.get("owner")):
            errors.append(f"{prefix}: `governance.owner` est obligatoire.")
        if not _is_non_empty_string(governance.get("justification")):
            errors.append(f"{prefix}: `governance.justification` est obligatoire.")
        if not _is_non_empty_string(governance.get("created_at")):
            errors.append(f"{prefix}: `governance.created_at` est obligatoire.")

        tests = governance.get("tests")
        if not isinstance(tests, dict):
            errors.append(f"{prefix}: `governance.tests` doit etre un mapping.")
            tests = {}
        required_case_ids = tests.get("required_case_ids")
        if not isinstance(required_case_ids, list) or not required_case_ids:
            errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.")

        # --- type-specific requirements -------------------------------------
        if rule_type == "exact_term":
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.")

        if rule_type == "preserve_phrase":
            if action != "preserve":
                errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.")
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.")

        if rule_type == "normalized_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.")
            prefixes = normalization.get("accepted_prefixes", [])
            if prefixes and not isinstance(prefixes, list):
                errors.append(f"{prefix}: `normalization.accepted_prefixes` doit etre une liste.")
            separators = normalization.get("prefix_value_separators", [])
            if separators and not isinstance(separators, list):
                errors.append(f"{prefix}: `normalization.prefix_value_separators` doit etre une liste.")

        if rule_type == "contextual_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.")
            context_prefixes = match.get("context_prefixes")
            if not isinstance(context_prefixes, list) or not context_prefixes:
                errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.")

        # --- activation gate ------------------------------------------------
        # An active rule flagged as requiring review must name its approver.
        if status == "active" and governance.get("review_required_for_activation", False):
            if not _is_non_empty_string(governance.get("approved_by")):
                errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.")

    return errors
|
||||
from admin_rules import generate_rule_variants, load_effective_admin_rules_dict, validate_rules_config
|
||||
|
||||
|
||||
def main() -> int:
|
||||
@@ -233,7 +28,7 @@ def main() -> int:
|
||||
args = parser.parse_args()
|
||||
|
||||
config_path = Path(args.config)
|
||||
data = load_rules_config(config_path)
|
||||
data = load_effective_admin_rules_dict(config_path)
|
||||
errors = validate_rules_config(data)
|
||||
|
||||
if errors:
|
||||
|
||||
Reference in New Issue
Block a user